use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, anyhow, bail};
use super::StorageBackend;
use super::local::LocalFsBackend;
use super::s3::S3Backend;
use crate::config::{Config, StorageConfig, StorageType};
/// Display-oriented summary of a storage entry's connection parameters.
///
/// Produced by `extract_details` from a `StorageConfig`; when the matching
/// sub-table is absent from the entry the fields are left empty / `None`.
///
/// All fields are plain strings, so the type is cheaply comparable
/// (`PartialEq` / `Eq`) in addition to being `Clone` and `Debug`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StorageDetails {
    /// Details of an S3-type storage.
    S3 {
        /// Bucket name taken from the S3 sub-table.
        bucket: String,
        /// Endpoint from the S3 sub-table, if one was configured.
        endpoint: Option<String>,
        /// Region from the S3 sub-table, if one was configured.
        region: Option<String>,
    },
    /// Details of a local-filesystem storage.
    Local {
        /// Root directory rendered via `Path::display`.
        root_path: String,
    },
}
/// A storage backend that initialized successfully, bundled with the
/// metadata of the configuration entry it was built from.
///
/// `create_registry` inserts one of these into `BackendRegistry::backends`
/// for every storage entry whose backend construction succeeded.
#[derive(Clone)]
pub struct NamedBackend {
/// Storage name, copied from the configuration entry.
pub name: String,
/// Storage kind declared by the configuration entry.
pub r#type: StorageType,
/// Shared handle to the constructed backend.
pub backend: Arc<dyn StorageBackend>,
/// Summary of the entry's connection parameters (see `extract_details`).
pub details: StorageDetails,
}
/// Record of a non-default storage entry whose backend failed to initialize.
///
/// `create_registry` stores one of these (instead of aborting) for every
/// failing entry that is not the default storage, so the failure can still be
/// reported by name later on.
#[derive(Clone, Debug)]
pub struct InvalidStorageEntry {
    /// Storage name, copied from the configuration entry.
    pub name: String,
    /// Storage kind declared by the configuration entry.
    pub r#type: StorageType,
    /// Full error chain of the initialization failure, rendered with `{:#}`.
    pub reason: String,
    /// Summary of the entry's connection parameters (see `extract_details`).
    pub details: StorageDetails,
}
/// Summarizes the connection parameters of `entry` as a [`StorageDetails`].
///
/// The variant is chosen by `entry.r#type`. If the sub-table matching that
/// type is missing, the variant is still returned but with empty strings and
/// `None` in place of the real values, so this function never fails.
fn extract_details(entry: &StorageConfig) -> StorageDetails {
    match entry.r#type {
        StorageType::S3 => match entry.s3.as_ref() {
            Some(s3_cfg) => StorageDetails::S3 {
                bucket: s3_cfg.bucket.clone(),
                endpoint: s3_cfg.endpoint.clone(),
                region: s3_cfg.region.clone(),
            },
            // No S3 sub-table: fall back to a blank placeholder.
            None => StorageDetails::S3 {
                bucket: String::new(),
                endpoint: None,
                region: None,
            },
        },
        StorageType::Local => match entry.local.as_ref() {
            Some(local_cfg) => StorageDetails::Local {
                root_path: local_cfg.root_path.display().to_string(),
            },
            // No local sub-table: fall back to a blank placeholder.
            None => StorageDetails::Local {
                root_path: String::new(),
            },
        },
    }
}
/// Result of processing every storage entry in the configuration.
///
/// Built by `create_registry`; each configured storage name ends up in
/// exactly one of `backends` or `invalid`, and always in `order`.
pub struct BackendRegistry {
/// Successfully initialized backends, keyed by storage name.
pub backends: HashMap<String, NamedBackend>,
/// Non-default storages that failed to initialize, keyed by storage name.
pub invalid: HashMap<String, InvalidStorageEntry>,
/// All storage names (valid and invalid) in configuration order.
pub order: Vec<String>,
/// Name of the storage returned by `Config::active_storage`.
pub default_name: String,
}
/// Builds a [`BackendRegistry`] from every storage listed in `cfg`.
///
/// Entries are processed in configuration order. A backend that initializes
/// successfully is stored in `backends`; a non-default backend that fails is
/// logged at `warn` level and recorded in `invalid` together with the rendered
/// error chain. Every processed name is appended to `order`.
///
/// # Errors
///
/// Returns an error when:
/// - `cfg.storages` is empty or `cfg.active_storage()` yields nothing,
/// - two entries share the same name,
/// - the default (active) storage fails to initialize.
pub async fn create_registry(cfg: &Config) -> anyhow::Result<BackendRegistry> {
    if cfg.storages.is_empty() {
        bail!("no storages defined in configuration");
    }
    let default_name = match cfg.active_storage() {
        Some(active) => active.name.clone(),
        None => return Err(anyhow!("no storages defined in configuration")),
    };

    let expected = cfg.storages.len();
    let mut registry = BackendRegistry {
        backends: HashMap::new(),
        invalid: HashMap::new(),
        order: Vec::with_capacity(expected),
        default_name,
    };
    // Borrowed names are enough to detect duplicates.
    let mut seen_names: std::collections::HashSet<&str> =
        std::collections::HashSet::with_capacity(expected);

    for entry in &cfg.storages {
        let name = entry.name.as_str();
        if !seen_names.insert(name) {
            bail!("duplicate storage name: '{}'", entry.name);
        }

        let is_default = entry.name == registry.default_name;
        tracing::info!(
            storage.name = name,
            storage.r#type = ?entry.r#type,
            storage.default = is_default,
            "registering storage backend"
        );

        // Captured before the build so failed entries keep their details too.
        let details = extract_details(entry);
        match build_one(entry).await {
            Ok(backend) => {
                let named = NamedBackend {
                    name: name.to_owned(),
                    r#type: entry.r#type,
                    backend,
                    details,
                };
                registry.backends.insert(name.to_owned(), named);
            }
            // A broken default storage is fatal for the whole registry.
            Err(err) if is_default => {
                return Err(err).with_context(|| {
                    format!("default storage '{}' failed to initialize", entry.name)
                });
            }
            Err(err) => {
                let reason = format!("{err:#}");
                tracing::warn!(
                    storage.name = name,
                    storage.r#type = ?entry.r#type,
                    reason = %reason,
                    "skipping invalid storage; UI will mark it [invalid] and requests targeting it return 503",
                );
                let rejected = InvalidStorageEntry {
                    name: name.to_owned(),
                    r#type: entry.r#type,
                    reason,
                    details,
                };
                registry.invalid.insert(name.to_owned(), rejected);
            }
        }
        registry.order.push(name.to_owned());
    }

    Ok(registry)
}
/// Constructs the backend described by a single storage entry.
///
/// - `StorageType::S3`: requires the `s3` sub-table and awaits
///   `S3Backend::new`.
/// - `StorageType::Local`: requires the `local` sub-table, validates its
///   `root_path` with `validate_local_root`, then creates a `LocalFsBackend`.
///
/// # Errors
///
/// Fails if the sub-table matching `entry.r#type` is missing, if the S3
/// backend cannot be initialized, or if the local root path is rejected.
///
/// NOTE(review): `validate_local_root` uses synchronous `std::fs` calls, so
/// the local branch blocks the calling task while it runs.
async fn build_one(entry: &StorageConfig) -> anyhow::Result<Arc<dyn StorageBackend>> {
    match entry.r#type {
        StorageType::S3 => {
            let s3_cfg = match entry.s3.as_ref() {
                Some(cfg) => cfg,
                None => {
                    return Err(anyhow!(
                        "storage '{}': type=s3 but [storages.s3] sub-table is missing",
                        entry.name
                    ));
                }
            };
            let s3_backend = S3Backend::new(s3_cfg)
                .await
                .with_context(|| format!("init S3 backend '{}'", entry.name))?;
            let handle: Arc<dyn StorageBackend> = Arc::new(s3_backend);
            Ok(handle)
        }
        StorageType::Local => {
            let local_cfg = match entry.local.as_ref() {
                Some(cfg) => cfg,
                None => {
                    return Err(anyhow!(
                        "storage '{}': type=local but [storages.local] sub-table is missing",
                        entry.name
                    ));
                }
            };
            // Reject unusable roots before constructing the backend.
            validate_local_root(&local_cfg.root_path)
                .with_context(|| format!("invalid root_path for storage '{}'", entry.name))?;
            let handle: Arc<dyn StorageBackend> = Arc::new(LocalFsBackend::new(
                local_cfg.root_path.clone(),
                local_cfg.follow_symlinks,
            ));
            Ok(handle)
        }
    }
}
fn validate_local_root(path: &Path) -> anyhow::Result<()> {
let metadata = std::fs::metadata(path).with_context(|| {
format!(
"root_path does not exist or is unreadable: {}",
path.display()
)
})?;
if !metadata.is_dir() {
bail!("root_path is not a directory: {}", path.display());
}
let probe = path.join(".omni-stream-write-probe");
std::fs::write(&probe, b"")
.with_context(|| format!("root_path is not writable: {}", path.display()))?;
let _ = std::fs::remove_file(&probe);
Ok(())
}