Skip to main content

ferro_oci_server/
serve.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Runnable-server assembly: environment configuration, app wiring, and
3//! the bind+serve loop with graceful shutdown.
4//!
5//! The `ferro-oci-server` binary is a thin shim over this module — it
6//! calls [`Config::from_env`] then [`serve`]. Keeping the logic here
7//! (rather than in `fn main`) makes the configuration parser, the blob
8//! store selection, and the app assembly directly unit-testable.
9
10use std::net::SocketAddr;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use ferro_blob_store::{FsBlobStore, InMemoryBlobStore, SharedBlobStore};
15
16use crate::metrics::{Metrics, instrument};
17use crate::registry::InMemoryRegistryMeta;
18use crate::router::{AppState, probe_routes, router};
19
20/// Environment variable naming the listen socket address.
21pub const ENV_LISTEN: &str = "FERRO_OCI_LISTEN";
22/// Environment variable naming the filesystem blob-store directory.
23pub const ENV_STORAGE_DIR: &str = "FERRO_OCI_STORAGE_DIR";
24
25/// Default listen socket address.
26pub const DEFAULT_LISTEN: &str = "0.0.0.0:8080";
27
28/// Server configuration, sourced from the process environment.
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct Config {
31    /// `host:port` the HTTP server binds to (`FERRO_OCI_LISTEN`).
32    pub listen: String,
33    /// Optional filesystem directory for blob bytes
34    /// (`FERRO_OCI_STORAGE_DIR`). When `None`, an in-memory blob store
35    /// is used — convenient for smoke tests and conformance runs, but
36    /// non-durable.
37    pub storage_dir: Option<PathBuf>,
38}
39
40impl Config {
41    /// Read the configuration from the process environment, applying
42    /// defaults for anything unset.
43    #[must_use]
44    pub fn from_env() -> Self {
45        let listen = std::env::var(ENV_LISTEN).ok();
46        let storage_dir = std::env::var_os(ENV_STORAGE_DIR).filter(|v| !v.is_empty());
47        Self::from_raw(listen, storage_dir.map(PathBuf::from))
48    }
49
50    /// Build a [`Config`] from already-resolved listen / storage-dir
51    /// values, applying defaults for `None`.
52    ///
53    /// Factored out of [`from_env`](Self::from_env) so the parsing and
54    /// default rules are unit-testable without mutating the process
55    /// environment (which `unsafe_code = forbid` disallows here). An
56    /// empty `storage_dir` path is normalised to "in-memory".
57    #[must_use]
58    pub fn from_raw(listen: Option<String>, storage_dir: Option<PathBuf>) -> Self {
59        let listen = listen.unwrap_or_else(|| DEFAULT_LISTEN.to_owned());
60        let storage_dir = storage_dir.filter(|p| !p.as_os_str().is_empty());
61        Self {
62            listen,
63            storage_dir,
64        }
65    }
66
67    /// Parse and validate the listen address.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error string when `listen` is not a valid
72    /// `host:port` socket address.
73    pub fn socket_addr(&self) -> Result<SocketAddr, String> {
74        self.listen
75            .parse::<SocketAddr>()
76            .map_err(|e| format!("invalid {ENV_LISTEN} `{}`: {e}", self.listen))
77    }
78
79    /// Build the [`SharedBlobStore`] this config describes.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error when a filesystem store is requested but its
84    /// directory cannot be created or opened.
85    pub fn blob_store(&self) -> Result<SharedBlobStore, Box<dyn std::error::Error>> {
86        if let Some(dir) = &self.storage_dir {
87            std::fs::create_dir_all(dir)?;
88            let store = FsBlobStore::new(dir.clone())?;
89            tracing::info!(path = %dir.display(), "using filesystem blob store");
90            Ok(Arc::new(store))
91        } else {
92            tracing::warn!("FERRO_OCI_STORAGE_DIR unset — using a non-durable in-memory blob store");
93            Ok(Arc::new(InMemoryBlobStore::new()))
94        }
95    }
96}
97
98/// Assemble the full application router from a blob store: the OCI
99/// `/v2/**` surface + Kubernetes health probes, wrapped in the
100/// Prometheus instrumentation middleware and `/metrics` endpoint.
101///
102/// The registry metadata plane is ephemeral (in-memory only) — suitable
103/// for the in-memory blob-store deployment and tests. For a durable
104/// filesystem deployment use [`build_app_persisted`], which mirrors
105/// metadata under the storage directory so manifests/tags/referrers
106/// survive a restart (R2-6).
107pub fn build_app(blob_store: SharedBlobStore) -> axum::Router {
108    let registry = Arc::new(InMemoryRegistryMeta::new());
109    assemble(blob_store, registry)
110}
111
112/// Like [`build_app`] but with a registry metadata plane durably mirrored
113/// under `storage_dir` (R2-6).
114///
115/// On boot the existing `metadata.json` (if any) under `storage_dir` is
116/// loaded, so a restart of the filesystem-backed binary keeps its
117/// manifests, tag aliases, and referrer index even though those live in
118/// the metadata plane rather than the blob store. A missing/corrupt file
119/// is tolerated (start empty + log).
120pub fn build_app_persisted(blob_store: SharedBlobStore, storage_dir: &std::path::Path) -> axum::Router {
121    let registry = Arc::new(InMemoryRegistryMeta::with_persistence(storage_dir));
122    assemble(blob_store, registry)
123}
124
125/// Shared wiring: state + probes + metrics instrumentation.
126fn assemble(blob_store: SharedBlobStore, registry: Arc<InMemoryRegistryMeta>) -> axum::Router {
127    let state = AppState::new(blob_store, registry);
128    let blob_count = state.blob_count_handle();
129    instrument(
130        router(state).merge(probe_routes()),
131        Metrics::new(),
132        blob_count,
133    )
134}
135
136/// Boot the server described by `config` and serve until a shutdown
137/// signal (`SIGINT` / `SIGTERM`) arrives.
138///
139/// # Errors
140///
141/// Returns an error when the listen address is invalid, the blob store
142/// cannot be opened, the socket cannot be bound, or the server loop
143/// fails.
144pub async fn serve(config: &Config) -> Result<(), Box<dyn std::error::Error>> {
145    tracing::info!(?config, "ferro-oci-server starting");
146    let addr = config.socket_addr()?;
147    let blob_store = config.blob_store()?;
148    // R2-6: when a filesystem storage dir is configured, persist the
149    // registry metadata (manifests/tags/referrers) alongside the blobs so
150    // a restart does not strand blobs whose manifests/tags vanished. The
151    // in-memory deployment stays ephemeral.
152    let app = match &config.storage_dir {
153        Some(dir) => build_app_persisted(blob_store, dir),
154        None => build_app(blob_store),
155    };
156
157    let listener = tokio::net::TcpListener::bind(addr).await?;
158    tracing::info!(%addr, "listening");
159
160    axum::serve(listener, app)
161        .with_graceful_shutdown(shutdown_signal())
162        .await?;
163
164    tracing::info!("ferro-oci-server stopped");
165    Ok(())
166}
167
168/// Install a `tracing-subscriber` honouring `RUST_LOG` (default `info`).
169///
170/// A failure to install (for instance, a global subscriber already
171/// present in a test harness) is ignored so the server still boots.
172pub fn init_tracing() {
173    use tracing_subscriber::EnvFilter;
174
175    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
176    let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
177}
178
179/// Resolve when either `SIGINT` (Ctrl-C) or `SIGTERM` (container stop)
180/// is received, so `axum::serve` can drain in-flight requests before
181/// the process exits.
182async fn shutdown_signal() {
183    let ctrl_c = async {
184        if let Err(err) = tokio::signal::ctrl_c().await {
185            tracing::error!(%err, "failed to install Ctrl-C handler");
186        }
187    };
188
189    #[cfg(unix)]
190    let terminate = async {
191        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
192            Ok(mut sig) => {
193                sig.recv().await;
194            }
195            Err(err) => tracing::error!(%err, "failed to install SIGTERM handler"),
196        }
197    };
198
199    #[cfg(not(unix))]
200    let terminate = std::future::pending::<()>();
201
202    tokio::select! {
203        () = ctrl_c => tracing::info!("received SIGINT — shutting down"),
204        () = terminate => tracing::info!("received SIGTERM — shutting down"),
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    use super::{Config, DEFAULT_LISTEN, ENV_LISTEN, build_app, init_tracing};
211    use axum::body::Body;
212    use axum::http::{Request, StatusCode};
213    use std::net::SocketAddr;
214    use std::path::PathBuf;
215    use tower::ServiceExt;
216
217    #[test]
218    fn from_raw_applies_defaults_when_unset() {
219        let cfg = Config::from_raw(None, None);
220        assert_eq!(cfg.listen, DEFAULT_LISTEN);
221        assert_eq!(cfg.storage_dir, None);
222    }
223
224    #[test]
225    fn from_raw_reads_overrides() {
226        let cfg = Config::from_raw(
227            Some("127.0.0.1:0".to_owned()),
228            Some(PathBuf::from("/var/lib/oci-xyz")),
229        );
230        assert_eq!(cfg.listen, "127.0.0.1:0");
231        assert_eq!(cfg.storage_dir, Some(PathBuf::from("/var/lib/oci-xyz")));
232    }
233
234    #[test]
235    fn from_raw_treats_empty_storage_dir_as_inmemory() {
236        let cfg = Config::from_raw(None, Some(PathBuf::new()));
237        assert_eq!(cfg.storage_dir, None);
238    }
239
240    #[test]
241    fn from_env_smoke() {
242        let cfg = Config::from_env();
243        assert!(!cfg.listen.is_empty());
244    }
245
246    #[test]
247    fn socket_addr_parses_valid_listen() {
248        let cfg = Config {
249            listen: "0.0.0.0:8080".to_owned(),
250            storage_dir: None,
251        };
252        assert_eq!(
253            cfg.socket_addr().expect("addr"),
254            "0.0.0.0:8080".parse::<SocketAddr>().unwrap()
255        );
256    }
257
258    #[test]
259    fn socket_addr_rejects_garbage() {
260        let cfg = Config {
261            listen: "nope".to_owned(),
262            storage_dir: None,
263        };
264        let err = cfg.socket_addr().expect_err("should fail");
265        assert!(err.contains(ENV_LISTEN), "error names the env var: {err}");
266    }
267
268    #[test]
269    fn blob_store_in_memory_when_unset() {
270        let cfg = Config {
271            listen: DEFAULT_LISTEN.to_owned(),
272            storage_dir: None,
273        };
274        assert!(cfg.blob_store().is_ok());
275    }
276
277    #[test]
278    fn blob_store_creates_fs_dir() {
279        let tmp = tempfile::TempDir::new().expect("tempdir");
280        let dir = tmp.path().join("nested/blobs");
281        assert!(!dir.exists());
282        let cfg = Config {
283            listen: DEFAULT_LISTEN.to_owned(),
284            storage_dir: Some(dir.clone()),
285        };
286        assert!(cfg.blob_store().is_ok());
287        assert!(dir.is_dir(), "fs blob dir created");
288    }
289
290    #[tokio::test]
291    async fn build_app_serves_probes_v2_and_metrics() {
292        let app = build_app(std::sync::Arc::new(
293            ferro_blob_store::InMemoryBlobStore::new(),
294        ));
295        for (uri, expected) in [
296            ("/live", StatusCode::OK),
297            ("/ready", StatusCode::OK),
298            ("/healthz", StatusCode::OK),
299            ("/v2/", StatusCode::OK),
300            ("/metrics", StatusCode::OK),
301        ] {
302            let resp = app
303                .clone()
304                .oneshot(
305                    Request::builder()
306                        .uri(uri)
307                        .body(Body::empty())
308                        .expect("req"),
309                )
310                .await
311                .expect("resp");
312            assert_eq!(resp.status(), expected, "GET {uri}");
313        }
314    }
315
316    #[tokio::test]
317    async fn serve_rejects_invalid_listen_before_binding() {
318        let cfg = Config {
319            listen: "definitely-not-an-addr".to_owned(),
320            storage_dir: None,
321        };
322        let err = super::serve(&cfg).await.expect_err("invalid addr fails");
323        assert!(err.to_string().contains(ENV_LISTEN));
324    }
325
326    #[test]
327    fn init_tracing_is_idempotent() {
328        init_tracing();
329        init_tracing();
330    }
331
332    #[tokio::test]
333    async fn build_app_persisted_serves_v2_and_survives_restart() {
334        // `build_app_persisted` must return a real, wired router (not an
335        // empty `Router::default()`): it has to route `/v2/` AND persist
336        // metadata under the storage dir so a rebuilt app still resolves a
337        // pushed blob. An empty default router would 404 the `/v2/` probe
338        // and store nothing, so this kills the `-> Default::default()`
339        // mutant.
340        use super::build_app_persisted;
341        use axum::http::Method;
342        use ferro_blob_store::{Digest, InMemoryBlobStore};
343
344        let tmp = tempfile::TempDir::new().expect("tempdir");
345        let dir = tmp.path();
346        let store = std::sync::Arc::new(InMemoryBlobStore::new());
347
348        let app = build_app_persisted(store.clone(), dir);
349
350        // The version endpoint is wired (an empty router would 404).
351        let resp = app
352            .clone()
353            .oneshot(
354                Request::builder()
355                    .uri("/v2/")
356                    .body(Body::empty())
357                    .expect("req"),
358            )
359            .await
360            .expect("resp");
361        assert_eq!(resp.status(), StatusCode::OK, "GET /v2/ on persisted app");
362
363        // Push a manifest by digest so metadata is mirrored to disk.
364        let body = b"{\"schemaVersion\":2}";
365        let digest = Digest::sha256_of(&body[..]).to_string();
366        let put = app
367            .oneshot(
368                Request::builder()
369                    .method(Method::PUT)
370                    .uri(format!("/v2/repo/manifests/{digest}"))
371                    .header(
372                        "content-type",
373                        "application/vnd.oci.image.manifest.v1+json",
374                    )
375                    .body(Body::from(&body[..]))
376                    .expect("req"),
377            )
378            .await
379            .expect("put resp");
380        assert_eq!(put.status(), StatusCode::CREATED, "manifest PUT");
381
382        // Rebuild from the SAME dir: the persisted manifest must resolve,
383        // which a `Default::default()` router (no persistence wiring)
384        // could never do.
385        let app2 = build_app_persisted(store, dir);
386        let head = app2
387            .oneshot(
388                Request::builder()
389                    .method(Method::HEAD)
390                    .uri(format!("/v2/repo/manifests/{digest}"))
391                    .body(Body::empty())
392                    .expect("req"),
393            )
394            .await
395            .expect("head resp");
396        assert_eq!(
397            head.status(),
398            StatusCode::OK,
399            "manifest survives a simulated restart of the persisted app"
400        );
401    }
402}