use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use actionqueue_storage::mutation::authority::StorageMutationAuthority;
use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
use actionqueue_storage::recovery::reducer::ReplayReducer;
use actionqueue_storage::wal::fs_writer::WalFsWriter;
use actionqueue_storage::wal::{InstrumentedWalWriter, WalAppendTelemetry};
use crate::bootstrap::{ReadyStatus, RouterConfig};
use crate::metrics::registry::MetricsRegistry;
use crate::time::clock::SharedDaemonClock;
/// Shared handle to the storage mutation authority used by the router.
///
/// The authority is wrapped in `Arc<Mutex<..>>` and is parameterized with an
/// instrumented filesystem WAL writer and the [`ReplayReducer`] projection.
pub type ControlMutationAuthority =
Arc<Mutex<StorageMutationAuthority<InstrumentedWalWriter<WalFsWriter>, ReplayReducer>>>;
/// State shared by the HTTP route handlers.
///
/// Built with [`RouterStateInner::new`] or
/// [`RouterStateInner::with_control_authority`] and handed to `build_router`
/// wrapped in an `Arc` (see `RouterState`).
pub struct RouterStateInner {
/// Router configuration; `build_router` reads its `control_enabled` and
/// `metrics_enabled` flags.
pub(crate) router_config: RouterConfig,
/// Replay projection behind an `RwLock`; locked through `read_projection`
/// and `write_projection`.
pub(crate) shared_projection: Arc<RwLock<ReplayReducer>>,
/// Mutation authority handle: `None` when built with `new`, `Some` when
/// built with `with_control_authority`.
pub(crate) control_authority: Option<ControlMutationAuthority>,
/// Shared metrics registry.
pub(crate) metrics: Arc<MetricsRegistry>,
/// WAL append telemetry handle; its `snapshot()` is included in the `Debug` output.
pub(crate) wal_append_telemetry: WalAppendTelemetry,
/// Shared daemon clock.
pub(crate) clock: SharedDaemonClock,
/// Recovery observations supplied at construction time.
pub(crate) recovery_observations: RecoveryObservations,
/// Flag initialized to `false` by both constructors.
/// NOTE(review): presumably flipped once the recovery histogram has been
/// recorded — confirm against the code that stores to it.
pub(crate) recovery_histogram_observed: AtomicBool,
/// Readiness status supplied at construction time.
pub(crate) ready_status: ReadyStatus,
}
/// Hand-written `Debug` that prints a summary of the state.
///
/// Several fields are rendered as a summary instead of their full contents:
/// the projection as a fixed type-name string, the control authority as a
/// presence flag, the metrics registry as its enabled flag, the telemetry as a
/// snapshot, and the clock as its `Arc` strong count.
impl std::fmt::Debug for RouterStateInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RouterStateInner");
        out.field("router_config", &self.router_config);
        // The projection sits behind a lock; print a placeholder rather than locking here.
        out.field("shared_projection", &"Arc<RwLock<ReplayReducer>>");
        out.field("control_authority", &self.control_authority.is_some());
        out.field("metrics_enabled", &self.metrics.is_enabled());
        out.field("wal_append_telemetry", &self.wal_append_telemetry.snapshot());
        out.field("clock_refcount", &Arc::strong_count(&self.clock));
        out.field("recovery_observations", &self.recovery_observations);
        let histogram_observed = self.recovery_histogram_observed.load(Ordering::Relaxed);
        out.field("recovery_histogram_observed", &histogram_observed);
        out.field("ready_status", &self.ready_status);
        out.finish()
    }
}
/// Reference-counted handle to [`RouterStateInner`]; this is the state type
/// that `build_router` accepts and attaches to the router.
pub type RouterState = Arc<RouterStateInner>;
/// Bundle of observability dependencies passed to the [`RouterStateInner`]
/// constructors, which move each field into the same-named state field.
pub struct RouterObservability {
/// Shared metrics registry.
pub metrics: Arc<MetricsRegistry>,
/// WAL append telemetry handle.
pub wal_append_telemetry: WalAppendTelemetry,
/// Shared daemon clock.
pub clock: SharedDaemonClock,
/// Recovery observations.
pub recovery_observations: RecoveryObservations,
}
impl RouterStateInner {
pub fn new(
router_config: RouterConfig,
shared_projection: Arc<RwLock<ReplayReducer>>,
observability: RouterObservability,
ready_status: ReadyStatus,
) -> Self {
Self {
router_config,
shared_projection,
control_authority: None,
metrics: observability.metrics,
wal_append_telemetry: observability.wal_append_telemetry,
clock: observability.clock,
recovery_observations: observability.recovery_observations,
recovery_histogram_observed: AtomicBool::new(false),
ready_status,
}
}
pub fn with_control_authority(
router_config: RouterConfig,
shared_projection: Arc<RwLock<ReplayReducer>>,
observability: RouterObservability,
control_authority: ControlMutationAuthority,
ready_status: ReadyStatus,
) -> Self {
Self {
router_config,
shared_projection,
control_authority: Some(control_authority),
metrics: observability.metrics,
wal_append_telemetry: observability.wal_append_telemetry,
clock: observability.clock,
recovery_observations: observability.recovery_observations,
recovery_histogram_observed: AtomicBool::new(false),
ready_status,
}
}
}
// Route submodules. `build_router` calls `register_routes` on each of these
// except `pagination`. `actors` and `platform` are compiled only when the
// `actor` / `platform` Cargo features are enabled.
#[cfg(feature = "actor")]
pub mod actors;
pub mod control;
pub mod health;
pub mod metrics;
pub mod pagination;
#[cfg(feature = "platform")]
pub mod platform;
pub mod ready;
pub mod run_get;
pub mod runs_list;
pub mod stats;
pub mod task_get;
pub mod tasks_list;
/// Acquires a read guard on the shared projection.
///
/// # Errors
///
/// If the `RwLock` is poisoned, logs an error and returns the boxed
/// HTTP 500 response produced by `projection_poison_response`.
pub(crate) fn read_projection(
    state: &RouterStateInner,
) -> Result<std::sync::RwLockReadGuard<'_, ReplayReducer>, Box<axum::response::Response>> {
    match state.shared_projection.read() {
        Ok(guard) => Ok(guard),
        Err(_poisoned) => {
            tracing::error!("shared projection RwLock poisoned — read handler degraded");
            Err(Box::new(projection_poison_response()))
        }
    }
}
/// Acquires a write guard on the shared projection.
///
/// # Errors
///
/// If the `RwLock` is poisoned, logs an error and returns the boxed
/// HTTP 500 response produced by `projection_poison_response`.
pub(crate) fn write_projection(
    state: &RouterStateInner,
) -> Result<std::sync::RwLockWriteGuard<'_, ReplayReducer>, Box<axum::response::Response>> {
    match state.shared_projection.write() {
        Ok(guard) => Ok(guard),
        Err(_poisoned) => {
            tracing::error!("shared projection RwLock poisoned — write handler degraded");
            Err(Box::new(projection_poison_response()))
        }
    }
}
fn projection_poison_response() -> axum::response::Response {
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::Json;
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "internal_error",
"message": "shared projection lock poisoned"
})),
)
.into_response()
}
/// Assembles the HTTP router and attaches `state` to it.
///
/// Every route module's `register_routes` is applied in a fixed order. The
/// metrics and control modules additionally receive their enable flag from
/// `state.router_config`. The `actors` and `platform` routes are registered
/// only when the corresponding Cargo feature is compiled in.
pub fn build_router(state: RouterState) -> axum::Router {
    // Read the toggles up front: `state` is moved into the router at the end.
    let (control_enabled, metrics_enabled) = (
        state.router_config.control_enabled,
        state.router_config.metrics_enabled,
    );

    let mut routes = axum::Router::<RouterState>::new();
    routes = health::register_routes(routes);
    routes = ready::register_routes(routes);
    routes = stats::register_routes(routes);
    routes = tasks_list::register_routes(routes);
    routes = runs_list::register_routes(routes);
    routes = run_get::register_routes(routes);
    routes = task_get::register_routes(routes);
    routes = metrics::register_routes(routes, metrics_enabled);
    routes = control::register_routes(routes, control_enabled);

    #[cfg(feature = "actor")]
    {
        routes = actors::register_routes(routes);
    }

    #[cfg(feature = "platform")]
    {
        routes = platform::register_routes(routes);
    }

    routes.with_state(state)
}