1use std::net::SocketAddr;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use ferro_blob_store::{FsBlobStore, InMemoryBlobStore, SharedBlobStore};
15
16use crate::metrics::{Metrics, instrument};
17use crate::registry::InMemoryRegistryMeta;
18use crate::router::{AppState, probe_routes, router};
19
20pub const ENV_LISTEN: &str = "FERRO_OCI_LISTEN";
22pub const ENV_STORAGE_DIR: &str = "FERRO_OCI_STORAGE_DIR";
24
25pub const DEFAULT_LISTEN: &str = "0.0.0.0:8080";
27
28#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct Config {
31 pub listen: String,
33 pub storage_dir: Option<PathBuf>,
38}
39
40impl Config {
41 #[must_use]
44 pub fn from_env() -> Self {
45 let listen = std::env::var(ENV_LISTEN).ok();
46 let storage_dir = std::env::var_os(ENV_STORAGE_DIR).filter(|v| !v.is_empty());
47 Self::from_raw(listen, storage_dir.map(PathBuf::from))
48 }
49
50 #[must_use]
58 pub fn from_raw(listen: Option<String>, storage_dir: Option<PathBuf>) -> Self {
59 let listen = listen.unwrap_or_else(|| DEFAULT_LISTEN.to_owned());
60 let storage_dir = storage_dir.filter(|p| !p.as_os_str().is_empty());
61 Self {
62 listen,
63 storage_dir,
64 }
65 }
66
67 pub fn socket_addr(&self) -> Result<SocketAddr, String> {
74 self.listen
75 .parse::<SocketAddr>()
76 .map_err(|e| format!("invalid {ENV_LISTEN} `{}`: {e}", self.listen))
77 }
78
79 pub fn blob_store(&self) -> Result<SharedBlobStore, Box<dyn std::error::Error>> {
86 if let Some(dir) = &self.storage_dir {
87 std::fs::create_dir_all(dir)?;
88 let store = FsBlobStore::new(dir.clone())?;
89 tracing::info!(path = %dir.display(), "using filesystem blob store");
90 Ok(Arc::new(store))
91 } else {
92 tracing::warn!("FERRO_OCI_STORAGE_DIR unset — using a non-durable in-memory blob store");
93 Ok(Arc::new(InMemoryBlobStore::new()))
94 }
95 }
96}
97
98pub fn build_app(blob_store: SharedBlobStore) -> axum::Router {
108 let registry = Arc::new(InMemoryRegistryMeta::new());
109 assemble(blob_store, registry)
110}
111
112pub fn build_app_persisted(blob_store: SharedBlobStore, storage_dir: &std::path::Path) -> axum::Router {
121 let registry = Arc::new(InMemoryRegistryMeta::with_persistence(storage_dir));
122 assemble(blob_store, registry)
123}
124
125fn assemble(blob_store: SharedBlobStore, registry: Arc<InMemoryRegistryMeta>) -> axum::Router {
127 let state = AppState::new(blob_store, registry);
128 let blob_count = state.blob_count_handle();
129 instrument(
130 router(state).merge(probe_routes()),
131 Metrics::new(),
132 blob_count,
133 )
134}
135
136pub async fn serve(config: &Config) -> Result<(), Box<dyn std::error::Error>> {
145 tracing::info!(?config, "ferro-oci-server starting");
146 let addr = config.socket_addr()?;
147 let blob_store = config.blob_store()?;
148 let app = match &config.storage_dir {
153 Some(dir) => build_app_persisted(blob_store, dir),
154 None => build_app(blob_store),
155 };
156
157 let listener = tokio::net::TcpListener::bind(addr).await?;
158 tracing::info!(%addr, "listening");
159
160 axum::serve(listener, app)
161 .with_graceful_shutdown(shutdown_signal())
162 .await?;
163
164 tracing::info!("ferro-oci-server stopped");
165 Ok(())
166}
167
168pub fn init_tracing() {
173 use tracing_subscriber::EnvFilter;
174
175 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
176 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
177}
178
179async fn shutdown_signal() {
183 let ctrl_c = async {
184 if let Err(err) = tokio::signal::ctrl_c().await {
185 tracing::error!(%err, "failed to install Ctrl-C handler");
186 }
187 };
188
189 #[cfg(unix)]
190 let terminate = async {
191 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
192 Ok(mut sig) => {
193 sig.recv().await;
194 }
195 Err(err) => tracing::error!(%err, "failed to install SIGTERM handler"),
196 }
197 };
198
199 #[cfg(not(unix))]
200 let terminate = std::future::pending::<()>();
201
202 tokio::select! {
203 () = ctrl_c => tracing::info!("received SIGINT — shutting down"),
204 () = terminate => tracing::info!("received SIGTERM — shutting down"),
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use super::{Config, DEFAULT_LISTEN, ENV_LISTEN, build_app, init_tracing};
211 use axum::body::Body;
212 use axum::http::{Request, StatusCode};
213 use std::net::SocketAddr;
214 use std::path::PathBuf;
215 use tower::ServiceExt;
216
217 #[test]
218 fn from_raw_applies_defaults_when_unset() {
219 let cfg = Config::from_raw(None, None);
220 assert_eq!(cfg.listen, DEFAULT_LISTEN);
221 assert_eq!(cfg.storage_dir, None);
222 }
223
224 #[test]
225 fn from_raw_reads_overrides() {
226 let cfg = Config::from_raw(
227 Some("127.0.0.1:0".to_owned()),
228 Some(PathBuf::from("/var/lib/oci-xyz")),
229 );
230 assert_eq!(cfg.listen, "127.0.0.1:0");
231 assert_eq!(cfg.storage_dir, Some(PathBuf::from("/var/lib/oci-xyz")));
232 }
233
234 #[test]
235 fn from_raw_treats_empty_storage_dir_as_inmemory() {
236 let cfg = Config::from_raw(None, Some(PathBuf::new()));
237 assert_eq!(cfg.storage_dir, None);
238 }
239
240 #[test]
241 fn from_env_smoke() {
242 let cfg = Config::from_env();
243 assert!(!cfg.listen.is_empty());
244 }
245
246 #[test]
247 fn socket_addr_parses_valid_listen() {
248 let cfg = Config {
249 listen: "0.0.0.0:8080".to_owned(),
250 storage_dir: None,
251 };
252 assert_eq!(
253 cfg.socket_addr().expect("addr"),
254 "0.0.0.0:8080".parse::<SocketAddr>().unwrap()
255 );
256 }
257
258 #[test]
259 fn socket_addr_rejects_garbage() {
260 let cfg = Config {
261 listen: "nope".to_owned(),
262 storage_dir: None,
263 };
264 let err = cfg.socket_addr().expect_err("should fail");
265 assert!(err.contains(ENV_LISTEN), "error names the env var: {err}");
266 }
267
268 #[test]
269 fn blob_store_in_memory_when_unset() {
270 let cfg = Config {
271 listen: DEFAULT_LISTEN.to_owned(),
272 storage_dir: None,
273 };
274 assert!(cfg.blob_store().is_ok());
275 }
276
277 #[test]
278 fn blob_store_creates_fs_dir() {
279 let tmp = tempfile::TempDir::new().expect("tempdir");
280 let dir = tmp.path().join("nested/blobs");
281 assert!(!dir.exists());
282 let cfg = Config {
283 listen: DEFAULT_LISTEN.to_owned(),
284 storage_dir: Some(dir.clone()),
285 };
286 assert!(cfg.blob_store().is_ok());
287 assert!(dir.is_dir(), "fs blob dir created");
288 }
289
290 #[tokio::test]
291 async fn build_app_serves_probes_v2_and_metrics() {
292 let app = build_app(std::sync::Arc::new(
293 ferro_blob_store::InMemoryBlobStore::new(),
294 ));
295 for (uri, expected) in [
296 ("/live", StatusCode::OK),
297 ("/ready", StatusCode::OK),
298 ("/healthz", StatusCode::OK),
299 ("/v2/", StatusCode::OK),
300 ("/metrics", StatusCode::OK),
301 ] {
302 let resp = app
303 .clone()
304 .oneshot(
305 Request::builder()
306 .uri(uri)
307 .body(Body::empty())
308 .expect("req"),
309 )
310 .await
311 .expect("resp");
312 assert_eq!(resp.status(), expected, "GET {uri}");
313 }
314 }
315
316 #[tokio::test]
317 async fn serve_rejects_invalid_listen_before_binding() {
318 let cfg = Config {
319 listen: "definitely-not-an-addr".to_owned(),
320 storage_dir: None,
321 };
322 let err = super::serve(&cfg).await.expect_err("invalid addr fails");
323 assert!(err.to_string().contains(ENV_LISTEN));
324 }
325
326 #[test]
327 fn init_tracing_is_idempotent() {
328 init_tracing();
329 init_tracing();
330 }
331
332 #[tokio::test]
333 async fn build_app_persisted_serves_v2_and_survives_restart() {
334 use super::build_app_persisted;
341 use axum::http::Method;
342 use ferro_blob_store::{Digest, InMemoryBlobStore};
343
344 let tmp = tempfile::TempDir::new().expect("tempdir");
345 let dir = tmp.path();
346 let store = std::sync::Arc::new(InMemoryBlobStore::new());
347
348 let app = build_app_persisted(store.clone(), dir);
349
350 let resp = app
352 .clone()
353 .oneshot(
354 Request::builder()
355 .uri("/v2/")
356 .body(Body::empty())
357 .expect("req"),
358 )
359 .await
360 .expect("resp");
361 assert_eq!(resp.status(), StatusCode::OK, "GET /v2/ on persisted app");
362
363 let body = b"{\"schemaVersion\":2}";
365 let digest = Digest::sha256_of(&body[..]).to_string();
366 let put = app
367 .oneshot(
368 Request::builder()
369 .method(Method::PUT)
370 .uri(format!("/v2/repo/manifests/{digest}"))
371 .header(
372 "content-type",
373 "application/vnd.oci.image.manifest.v1+json",
374 )
375 .body(Body::from(&body[..]))
376 .expect("req"),
377 )
378 .await
379 .expect("put resp");
380 assert_eq!(put.status(), StatusCode::CREATED, "manifest PUT");
381
382 let app2 = build_app_persisted(store, dir);
386 let head = app2
387 .oneshot(
388 Request::builder()
389 .method(Method::HEAD)
390 .uri(format!("/v2/repo/manifests/{digest}"))
391 .body(Body::empty())
392 .expect("req"),
393 )
394 .await
395 .expect("head resp");
396 assert_eq!(
397 head.status(),
398 StatusCode::OK,
399 "manifest survives a simulated restart of the persisted app"
400 );
401 }
402}