Skip to main content

actionqueue_daemon/http/
stats.rs

1//! Stats route module.
2//!
3//! This module provides the aggregate statistics endpoint (`GET /api/v1/stats`)
4//! for the daemon. The stats endpoint returns read-only operational summaries
5//! derived from authoritative state without speculation, consistent with
6//! `actionqueue-charter.md`, `actionqueue-scope-appendix-v0.1.md`, and invariant
7//! boundaries in `invariant-boundaries-v0.1.md`.
8//!
9//! # Overview
10//!
11//! Stats are derived from:
12//! - Total task count from the task projection
13//! - Runs by state (Scheduled, Ready, Leased, Running, RetryWait, Completed, Failed, Canceled)
14//! - Total run count
15//! - Total attempt count derived from run instances
16//! - Latest sequence number from the WAL projection
17//!
18//! # Scheduling lag
19//!
20//! Scheduling lag is intentionally omitted in S1 because there is no authoritative
21//! clock injection in the daemon. Do not derive scheduling lag from system time
22//! or any non-authoritative source. If scheduling lag becomes necessary in a future
23//! release, an authoritative time source must be added to the daemon bootstrap state.
24//!
25//! # Invariant boundaries
26//!
27//! The stats handler performs no IO, writes no WAL entries, and mutates no
28//! runtime state. It reflects the authoritative projection state from bootstrap.
29//!
30//! # Response schema
31//!
32//! ```json
33//! {
34//!   "total_tasks": 5,
35//!   "total_runs": 12,
36//!   "attempts_total": 20,
37//!   "runs_by_state": {
38//!     "scheduled": 2,
39//!     "ready": 3,
40//!     "leased": 1,
41//!     "running": 4,
42//!     "retry_wait": 0,
43//!     "completed": 1,
44//!     "failed": 0,
45//!     "canceled": 1
46//!   },
47//!   "latest_sequence": 42
48//! }
49//! ```
50
51use axum::extract::State;
52use axum::response::IntoResponse;
53use axum::Json;
54use serde::Serialize;
55
56/// Stats response payload.
57///
58/// This struct represents the stable schema for the stats endpoint response.
59/// Fields should not be modified without careful consideration of external
60/// dependencies that may rely on this contract.
61#[derive(Debug, Clone, Serialize)]
62pub struct StatsResponse {
63    /// Total number of tasks tracked.
64    ///
65    /// This is derived from the authoritative task projection in the storage
66    /// reducer.
67    pub total_tasks: usize,
68
69    /// Total number of runs tracked.
70    ///
71    /// This is derived from the authoritative run instance projection in the
72    /// storage reducer.
73    pub total_runs: usize,
74
75    /// Total number of attempts across all runs.
76    ///
77    /// This is computed as the sum of `RunInstance::attempt_count()` from all
78    /// run instances in the authoritative projection. This provides a count
79    /// of all attempts that have been started for all runs.
80    pub attempts_total: u64,
81
82    /// Counts of runs by state.
83    ///
84    /// Each state is counted from the authoritative run instance state
85    /// projection. The map is sorted by state name for deterministic output.
86    pub runs_by_state: StatsRunsByState,
87
88    /// Latest sequence number processed from the WAL.
89    ///
90    /// This reflects the most recent event sequence number applied to the
91    /// projection state.
92    pub latest_sequence: u64,
93}
94
95/// Counts of runs by state.
96///
97/// This struct provides a stable schema for run state counts, with each field
98/// corresponding to a canonical state in the run lifecycle.
99#[derive(Debug, Clone, Serialize)]
100pub struct StatsRunsByState {
101    /// Number of runs in Scheduled state.
102    pub scheduled: usize,
103    /// Number of runs in Ready state.
104    pub ready: usize,
105    /// Number of runs in Leased state.
106    pub leased: usize,
107    /// Number of runs in Running state.
108    pub running: usize,
109    /// Number of runs in RetryWait state.
110    pub retry_wait: usize,
111    /// Number of runs in Completed state.
112    pub completed: usize,
113    /// Number of runs in Failed state.
114    pub failed: usize,
115    /// Number of runs in Canceled state.
116    pub canceled: usize,
117}
118
119impl StatsResponse {
120    /// Creates a new stats response from the projection state.
121    ///
122    /// This function derives stats from the authoritative ReplayReducer state,
123    /// counting runs by their current state. Empty state results in zero counts.
124    ///
125    /// # Arguments
126    ///
127    /// * `projection` - The authoritative projection state from bootstrap
128    pub fn from_projection(
129        projection: &actionqueue_storage::recovery::reducer::ReplayReducer,
130    ) -> Self {
131        // Count runs by state from the run instances projection
132        let mut scheduled = 0;
133        let mut ready = 0;
134        let mut leased = 0;
135        let mut running = 0;
136        let mut retry_wait = 0;
137        let mut completed = 0;
138        let mut failed = 0;
139        let mut canceled = 0;
140        let mut attempts_total: u64 = 0;
141
142        for run_instance in projection.run_instances() {
143            match run_instance.state() {
144                actionqueue_core::run::state::RunState::Scheduled => scheduled += 1,
145                actionqueue_core::run::state::RunState::Ready => ready += 1,
146                actionqueue_core::run::state::RunState::Leased => leased += 1,
147                actionqueue_core::run::state::RunState::Running => running += 1,
148                actionqueue_core::run::state::RunState::RetryWait => retry_wait += 1,
149                actionqueue_core::run::state::RunState::Suspended => {}
150                actionqueue_core::run::state::RunState::Completed => completed += 1,
151                actionqueue_core::run::state::RunState::Failed => failed += 1,
152                actionqueue_core::run::state::RunState::Canceled => canceled += 1,
153            }
154            // Sum attempt counts from all run instances
155            attempts_total = attempts_total.saturating_add(u64::from(run_instance.attempt_count()));
156        }
157
158        Self {
159            total_tasks: projection.task_count(),
160            total_runs: projection.run_count(),
161            attempts_total,
162            runs_by_state: StatsRunsByState {
163                scheduled,
164                ready,
165                leased,
166                running,
167                retry_wait,
168                completed,
169                failed,
170                canceled,
171            },
172            latest_sequence: projection.latest_sequence(),
173        }
174    }
175}
176
177/// Stats handler.
178///
179/// This handler responds to `GET /api/v1/stats` requests with a deterministic,
180/// side-effect-free payload containing aggregate statistics derived from
181/// authoritative state.
182///
183/// # Invariant boundaries
184///
185/// This handler performs no IO, writes no WAL entries, and mutates no runtime
186/// state. It reflects the projection state from the BootstrapState.
187#[tracing::instrument(skip_all)]
188pub async fn handle(state: State<super::RouterState>) -> impl IntoResponse {
189    let projection = match super::read_projection(&state) {
190        Ok(guard) => guard,
191        Err(response) => return *response,
192    };
193    Json(StatsResponse::from_projection(&projection)).into_response()
194}
195
196/// Registers the stats route in the router builder.
197///
198/// This function adds the `/api/v1/stats` endpoint to the router configuration.
199/// The route is always available when HTTP is enabled and does not depend
200/// on any feature flags.
201///
202/// # Arguments
203///
204/// * `router` - The axum router to register routes with
205pub fn register_routes(
206    router: axum::Router<super::RouterState>,
207) -> axum::Router<super::RouterState> {
208    router.route("/api/v1/stats", axum::routing::get(handle))
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `value` to JSON text and parses it back into a generic
    /// JSON document, so tests can compare the complete shape of the output
    /// instead of searching for substrings.
    fn to_json_value<T: serde::Serialize>(value: &T) -> serde_json::Value {
        let text = serde_json::to_string(value).expect("serialization should succeed");
        serde_json::from_str(&text).expect("serialized output should be valid JSON")
    }

    /// Pins the full response schema: field names, nesting, and values.
    #[test]
    fn test_stats_response_serialization() {
        let response = StatsResponse {
            total_tasks: 5,
            total_runs: 12,
            attempts_total: 20,
            runs_by_state: StatsRunsByState {
                scheduled: 2,
                ready: 3,
                leased: 1,
                running: 4,
                retry_wait: 0,
                completed: 1,
                failed: 0,
                canceled: 1,
            },
            latest_sequence: 42,
        };

        let expected = serde_json::json!({
            "total_tasks": 5,
            "total_runs": 12,
            "attempts_total": 20,
            "runs_by_state": {
                "scheduled": 2,
                "ready": 3,
                "leased": 1,
                "running": 4,
                "retry_wait": 0,
                "completed": 1,
                "failed": 0,
                "canceled": 1
            },
            "latest_sequence": 42
        });

        // Whole-document equality also catches misplaced or extra fields,
        // which substring checks would miss.
        assert_eq!(to_json_value(&response), expected);
    }

    /// An empty projection must produce zero counts everywhere.
    #[test]
    fn test_stats_response_empty_state() {
        let reducer = actionqueue_storage::recovery::reducer::ReplayReducer::new();

        let response = StatsResponse::from_projection(&reducer);

        assert_eq!(response.total_tasks, 0);
        assert_eq!(response.total_runs, 0);
        assert_eq!(response.attempts_total, 0);
        assert_eq!(response.runs_by_state.scheduled, 0);
        assert_eq!(response.runs_by_state.ready, 0);
        assert_eq!(response.runs_by_state.leased, 0);
        assert_eq!(response.runs_by_state.running, 0);
        assert_eq!(response.runs_by_state.retry_wait, 0);
        assert_eq!(response.runs_by_state.completed, 0);
        assert_eq!(response.runs_by_state.failed, 0);
        assert_eq!(response.runs_by_state.canceled, 0);
    }

    /// Pins the per-state counter object on its own, using a distinct value
    /// per field so that swapped fields are detected.
    #[test]
    fn test_stats_runs_by_state_serialization() {
        let state = StatsRunsByState {
            scheduled: 1,
            ready: 2,
            leased: 3,
            running: 4,
            retry_wait: 5,
            completed: 6,
            failed: 7,
            canceled: 8,
        };

        let expected = serde_json::json!({
            "scheduled": 1,
            "ready": 2,
            "leased": 3,
            "running": 4,
            "retry_wait": 5,
            "completed": 6,
            "failed": 7,
            "canceled": 8
        });

        assert_eq!(to_json_value(&state), expected);
    }

    /// Applies one task-created and one run-created event to a fresh reducer
    /// and checks that the derived totals reflect them.
    #[test]
    fn test_stats_response_from_projection() {
        let task_spec = actionqueue_core::task::task_spec::TaskSpec::new(
            actionqueue_core::ids::TaskId::new(),
            actionqueue_core::task::task_spec::TaskPayload::with_content_type(
                vec![1, 2, 3],
                "application/octet-stream",
            ),
            actionqueue_core::task::run_policy::RunPolicy::Once,
            actionqueue_core::task::constraints::TaskConstraints::default(),
            actionqueue_core::task::metadata::TaskMetadata::default(),
        )
        .expect("task spec should be valid");

        let run_instance = actionqueue_core::run::run_instance::RunInstance::new_scheduled_with_id(
            actionqueue_core::ids::RunId::new(),
            task_spec.id(),
            10,
            10,
        )
        .expect("run instance should be valid");

        let mut reducer = actionqueue_storage::recovery::reducer::ReplayReducer::new();

        let task_event = actionqueue_storage::wal::event::WalEvent::new(
            1,
            actionqueue_storage::wal::event::WalEventType::TaskCreated { task_spec, timestamp: 0 },
        );
        reducer.apply(&task_event).expect("task event should apply");

        let run_event = actionqueue_storage::wal::event::WalEvent::new(
            2,
            actionqueue_storage::wal::event::WalEventType::RunCreated { run_instance },
        );
        reducer.apply(&run_event).expect("run event should apply");

        let response = StatsResponse::from_projection(&reducer);
        assert_eq!(response.total_tasks, 1);
        assert_eq!(response.total_runs, 1);
        // NOTE(review): presumably `runs_by_state.scheduled == 1` and
        // `latest_sequence == 2` also hold here; confirm against the reducer
        // semantics before pinning them.
    }
}