actionqueue_daemon/http/
mod.rs1use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex, RwLock};
17
18use actionqueue_storage::mutation::authority::StorageMutationAuthority;
19use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
20use actionqueue_storage::recovery::reducer::ReplayReducer;
21use actionqueue_storage::wal::fs_writer::WalFsWriter;
22use actionqueue_storage::wal::{InstrumentedWalWriter, WalAppendTelemetry};
23
24use crate::bootstrap::{ReadyStatus, RouterConfig};
25use crate::metrics::registry::MetricsRegistry;
26use crate::time::clock::SharedDaemonClock;
27
28pub type ControlMutationAuthority =
30 Arc<Mutex<StorageMutationAuthority<InstrumentedWalWriter<WalFsWriter>, ReplayReducer>>>;
31
32pub struct RouterStateInner {
43 pub(crate) router_config: RouterConfig,
48
49 pub(crate) shared_projection: Arc<RwLock<ReplayReducer>>,
55
56 pub(crate) control_authority: Option<ControlMutationAuthority>,
58
59 pub(crate) metrics: Arc<MetricsRegistry>,
61
62 pub(crate) wal_append_telemetry: WalAppendTelemetry,
64
65 pub(crate) clock: SharedDaemonClock,
67
68 pub(crate) recovery_observations: RecoveryObservations,
70
71 pub(crate) recovery_histogram_observed: AtomicBool,
73
74 pub(crate) ready_status: ReadyStatus,
76}
77
78impl std::fmt::Debug for RouterStateInner {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct("RouterStateInner")
81 .field("router_config", &self.router_config)
82 .field("shared_projection", &"Arc<RwLock<ReplayReducer>>")
83 .field("control_authority", &self.control_authority.is_some())
84 .field("metrics_enabled", &self.metrics.is_enabled())
85 .field("wal_append_telemetry", &self.wal_append_telemetry.snapshot())
86 .field("clock_refcount", &Arc::strong_count(&self.clock))
87 .field("recovery_observations", &self.recovery_observations)
88 .field(
89 "recovery_histogram_observed",
90 &self.recovery_histogram_observed.load(Ordering::Relaxed),
91 )
92 .field("ready_status", &self.ready_status)
93 .finish()
94 }
95}
96
97pub type RouterState = Arc<RouterStateInner>;
99
100pub struct RouterObservability {
105 pub metrics: Arc<MetricsRegistry>,
107 pub wal_append_telemetry: WalAppendTelemetry,
109 pub clock: SharedDaemonClock,
111 pub recovery_observations: RecoveryObservations,
113}
114
115impl RouterStateInner {
116 pub fn new(
121 router_config: RouterConfig,
122 shared_projection: Arc<RwLock<ReplayReducer>>,
123 observability: RouterObservability,
124 ready_status: ReadyStatus,
125 ) -> Self {
126 Self {
127 router_config,
128 shared_projection,
129 control_authority: None,
130 metrics: observability.metrics,
131 wal_append_telemetry: observability.wal_append_telemetry,
132 clock: observability.clock,
133 recovery_observations: observability.recovery_observations,
134 recovery_histogram_observed: AtomicBool::new(false),
135 ready_status,
136 }
137 }
138
139 pub fn with_control_authority(
141 router_config: RouterConfig,
142 shared_projection: Arc<RwLock<ReplayReducer>>,
143 observability: RouterObservability,
144 control_authority: ControlMutationAuthority,
145 ready_status: ReadyStatus,
146 ) -> Self {
147 Self {
148 router_config,
149 shared_projection,
150 control_authority: Some(control_authority),
151 metrics: observability.metrics,
152 wal_append_telemetry: observability.wal_append_telemetry,
153 clock: observability.clock,
154 recovery_observations: observability.recovery_observations,
155 recovery_histogram_observed: AtomicBool::new(false),
156 ready_status,
157 }
158 }
159}
160
161#[cfg(feature = "actor")]
162pub mod actors;
163pub mod control;
164pub mod health;
165pub mod metrics;
166pub mod pagination;
167#[cfg(feature = "platform")]
168pub mod platform;
169pub mod ready;
170pub mod run_get;
171pub mod runs_list;
172pub mod stats;
173pub mod task_get;
174pub mod tasks_list;
175
176pub(crate) fn read_projection(
178 state: &RouterStateInner,
179) -> Result<std::sync::RwLockReadGuard<'_, ReplayReducer>, Box<axum::response::Response>> {
180 state.shared_projection.read().map_err(|_| {
181 tracing::error!("shared projection RwLock poisoned — read handler degraded");
182 Box::new(projection_poison_response())
183 })
184}
185
186pub(crate) fn write_projection(
188 state: &RouterStateInner,
189) -> Result<std::sync::RwLockWriteGuard<'_, ReplayReducer>, Box<axum::response::Response>> {
190 state.shared_projection.write().map_err(|_| {
191 tracing::error!("shared projection RwLock poisoned — write handler degraded");
192 Box::new(projection_poison_response())
193 })
194}
195
196fn projection_poison_response() -> axum::response::Response {
197 use axum::http::StatusCode;
198 use axum::response::IntoResponse;
199 use axum::Json;
200
201 (
202 StatusCode::INTERNAL_SERVER_ERROR,
203 Json(serde_json::json!({
204 "error": "internal_error",
205 "message": "shared projection lock poisoned"
206 })),
207 )
208 .into_response()
209}
210
211pub fn build_router(state: RouterState) -> axum::Router {
224 let control_enabled = state.router_config.control_enabled;
225 let metrics_enabled = state.router_config.metrics_enabled;
226 let router: axum::Router<RouterState> = axum::Router::new();
227 let router = health::register_routes(router);
228 let router = ready::register_routes(router);
229 let router = stats::register_routes(router);
230 let router = tasks_list::register_routes(router);
231 let router = runs_list::register_routes(router);
232 let router = run_get::register_routes(router);
233 let router = task_get::register_routes(router);
234 let router = metrics::register_routes(router, metrics_enabled);
235 let router = control::register_routes(router, control_enabled);
236 #[cfg(feature = "actor")]
237 let router = actors::register_routes(router);
238 #[cfg(feature = "platform")]
239 let router = platform::register_routes(router);
240 router.with_state(state)
241}