use std::path::PathBuf;
use actionqueue_storage::mutation::authority::StorageMutationAuthority;
use actionqueue_storage::recovery::bootstrap::{
load_projection_from_storage, RecoveryBootstrapError,
};
use actionqueue_storage::recovery::reducer::ReplayReducer;
use crate::config::{ConfigError, DaemonConfig};
use crate::metrics::registry::MetricsRegistry;
use crate::time::clock::{SharedDaemonClock, SystemClock};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BootstrapError {
Config(ConfigError),
WalInit(String),
Dependency(String),
}
impl std::fmt::Display for BootstrapError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BootstrapError::Config(e) => write!(f, "config error: {e}"),
BootstrapError::WalInit(msg) => write!(f, "WAL initialization error: {msg}"),
BootstrapError::Dependency(msg) => write!(f, "dependency error: {msg}"),
}
}
}
impl std::error::Error for BootstrapError {}
impl std::convert::From<ConfigError> for BootstrapError {
fn from(err: ConfigError) -> Self {
BootstrapError::Config(err)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use]
pub struct ReadyStatus {
is_ready: bool,
reason: &'static str,
}
impl ReadyStatus {
pub const REASON_READY: &'static str = "ready";
pub const REASON_CONFIG_INVALID: &'static str = "config_invalid";
pub const REASON_BOOTSTRAP_INCOMPLETE: &'static str = "bootstrap_incomplete";
pub const fn ready() -> Self {
Self { is_ready: true, reason: Self::REASON_READY }
}
pub const fn not_ready(reason: &'static str) -> Self {
Self { is_ready: false, reason }
}
pub const fn is_ready(&self) -> bool {
self.is_ready
}
pub const fn reason(&self) -> &'static str {
self.reason
}
}
#[derive(Debug, Clone)]
pub struct RouterConfig {
pub control_enabled: bool,
pub metrics_enabled: bool,
}
#[must_use]
pub struct BootstrapState {
config: DaemonConfig,
metrics: std::sync::Arc<MetricsRegistry>,
http_router: axum::Router,
wal_path: PathBuf,
snapshot_path: PathBuf,
projection: ReplayReducer,
router_state: crate::http::RouterState,
clock: SharedDaemonClock,
ready_status: ReadyStatus,
}
impl BootstrapState {
pub fn config(&self) -> &DaemonConfig {
&self.config
}
pub fn metrics(&self) -> &MetricsRegistry {
self.metrics.as_ref()
}
pub fn http_router(&self) -> &axum::Router {
&self.http_router
}
pub fn router_state(&self) -> &crate::http::RouterState {
&self.router_state
}
pub fn clock(&self) -> &SharedDaemonClock {
&self.clock
}
pub fn into_http(self) -> (axum::Router, crate::http::RouterState) {
(self.http_router, self.router_state)
}
pub fn wal_path(&self) -> &PathBuf {
&self.wal_path
}
pub fn snapshot_path(&self) -> &PathBuf {
&self.snapshot_path
}
pub fn projection(&self) -> &ReplayReducer {
&self.projection
}
pub fn ready_status(&self) -> ReadyStatus {
self.ready_status
}
}
pub fn bootstrap(config: DaemonConfig) -> Result<BootstrapState, BootstrapError> {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
config.validate()?;
let recovery = load_projection_from_storage(&config.data_dir).map_err(map_recovery_error)?;
let wal_path = recovery.wal_path.clone();
let snapshot_path = recovery.snapshot_path.clone();
let projection = recovery.projection.clone();
let wal_append_telemetry = recovery.wal_append_telemetry.clone();
let recovery_observations = recovery.recovery_observations;
let control_authority = if config.enable_control {
Some(std::sync::Arc::new(std::sync::Mutex::new(StorageMutationAuthority::new(
recovery.wal_writer,
projection.clone(),
))))
} else {
None
};
let metrics =
std::sync::Arc::new(MetricsRegistry::new(config.metrics_bind).map_err(|error| {
BootstrapError::Dependency(format!("metrics_registry_init_failed: {error}"))
})?);
let clock: SharedDaemonClock = std::sync::Arc::new(SystemClock);
let ready_status = ReadyStatus::ready();
let router_config = crate::bootstrap::RouterConfig {
control_enabled: config.enable_control,
metrics_enabled: config.metrics_bind.is_some(),
};
let observability = crate::http::RouterObservability {
metrics: metrics.clone(),
wal_append_telemetry,
clock: clock.clone(),
recovery_observations,
};
let shared_projection = std::sync::Arc::new(std::sync::RwLock::new(projection.clone()));
let router_state_inner = if let Some(authority) = control_authority {
crate::http::RouterStateInner::with_control_authority(
router_config,
shared_projection,
observability,
authority,
ready_status,
)
} else {
crate::http::RouterStateInner::new(
router_config,
shared_projection,
observability,
ready_status,
)
};
let router_state = std::sync::Arc::new(router_state_inner);
let http_router = crate::http::build_router(router_state.clone());
Ok(BootstrapState {
config,
metrics,
http_router,
wal_path,
snapshot_path,
projection,
router_state,
clock,
ready_status,
})
}
fn map_recovery_error(error: RecoveryBootstrapError) -> BootstrapError {
match error {
RecoveryBootstrapError::WalInit(msg) => BootstrapError::WalInit(msg),
RecoveryBootstrapError::WalRead(msg)
| RecoveryBootstrapError::SnapshotLoad(msg)
| RecoveryBootstrapError::WalReplay(msg)
| RecoveryBootstrapError::SnapshotBootstrap(msg) => BootstrapError::Dependency(msg),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bootstrap_with_valid_config() {
let config = DaemonConfig::default();
let control_flag = config.enable_control;
let result = bootstrap(config);
if let Ok(state) = result {
let router_state = state.router_state();
assert!(router_state.ready_status.is_ready());
assert_eq!(router_state.router_config.control_enabled, control_flag);
assert_eq!(router_state.router_config.metrics_enabled, state.metrics().is_enabled());
} else {
assert!(matches!(result, Err(BootstrapError::WalInit(_))));
}
}
#[test]
fn test_bootstrap_with_invalid_config() {
let config = DaemonConfig {
bind_address: std::net::SocketAddr::from(([127, 0, 0, 1], 0)),
..Default::default()
};
let result = bootstrap(config);
assert!(matches!(result, Err(BootstrapError::Config(_))));
}
}