Skip to main content

actionqueue_daemon/http/
mod.rs

1//! HTTP route modules.
2//!
3//! This module provides the HTTP introspection surface for the ActionQueue daemon.
4//! Routes are organized by functionality:
5//!
6//! - [`health`] - Liveness endpoint (`GET /healthz`)
7//! - [`ready`] - Readiness endpoint (`GET /ready`)
8//! - [`stats`] - Aggregate statistics endpoint (`GET /api/v1/stats`)
9//! - [`tasks_list`] - Task listing endpoint (`GET /api/v1/tasks`)
10//! - [`task_get`] - Single task endpoint (`GET /api/v1/tasks/:task_id`)
11//! - [`runs_list`] - Run listing endpoint (`GET /api/v1/runs`)
12//! - [`run_get`] - Single run endpoint (`GET /api/v1/runs/:run_id`)
13//! - [`control`] - Feature-gated control endpoints (cancel + pause/resume)
14
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex, RwLock};
17
18use actionqueue_storage::mutation::authority::StorageMutationAuthority;
19use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
20use actionqueue_storage::recovery::reducer::ReplayReducer;
21use actionqueue_storage::wal::fs_writer::WalFsWriter;
22use actionqueue_storage::wal::{InstrumentedWalWriter, WalAppendTelemetry};
23
24use crate::bootstrap::{ReadyStatus, RouterConfig};
25use crate::metrics::registry::MetricsRegistry;
26use crate::time::clock::SharedDaemonClock;
27
28/// Shared control mutation authority type used by control handlers.
29pub type ControlMutationAuthority =
30    Arc<Mutex<StorageMutationAuthority<InstrumentedWalWriter<WalFsWriter>, ReplayReducer>>>;
31
32/// Router state shared across all HTTP handlers.
33///
34/// This struct holds the state that is accessible to all read-only introspection
35/// endpoints (health, ready, stats). It is wrapped in `Arc` to enable cloning
36/// as required by axum's router state system.
37///
38/// # Invariant boundaries
39///
40/// This state is read-only. Handlers must not mutate any fields or introduce
41/// interior mutability beyond `Arc` cloning.
42pub struct RouterStateInner {
43    /// Router configuration for routing decisions.
44    ///
45    /// Used by [`build_router`] to determine which optional route sets
46    /// (control endpoints, metrics) are registered.
47    pub(crate) router_config: RouterConfig,
48
49    /// Shared projection state for stats and introspection.
50    ///
51    /// Wrapped in `Arc<RwLock<>>` so control handlers can sync the
52    /// authority's updated projection after mutations, while read handlers
53    /// acquire a read lock for consistent snapshots.
54    pub(crate) shared_projection: Arc<RwLock<ReplayReducer>>,
55
56    /// Optional control mutation authority lane for feature-gated control handlers.
57    pub(crate) control_authority: Option<ControlMutationAuthority>,
58
59    /// Shared daemon-local metrics registry handle.
60    pub(crate) metrics: Arc<MetricsRegistry>,
61
62    /// Authoritative WAL append telemetry for scrape-time WAL counter updates.
63    pub(crate) wal_append_telemetry: WalAppendTelemetry,
64
65    /// Authoritative daemon clock handle used by metrics derivation paths.
66    pub(crate) clock: SharedDaemonClock,
67
68    /// Authoritative recovery observations captured during bootstrap.
69    pub(crate) recovery_observations: RecoveryObservations,
70
71    /// Idempotence guard for recovery histogram observe-once semantics.
72    pub(crate) recovery_histogram_observed: AtomicBool,
73
74    /// Ready status indicating daemon readiness derived from bootstrap state.
75    pub(crate) ready_status: ReadyStatus,
76}
77
78impl std::fmt::Debug for RouterStateInner {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        f.debug_struct("RouterStateInner")
81            .field("router_config", &self.router_config)
82            .field("shared_projection", &"Arc<RwLock<ReplayReducer>>")
83            .field("control_authority", &self.control_authority.is_some())
84            .field("metrics_enabled", &self.metrics.is_enabled())
85            .field("wal_append_telemetry", &self.wal_append_telemetry.snapshot())
86            .field("clock_refcount", &Arc::strong_count(&self.clock))
87            .field("recovery_observations", &self.recovery_observations)
88            .field(
89                "recovery_histogram_observed",
90                &self.recovery_histogram_observed.load(Ordering::Relaxed),
91            )
92            .field("ready_status", &self.ready_status)
93            .finish()
94    }
95}
96
97/// Shared router state type (Arc-wrapped inner struct for Clone compatibility).
98pub type RouterState = Arc<RouterStateInner>;
99
100/// Observability dependencies grouped for router state construction.
101///
102/// Groups the telemetry, metrics, clock, and recovery observation handles
103/// that are threaded through the daemon HTTP layer.
104pub struct RouterObservability {
105    /// Shared daemon-local metrics registry handle.
106    pub metrics: Arc<MetricsRegistry>,
107    /// Authoritative WAL append telemetry for scrape-time WAL counter updates.
108    pub wal_append_telemetry: WalAppendTelemetry,
109    /// Authoritative daemon clock handle used by metrics derivation paths.
110    pub clock: SharedDaemonClock,
111    /// Authoritative recovery observations captured during bootstrap.
112    pub recovery_observations: RecoveryObservations,
113}
114
115impl RouterStateInner {
116    /// Creates a new router state from bootstrap components.
117    ///
118    /// The `ready_status` field is derived from
119    /// [`BootstrapState::ready_status()`](crate::bootstrap::BootstrapState::ready_status()).
120    pub fn new(
121        router_config: RouterConfig,
122        shared_projection: Arc<RwLock<ReplayReducer>>,
123        observability: RouterObservability,
124        ready_status: ReadyStatus,
125    ) -> Self {
126        Self {
127            router_config,
128            shared_projection,
129            control_authority: None,
130            metrics: observability.metrics,
131            wal_append_telemetry: observability.wal_append_telemetry,
132            clock: observability.clock,
133            recovery_observations: observability.recovery_observations,
134            recovery_histogram_observed: AtomicBool::new(false),
135            ready_status,
136        }
137    }
138
139    /// Creates router state with control mutation authority context.
140    pub fn with_control_authority(
141        router_config: RouterConfig,
142        shared_projection: Arc<RwLock<ReplayReducer>>,
143        observability: RouterObservability,
144        control_authority: ControlMutationAuthority,
145        ready_status: ReadyStatus,
146    ) -> Self {
147        Self {
148            router_config,
149            shared_projection,
150            control_authority: Some(control_authority),
151            metrics: observability.metrics,
152            wal_append_telemetry: observability.wal_append_telemetry,
153            clock: observability.clock,
154            recovery_observations: observability.recovery_observations,
155            recovery_histogram_observed: AtomicBool::new(false),
156            ready_status,
157        }
158    }
159}
160
161#[cfg(feature = "actor")]
162pub mod actors;
163pub mod control;
164pub mod health;
165pub mod metrics;
166pub mod pagination;
167#[cfg(feature = "platform")]
168pub mod platform;
169pub mod ready;
170pub mod run_get;
171pub mod runs_list;
172pub mod stats;
173pub mod task_get;
174pub mod tasks_list;
175
176/// Acquires a read lock on the shared projection, returning HTTP 500 on poison.
177pub(crate) fn read_projection(
178    state: &RouterStateInner,
179) -> Result<std::sync::RwLockReadGuard<'_, ReplayReducer>, Box<axum::response::Response>> {
180    state.shared_projection.read().map_err(|_| {
181        tracing::error!("shared projection RwLock poisoned — read handler degraded");
182        Box::new(projection_poison_response())
183    })
184}
185
186/// Acquires a write lock on the shared projection, returning HTTP 500 on poison.
187pub(crate) fn write_projection(
188    state: &RouterStateInner,
189) -> Result<std::sync::RwLockWriteGuard<'_, ReplayReducer>, Box<axum::response::Response>> {
190    state.shared_projection.write().map_err(|_| {
191        tracing::error!("shared projection RwLock poisoned — write handler degraded");
192        Box::new(projection_poison_response())
193    })
194}
195
196fn projection_poison_response() -> axum::response::Response {
197    use axum::http::StatusCode;
198    use axum::response::IntoResponse;
199    use axum::Json;
200
201    (
202        StatusCode::INTERNAL_SERVER_ERROR,
203        Json(serde_json::json!({
204            "error": "internal_error",
205            "message": "shared projection lock poisoned"
206        })),
207    )
208        .into_response()
209}
210
211/// Builds the HTTP router with all registered routes.
212///
213/// This function constructs an axum router and registers all read-only
214/// introspection routes (health, ready, stats) with the provided state.
215///
216/// # Arguments
217///
218/// * `state` - The shared router state containing configuration and projection
219///
220/// # Returns
221///
222/// An axum Router configured with all registered routes and the shared state.
223pub fn build_router(state: RouterState) -> axum::Router {
224    let control_enabled = state.router_config.control_enabled;
225    let metrics_enabled = state.router_config.metrics_enabled;
226    let router: axum::Router<RouterState> = axum::Router::new();
227    let router = health::register_routes(router);
228    let router = ready::register_routes(router);
229    let router = stats::register_routes(router);
230    let router = tasks_list::register_routes(router);
231    let router = runs_list::register_routes(router);
232    let router = run_get::register_routes(router);
233    let router = task_get::register_routes(router);
234    let router = metrics::register_routes(router, metrics_enabled);
235    let router = control::register_routes(router, control_enabled);
236    #[cfg(feature = "actor")]
237    let router = actors::register_routes(router);
238    #[cfg(feature = "platform")]
239    let router = platform::register_routes(router);
240    router.with_state(state)
241}