use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use ferro_blob_store::{FsBlobStore, InMemoryBlobStore, SharedBlobStore};
use crate::metrics::{Metrics, instrument};
use crate::registry::InMemoryRegistryMeta;
use crate::router::{AppState, probe_routes, router};
pub const ENV_LISTEN: &str = "FERRO_OCI_LISTEN";
pub const ENV_STORAGE_DIR: &str = "FERRO_OCI_STORAGE_DIR";
pub const DEFAULT_LISTEN: &str = "0.0.0.0:8080";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config {
pub listen: String,
pub storage_dir: Option<PathBuf>,
}
impl Config {
#[must_use]
pub fn from_env() -> Self {
let listen = std::env::var(ENV_LISTEN).ok();
let storage_dir = std::env::var_os(ENV_STORAGE_DIR).filter(|v| !v.is_empty());
Self::from_raw(listen, storage_dir.map(PathBuf::from))
}
#[must_use]
pub fn from_raw(listen: Option<String>, storage_dir: Option<PathBuf>) -> Self {
let listen = listen.unwrap_or_else(|| DEFAULT_LISTEN.to_owned());
let storage_dir = storage_dir.filter(|p| !p.as_os_str().is_empty());
Self {
listen,
storage_dir,
}
}
pub fn socket_addr(&self) -> Result<SocketAddr, String> {
self.listen
.parse::<SocketAddr>()
.map_err(|e| format!("invalid {ENV_LISTEN} `{}`: {e}", self.listen))
}
pub fn blob_store(&self) -> Result<SharedBlobStore, Box<dyn std::error::Error>> {
if let Some(dir) = &self.storage_dir {
std::fs::create_dir_all(dir)?;
let store = FsBlobStore::new(dir.clone())?;
tracing::info!(path = %dir.display(), "using filesystem blob store");
Ok(Arc::new(store))
} else {
tracing::warn!("FERRO_OCI_STORAGE_DIR unset — using a non-durable in-memory blob store");
Ok(Arc::new(InMemoryBlobStore::new()))
}
}
}
pub fn build_app(blob_store: SharedBlobStore) -> axum::Router {
let registry = Arc::new(InMemoryRegistryMeta::new());
assemble(blob_store, registry)
}
pub fn build_app_persisted(blob_store: SharedBlobStore, storage_dir: &std::path::Path) -> axum::Router {
let registry = Arc::new(InMemoryRegistryMeta::with_persistence(storage_dir));
assemble(blob_store, registry)
}
fn assemble(blob_store: SharedBlobStore, registry: Arc<InMemoryRegistryMeta>) -> axum::Router {
let state = AppState::new(blob_store, registry);
let blob_count = state.blob_count_handle();
instrument(
router(state).merge(probe_routes()),
Metrics::new(),
blob_count,
)
}
pub async fn serve(config: &Config) -> Result<(), Box<dyn std::error::Error>> {
tracing::info!(?config, "ferro-oci-server starting");
let addr = config.socket_addr()?;
let blob_store = config.blob_store()?;
let app = match &config.storage_dir {
Some(dir) => build_app_persisted(blob_store, dir),
None => build_app(blob_store),
};
let listener = tokio::net::TcpListener::bind(addr).await?;
tracing::info!(%addr, "listening");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await?;
tracing::info!("ferro-oci-server stopped");
Ok(())
}
pub fn init_tracing() {
use tracing_subscriber::EnvFilter;
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
}
async fn shutdown_signal() {
let ctrl_c = async {
if let Err(err) = tokio::signal::ctrl_c().await {
tracing::error!(%err, "failed to install Ctrl-C handler");
}
};
#[cfg(unix)]
let terminate = async {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut sig) => {
sig.recv().await;
}
Err(err) => tracing::error!(%err, "failed to install SIGTERM handler"),
}
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
() = ctrl_c => tracing::info!("received SIGINT — shutting down"),
() = terminate => tracing::info!("received SIGTERM — shutting down"),
}
}
#[cfg(test)]
mod tests {
use super::{Config, DEFAULT_LISTEN, ENV_LISTEN, build_app, init_tracing};
use axum::body::Body;
use axum::http::{Request, StatusCode};
use std::net::SocketAddr;
use std::path::PathBuf;
use tower::ServiceExt;
#[test]
fn from_raw_applies_defaults_when_unset() {
let cfg = Config::from_raw(None, None);
assert_eq!(cfg.listen, DEFAULT_LISTEN);
assert_eq!(cfg.storage_dir, None);
}
#[test]
fn from_raw_reads_overrides() {
let cfg = Config::from_raw(
Some("127.0.0.1:0".to_owned()),
Some(PathBuf::from("/var/lib/oci-xyz")),
);
assert_eq!(cfg.listen, "127.0.0.1:0");
assert_eq!(cfg.storage_dir, Some(PathBuf::from("/var/lib/oci-xyz")));
}
#[test]
fn from_raw_treats_empty_storage_dir_as_inmemory() {
let cfg = Config::from_raw(None, Some(PathBuf::new()));
assert_eq!(cfg.storage_dir, None);
}
#[test]
fn from_env_smoke() {
let cfg = Config::from_env();
assert!(!cfg.listen.is_empty());
}
#[test]
fn socket_addr_parses_valid_listen() {
let cfg = Config {
listen: "0.0.0.0:8080".to_owned(),
storage_dir: None,
};
assert_eq!(
cfg.socket_addr().expect("addr"),
"0.0.0.0:8080".parse::<SocketAddr>().unwrap()
);
}
#[test]
fn socket_addr_rejects_garbage() {
let cfg = Config {
listen: "nope".to_owned(),
storage_dir: None,
};
let err = cfg.socket_addr().expect_err("should fail");
assert!(err.contains(ENV_LISTEN), "error names the env var: {err}");
}
#[test]
fn blob_store_in_memory_when_unset() {
let cfg = Config {
listen: DEFAULT_LISTEN.to_owned(),
storage_dir: None,
};
assert!(cfg.blob_store().is_ok());
}
#[test]
fn blob_store_creates_fs_dir() {
let tmp = tempfile::TempDir::new().expect("tempdir");
let dir = tmp.path().join("nested/blobs");
assert!(!dir.exists());
let cfg = Config {
listen: DEFAULT_LISTEN.to_owned(),
storage_dir: Some(dir.clone()),
};
assert!(cfg.blob_store().is_ok());
assert!(dir.is_dir(), "fs blob dir created");
}
#[tokio::test]
async fn build_app_serves_probes_v2_and_metrics() {
let app = build_app(std::sync::Arc::new(
ferro_blob_store::InMemoryBlobStore::new(),
));
for (uri, expected) in [
("/live", StatusCode::OK),
("/ready", StatusCode::OK),
("/healthz", StatusCode::OK),
("/v2/", StatusCode::OK),
("/metrics", StatusCode::OK),
] {
let resp = app
.clone()
.oneshot(
Request::builder()
.uri(uri)
.body(Body::empty())
.expect("req"),
)
.await
.expect("resp");
assert_eq!(resp.status(), expected, "GET {uri}");
}
}
#[tokio::test]
async fn serve_rejects_invalid_listen_before_binding() {
let cfg = Config {
listen: "definitely-not-an-addr".to_owned(),
storage_dir: None,
};
let err = super::serve(&cfg).await.expect_err("invalid addr fails");
assert!(err.to_string().contains(ENV_LISTEN));
}
#[test]
fn init_tracing_is_idempotent() {
init_tracing();
init_tracing();
}
#[tokio::test]
async fn build_app_persisted_serves_v2_and_survives_restart() {
use super::build_app_persisted;
use axum::http::Method;
use ferro_blob_store::{Digest, InMemoryBlobStore};
let tmp = tempfile::TempDir::new().expect("tempdir");
let dir = tmp.path();
let store = std::sync::Arc::new(InMemoryBlobStore::new());
let app = build_app_persisted(store.clone(), dir);
let resp = app
.clone()
.oneshot(
Request::builder()
.uri("/v2/")
.body(Body::empty())
.expect("req"),
)
.await
.expect("resp");
assert_eq!(resp.status(), StatusCode::OK, "GET /v2/ on persisted app");
let body = b"{\"schemaVersion\":2}";
let digest = Digest::sha256_of(&body[..]).to_string();
let put = app
.oneshot(
Request::builder()
.method(Method::PUT)
.uri(format!("/v2/repo/manifests/{digest}"))
.header(
"content-type",
"application/vnd.oci.image.manifest.v1+json",
)
.body(Body::from(&body[..]))
.expect("req"),
)
.await
.expect("put resp");
assert_eq!(put.status(), StatusCode::CREATED, "manifest PUT");
let app2 = build_app_persisted(store, dir);
let head = app2
.oneshot(
Request::builder()
.method(Method::HEAD)
.uri(format!("/v2/repo/manifests/{digest}"))
.body(Body::empty())
.expect("req"),
)
.await
.expect("head resp");
assert_eq!(
head.status(),
StatusCode::OK,
"manifest survives a simulated restart of the persisted app"
);
}
}