1use axum::extract::State;
52use axum::response::IntoResponse;
53use axum::Json;
54use serde::Serialize;
55
56#[derive(Debug, Clone, Serialize)]
62pub struct StatsResponse {
63 pub total_tasks: usize,
68
69 pub total_runs: usize,
74
75 pub attempts_total: u64,
81
82 pub runs_by_state: StatsRunsByState,
87
88 pub latest_sequence: u64,
93}
94
95#[derive(Debug, Clone, Serialize)]
100pub struct StatsRunsByState {
101 pub scheduled: usize,
103 pub ready: usize,
105 pub leased: usize,
107 pub running: usize,
109 pub retry_wait: usize,
111 pub completed: usize,
113 pub failed: usize,
115 pub canceled: usize,
117}
118
119impl StatsResponse {
120 pub fn from_projection(
129 projection: &actionqueue_storage::recovery::reducer::ReplayReducer,
130 ) -> Self {
131 let mut scheduled = 0;
133 let mut ready = 0;
134 let mut leased = 0;
135 let mut running = 0;
136 let mut retry_wait = 0;
137 let mut completed = 0;
138 let mut failed = 0;
139 let mut canceled = 0;
140 let mut attempts_total: u64 = 0;
141
142 for run_instance in projection.run_instances() {
143 match run_instance.state() {
144 actionqueue_core::run::state::RunState::Scheduled => scheduled += 1,
145 actionqueue_core::run::state::RunState::Ready => ready += 1,
146 actionqueue_core::run::state::RunState::Leased => leased += 1,
147 actionqueue_core::run::state::RunState::Running => running += 1,
148 actionqueue_core::run::state::RunState::RetryWait => retry_wait += 1,
149 actionqueue_core::run::state::RunState::Suspended => {}
150 actionqueue_core::run::state::RunState::Completed => completed += 1,
151 actionqueue_core::run::state::RunState::Failed => failed += 1,
152 actionqueue_core::run::state::RunState::Canceled => canceled += 1,
153 }
154 attempts_total = attempts_total.saturating_add(u64::from(run_instance.attempt_count()));
156 }
157
158 Self {
159 total_tasks: projection.task_count(),
160 total_runs: projection.run_count(),
161 attempts_total,
162 runs_by_state: StatsRunsByState {
163 scheduled,
164 ready,
165 leased,
166 running,
167 retry_wait,
168 completed,
169 failed,
170 canceled,
171 },
172 latest_sequence: projection.latest_sequence(),
173 }
174 }
175}
176
177#[tracing::instrument(skip_all)]
188pub async fn handle(state: State<super::RouterState>) -> impl IntoResponse {
189 let projection = match super::read_projection(&state) {
190 Ok(guard) => guard,
191 Err(response) => return *response,
192 };
193 Json(StatsResponse::from_projection(&projection)).into_response()
194}
195
196pub fn register_routes(
206 router: axum::Router<super::RouterState>,
207) -> axum::Router<super::RouterState> {
208 router.route("/api/v1/stats", axum::routing::get(handle))
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every fragment occurs verbatim in the serialized JSON.
    fn assert_contains_all(json: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(json.contains(fragment), "expected {} in {}", fragment, json);
        }
    }

    /// A fully populated response serializes every field under its own name.
    #[test]
    fn test_stats_response_serialization() {
        let runs_by_state = StatsRunsByState {
            scheduled: 2,
            ready: 3,
            leased: 1,
            running: 4,
            retry_wait: 0,
            completed: 1,
            failed: 0,
            canceled: 1,
        };
        let response = StatsResponse {
            total_tasks: 5,
            total_runs: 12,
            attempts_total: 20,
            runs_by_state,
            latest_sequence: 42,
        };

        let json = serde_json::to_string(&response).expect("serialization should succeed");
        assert_contains_all(
            &json,
            &[
                "\"total_tasks\":5",
                "\"total_runs\":12",
                "\"attempts_total\":20",
                "\"scheduled\":2",
                "\"ready\":3",
                "\"leased\":1",
                "\"running\":4",
                "\"retry_wait\":0",
                "\"completed\":1",
                "\"failed\":0",
                "\"canceled\":1",
                "\"latest_sequence\":42",
            ],
        );
    }

    /// An all-zero response can be constructed and read back field by field.
    #[test]
    fn test_stats_response_empty_state() {
        let runs_by_state = StatsRunsByState {
            scheduled: 0,
            ready: 0,
            leased: 0,
            running: 0,
            retry_wait: 0,
            completed: 0,
            failed: 0,
            canceled: 0,
        };
        let response = StatsResponse {
            total_tasks: 0,
            total_runs: 0,
            attempts_total: 0,
            runs_by_state,
            latest_sequence: 0,
        };

        assert_eq!(response.total_tasks, 0);
        assert_eq!(response.total_runs, 0);
        assert_eq!(response.attempts_total, 0);
        assert_eq!(response.runs_by_state.scheduled, 0);
    }

    /// The per-state breakdown serializes each counter under its field name.
    #[test]
    fn test_stats_runs_by_state_serialization() {
        let breakdown = StatsRunsByState {
            scheduled: 1,
            ready: 2,
            leased: 3,
            running: 4,
            retry_wait: 5,
            completed: 6,
            failed: 7,
            canceled: 8,
        };

        let json = serde_json::to_string(&breakdown).expect("serialization should succeed");
        assert_contains_all(
            &json,
            &[
                "\"scheduled\":1",
                "\"ready\":2",
                "\"leased\":3",
                "\"running\":4",
                "\"retry_wait\":5",
                "\"completed\":6",
                "\"failed\":7",
                "\"canceled\":8",
            ],
        );
    }

    /// Replaying one task event and one run event yields one task and one run.
    #[test]
    fn test_stats_response_from_projection() {
        use actionqueue_core::ids::{RunId, TaskId};
        use actionqueue_core::run::run_instance::RunInstance;
        use actionqueue_core::task::constraints::TaskConstraints;
        use actionqueue_core::task::metadata::TaskMetadata;
        use actionqueue_core::task::run_policy::RunPolicy;
        use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
        use actionqueue_storage::recovery::reducer::ReplayReducer;
        use actionqueue_storage::wal::event::{WalEvent, WalEventType};

        let payload = TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream");
        let task_spec = TaskSpec::new(
            TaskId::new(),
            payload,
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("task spec should be valid");

        let run_instance = RunInstance::new_scheduled_with_id(RunId::new(), task_spec.id(), 10, 10)
            .expect("run instance should be valid");

        let mut reducer = ReplayReducer::new();

        let task_created = WalEvent::new(1, WalEventType::TaskCreated { task_spec, timestamp: 0 });
        reducer.apply(&task_created).expect("task event should apply");

        let run_created = WalEvent::new(2, WalEventType::RunCreated { run_instance });
        reducer.apply(&run_created).expect("run event should apply");

        let response = StatsResponse::from_projection(&reducer);
        assert_eq!(response.total_tasks, 1);
        assert_eq!(response.total_runs, 1);
    }
}