Skip to main content

actionqueue_daemon/http/
runs_list.rs

1//! Runs list route module.
2//!
3//! This module provides the run listing endpoint (`GET /api/v1/runs`) for the daemon.
4//! The list response is deterministic, read-only, and derived solely from the
5//! authoritative projection state. Ordering and pagination follow the contract in
6//! `plans/p6-008-runs-list-implementation-plan.md`.
7
8use std::cmp::Ordering;
9
10use axum::extract::{RawQuery, State};
11use axum::response::IntoResponse;
12use axum::Json;
13use serde::Serialize;
14
15use super::pagination::{pagination_error, parse_pagination, Pagination};
16
17/// Runs list response payload.
18///
19/// This struct represents the stable schema for the runs list endpoint response.
20#[derive(Debug, Clone, Serialize)]
21pub struct RunListResponse {
22    /// Run summaries returned for the current page.
23    pub runs: Vec<RunSummary>,
24    /// The limit applied to the result set.
25    pub limit: usize,
26    /// The offset applied to the result set.
27    pub offset: usize,
28}
29
30/// A stable run summary returned by the runs list endpoint.
31#[derive(Debug, Clone, Serialize)]
32pub struct RunSummary {
33    /// Run identifier as a stable string.
34    pub run_id: String,
35    /// Task identifier as a stable string.
36    pub task_id: String,
37    /// Run state (serialized via serde).
38    pub state: actionqueue_core::run::state::RunState,
39    /// Run creation timestamp derived from the WAL event.
40    pub created_at: u64,
41    /// Run scheduled_at timestamp.
42    pub scheduled_at: u64,
43    /// Number of attempts made for this run.
44    pub attempt_count: u32,
45    /// Concurrency key from task constraints, or null if task is missing.
46    pub concurrency_key: Option<String>,
47}
48
49impl RunSummary {
50    /// Builds a run summary from a run instance and an optional task spec.
51    fn from_run_instance(
52        run_instance: &actionqueue_core::run::run_instance::RunInstance,
53        task_constraints: Option<&actionqueue_core::task::constraints::TaskConstraints>,
54    ) -> Self {
55        let concurrency_key = task_constraints
56            .and_then(|constraints| constraints.concurrency_key().map(str::to_string));
57        Self {
58            run_id: run_instance.id().to_string(),
59            task_id: run_instance.task_id().to_string(),
60            state: run_instance.state(),
61            created_at: run_instance.created_at(),
62            scheduled_at: run_instance.scheduled_at(),
63            attempt_count: run_instance.attempt_count(),
64            concurrency_key,
65        }
66    }
67}
68
69/// Runs list handler.
70///
71/// This handler responds to `GET /api/v1/runs` with a deterministic, side-effect-free
72/// payload containing run summaries derived from authoritative projection state.
73#[tracing::instrument(skip_all)]
74pub async fn handle(state: State<super::RouterState>, raw_query: RawQuery) -> impl IntoResponse {
75    let pagination = match parse_pagination(raw_query.0.as_deref()) {
76        Ok(pagination) => pagination,
77        Err(error) => return pagination_error(error).into_response(),
78    };
79
80    let projection = match super::read_projection(&state) {
81        Ok(guard) => guard,
82        Err(response) => return *response,
83    };
84    let response = build_runs_list_response(&projection, pagination);
85    Json(response).into_response()
86}
87
88/// Registers the runs list route in the router builder.
89pub fn register_routes(
90    router: axum::Router<super::RouterState>,
91) -> axum::Router<super::RouterState> {
92    router.route("/api/v1/runs", axum::routing::get(handle))
93}
94
95fn build_runs_list_response(
96    projection: &actionqueue_storage::recovery::reducer::ReplayReducer,
97    pagination: Pagination,
98) -> RunListResponse {
99    let mut summaries: Vec<RunSummary> = projection
100        .run_instances()
101        .map(|run_instance| {
102            let task_id = run_instance.task_id();
103            let task_spec = projection.get_task(&task_id);
104            let task_constraints = task_spec.map(|spec| spec.constraints());
105            RunSummary::from_run_instance(run_instance, task_constraints)
106        })
107        .collect();
108    summaries.sort_by(compare_run_summaries);
109
110    let start = pagination.offset.min(summaries.len());
111    let end = start.saturating_add(pagination.limit).min(summaries.len());
112    let paged_runs = summaries[start..end].to_vec();
113
114    RunListResponse { runs: paged_runs, limit: pagination.limit, offset: pagination.offset }
115}
116
117fn compare_run_summaries(left: &RunSummary, right: &RunSummary) -> Ordering {
118    let scheduled_at_cmp = left.scheduled_at.cmp(&right.scheduled_at);
119    if scheduled_at_cmp != Ordering::Equal {
120        return scheduled_at_cmp;
121    }
122
123    left.run_id.cmp(&right.run_id)
124}
125
126#[cfg(test)]
127mod tests {
128    use std::str::FromStr;
129
130    use actionqueue_core::ids::{RunId, TaskId};
131    use actionqueue_core::run::run_instance::RunInstance;
132    use actionqueue_core::task::constraints::TaskConstraints;
133    use actionqueue_core::task::metadata::TaskMetadata;
134    use actionqueue_core::task::run_policy::RunPolicy;
135    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
136    use actionqueue_storage::recovery::reducer::ReplayReducer;
137    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
138    use axum::http::StatusCode;
139    use axum::response::IntoResponse;
140
141    use super::*;
142    use crate::http::pagination::{
143        parse_limit, parse_offset, PaginationError, PaginationErrorDetails, PaginationErrorResponse,
144    };
145
146    fn task_spec_with_concurrency_key(task_id: TaskId, concurrency_key: Option<&str>) -> TaskSpec {
147        let constraints = TaskConstraints::new(1, None, concurrency_key.map(|s| s.to_string()))
148            .expect("constraints should be valid");
149        let metadata = TaskMetadata::default();
150
151        TaskSpec::new(
152            task_id,
153            TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
154            RunPolicy::Once,
155            constraints,
156            metadata,
157        )
158        .expect("task spec should be valid")
159    }
160
161    fn apply_task(
162        reducer: &mut ReplayReducer,
163        sequence: u64,
164        task_spec: TaskSpec,
165        created_at: u64,
166    ) {
167        let event =
168            WalEvent::new(sequence, WalEventType::TaskCreated { task_spec, timestamp: created_at });
169        reducer.apply(&event).expect("task created event should apply");
170    }
171
172    fn apply_run(reducer: &mut ReplayReducer, sequence: u64, run_instance: RunInstance) {
173        let event = WalEvent::new(sequence, WalEventType::RunCreated { run_instance });
174        reducer.apply(&event).expect("run created event should apply");
175    }
176
177    fn run_instance_scheduled(
178        run_id: RunId,
179        task_id: TaskId,
180        scheduled_at: u64,
181        created_at: u64,
182    ) -> RunInstance {
183        RunInstance::new_scheduled_with_id(run_id, task_id, scheduled_at, created_at)
184            .expect("run instance should be valid")
185    }
186
187    #[test]
188    fn test_empty_runs_list_response() {
189        let reducer = ReplayReducer::new();
190        let pagination = Pagination { limit: 100, offset: 0 };
191        let response = build_runs_list_response(&reducer, pagination);
192
193        assert!(response.runs.is_empty());
194        assert_eq!(response.limit, 100);
195        assert_eq!(response.offset, 0);
196    }
197
198    #[test]
199    fn test_ordering_by_scheduled_at_then_run_id() {
200        let mut reducer = ReplayReducer::new();
201
202        let task_id_1 = TaskId::from_str("11111111-1111-1111-1111-111111111111").unwrap();
203        let task_id_2 = TaskId::from_str("22222222-2222-2222-2222-222222222222").unwrap();
204        let task_id_3 = TaskId::from_str("33333333-3333-3333-3333-333333333333").unwrap();
205
206        let task_spec_1 = task_spec_with_concurrency_key(task_id_1, Some("key-1"));
207        let task_spec_2 = task_spec_with_concurrency_key(task_id_2, Some("key-2"));
208        let task_spec_3 = task_spec_with_concurrency_key(task_id_3, Some("key-3"));
209
210        apply_task(&mut reducer, 1, task_spec_1.clone(), 1000);
211        apply_task(&mut reducer, 2, task_spec_2.clone(), 1001);
212        apply_task(&mut reducer, 3, task_spec_3.clone(), 1002);
213
214        let run_id_1 = RunId::from_str("11111111-1111-1111-1111-111111111110").unwrap();
215        let run_id_2 = RunId::from_str("22222222-2222-2222-2222-222222222220").unwrap();
216        let run_id_3 = RunId::from_str("33333333-3333-3333-3333-333333333330").unwrap();
217
218        // Create scheduled runs with different scheduled_at values (for ordering test)
219        let run_1 = run_instance_scheduled(run_id_1, task_id_1, 1000, 1500);
220        let run_2 = run_instance_scheduled(run_id_2, task_id_2, 1000, 1501);
221        let run_3 = run_instance_scheduled(run_id_3, task_id_3, 2000, 1502);
222
223        apply_run(&mut reducer, 4, run_1);
224        apply_run(&mut reducer, 5, run_2);
225        apply_run(&mut reducer, 6, run_3);
226
227        let pagination = Pagination { limit: 100, offset: 0 };
228        let response = build_runs_list_response(&reducer, pagination);
229
230        assert_eq!(response.runs.len(), 3);
231        // First two runs have same scheduled_at (1000), so sort by run_id
232        assert_eq!(response.runs[0].run_id, run_id_1.to_string());
233        assert_eq!(response.runs[1].run_id, run_id_2.to_string());
234        // Third run has later scheduled_at (2000)
235        assert_eq!(response.runs[2].run_id, run_id_3.to_string());
236    }
237
238    #[test]
239    fn test_pagination_slicing() {
240        let mut reducer = ReplayReducer::new();
241
242        let task_id = TaskId::from_str("11111111-1111-1111-1111-111111111111").unwrap();
243        let task_spec = task_spec_with_concurrency_key(task_id, Some("key"));
244
245        apply_task(&mut reducer, 1, task_spec, 1000);
246
247        let run_ids: Vec<RunId> = (1..=5)
248            .map(|i| {
249                let uuid_str = format!("00000000-0000-0000-0000-0000000000{}{}", i / 10, i % 10);
250                RunId::from_str(&uuid_str).unwrap()
251            })
252            .collect();
253
254        for (i, run_id) in run_ids.iter().enumerate() {
255            // Use scheduled_at=1000+i, created_at=2000+i
256            let run = run_instance_scheduled(*run_id, task_id, 1000 + i as u64, 2000 + i as u64);
257            apply_run(&mut reducer, 2 + i as u64, run);
258        }
259
260        // Test limit=2, offset=1
261        let pagination = Pagination { limit: 2, offset: 1 };
262        let response = build_runs_list_response(&reducer, pagination);
263
264        assert_eq!(response.runs.len(), 2);
265        assert_eq!(response.limit, 2);
266        assert_eq!(response.offset, 1);
267        assert_eq!(response.runs[0].run_id, run_ids[1].to_string());
268        assert_eq!(response.runs[1].run_id, run_ids[2].to_string());
269
270        // Test limit=3, offset=3
271        let pagination = Pagination { limit: 3, offset: 3 };
272        let response = build_runs_list_response(&reducer, pagination);
273
274        assert_eq!(response.runs.len(), 2);
275        assert_eq!(response.limit, 3);
276        assert_eq!(response.offset, 3);
277        assert_eq!(response.runs[0].run_id, run_ids[3].to_string());
278        assert_eq!(response.runs[1].run_id, run_ids[4].to_string());
279    }
280
281    #[test]
282    fn test_invalid_pagination_limit_error_schema() {
283        // Test limit=0
284        let result = parse_limit("0");
285        assert!(matches!(result, Err(ref e) if e.field == "limit"));
286
287        // Test limit=1001 (exceeds max)
288        let result = parse_limit("1001");
289        assert!(matches!(result, Err(ref e) if e.field == "limit"));
290
291        // Verify error response structure
292        let error = PaginationError::new("limit", "limit must be between 1 and 1000");
293        let error_message = error.message.clone();
294        let error_field = error.field;
295        let response = pagination_error(error).into_response();
296        let status = response.status();
297
298        let response_json = serde_json::to_string(&PaginationErrorResponse {
299            error: "invalid_pagination",
300            message: error_message,
301            details: PaginationErrorDetails { field: error_field },
302        })
303        .expect("should serialize response");
304
305        assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
306        assert_eq!(
307            response_json,
308            r#"{"error":"invalid_pagination","message":"limit must be between 1 and 1000","details":{"field":"limit"}}"#
309        );
310    }
311
312    #[test]
313    fn test_invalid_pagination_offset_error_schema() {
314        // Test offset=-1 (should fail since usize can't be negative)
315        // In the implementation, parse_non_negative uses parse::<usize>()
316        // which returns an error for negative numbers
317        let result = parse_offset("-1");
318        assert!(matches!(result, Err(ref e) if e.field == "offset"));
319
320        // Verify error response structure
321        let error = PaginationError::new("offset", "offset must be a non-negative integer");
322        let error_message = error.message.clone();
323        let error_field = error.field;
324        let response = pagination_error(error).into_response();
325        let status = response.status();
326
327        let response_json = serde_json::to_string(&PaginationErrorResponse {
328            error: "invalid_pagination",
329            message: error_message,
330            details: PaginationErrorDetails { field: error_field },
331        })
332        .expect("should serialize response");
333
334        assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
335        assert_eq!(
336            response_json,
337            r#"{"error":"invalid_pagination","message":"offset must be a non-negative integer","details":{"field":"offset"}}"#
338        );
339    }
340
341    #[test]
342    fn test_concurrency_key_resolution_and_missing_task() {
343        let mut reducer = ReplayReducer::new();
344
345        // Task with concurrency key
346        let task_id_with_key = TaskId::from_str("11111111-1111-1111-1111-111111111111").unwrap();
347        let task_spec_with_key = task_spec_with_concurrency_key(task_id_with_key, Some("my-key"));
348
349        // Task without concurrency key (None)
350        let task_id_no_key = TaskId::from_str("22222222-2222-2222-2222-222222222222").unwrap();
351        let task_spec_no_key = task_spec_with_concurrency_key(task_id_no_key, None);
352
353        apply_task(&mut reducer, 1, task_spec_with_key, 1000);
354        apply_task(&mut reducer, 2, task_spec_no_key, 1001);
355
356        // Run with task that has concurrency key
357        let run_id_with_key = RunId::from_str("11111111-1111-1111-1111-111111111110").unwrap();
358        let run_with_key = run_instance_scheduled(run_id_with_key, task_id_with_key, 500, 1500);
359
360        // Run with task that has no concurrency key
361        let run_id_no_key = RunId::from_str("22222222-2222-2222-2222-222222222220").unwrap();
362        let run_no_key = run_instance_scheduled(run_id_no_key, task_id_no_key, 600, 1501);
363
364        // Run with missing task (task was never created)
365        let missing_task_id = TaskId::from_str("33333333-3333-3333-3333-333333333333").unwrap();
366        let run_id_missing_task = RunId::from_str("33333333-3333-3333-3333-333333333330").unwrap();
367        let run_missing_task =
368            run_instance_scheduled(run_id_missing_task, missing_task_id, 700, 1502);
369
370        apply_run(&mut reducer, 3, run_with_key);
371        apply_run(&mut reducer, 4, run_no_key);
372        apply_run(&mut reducer, 5, run_missing_task);
373
374        let pagination = Pagination { limit: 100, offset: 0 };
375        let response = build_runs_list_response(&reducer, pagination);
376
377        assert_eq!(response.runs.len(), 3);
378
379        // Runs sorted by scheduled_at ascending
380        // First run: has concurrency key (scheduled_at=500)
381        assert_eq!(response.runs[0].run_id, run_id_with_key.to_string());
382        assert_eq!(response.runs[0].concurrency_key, Some("my-key".to_string()));
383
384        // Second run: task exists but concurrency_key is None (scheduled_at=600)
385        assert_eq!(response.runs[1].run_id, run_id_no_key.to_string());
386        assert_eq!(response.runs[1].concurrency_key, None);
387
388        // Third run: task missing from reducer, should have null concurrency_key (scheduled_at=700)
389        assert_eq!(response.runs[2].run_id, run_id_missing_task.to_string());
390        assert_eq!(response.runs[2].concurrency_key, None);
391    }
392}