Skip to main content

actionqueue_daemon/http/
tasks_list.rs

1//! Tasks list route module.
2//!
3//! This module provides the task listing endpoint (`GET /api/v1/tasks`) for the daemon.
4//! The list response is deterministic, read-only, and derived solely from the
5//! authoritative projection state. Ordering and pagination follow the contract in
6//! `plans/p6-006-tasks-list-implementation-plan.md`.
7
8use std::cmp::Ordering;
9
10use axum::extract::{RawQuery, State};
11use axum::response::IntoResponse;
12use axum::Json;
13use serde::Serialize;
14
15use super::pagination::{pagination_error, parse_pagination, Pagination};
16
17/// Tasks list response payload.
18///
19/// This struct represents the stable schema for the tasks list endpoint response.
20#[derive(Debug, Clone, Serialize)]
21pub struct TaskListResponse {
22    /// Task summaries returned for the current page.
23    pub tasks: Vec<TaskSummary>,
24    /// The limit applied to the result set.
25    pub limit: usize,
26    /// The offset applied to the result set.
27    pub offset: usize,
28}
29
30/// A stable task summary returned by the tasks list endpoint.
31#[derive(Debug, Clone, Serialize)]
32pub struct TaskSummary {
33    /// Task identifier as a stable string.
34    pub id: String,
35    /// Task run policy.
36    pub run_policy: actionqueue_core::task::run_policy::RunPolicy,
37    /// Task constraints.
38    pub constraints: actionqueue_core::task::constraints::TaskConstraints,
39    /// Task metadata including priority.
40    pub metadata: actionqueue_core::task::metadata::TaskMetadata,
41    /// Task creation timestamp derived from the WAL event.
42    pub created_at: u64,
43    /// Task update timestamp (null until update events exist).
44    pub updated_at: Option<u64>,
45    /// Parent task identifier, if this task is a child in a hierarchy.
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub parent_task_id: Option<String>,
48}
49
50impl TaskSummary {
51    /// Builds a task summary from a projection record.
52    fn from_record(record: &actionqueue_storage::recovery::reducer::TaskRecord) -> Self {
53        let task_spec = record.task_spec();
54        Self {
55            id: task_spec.id().to_string(),
56            run_policy: task_spec.run_policy().clone(),
57            constraints: task_spec.constraints().clone(),
58            metadata: task_spec.metadata().clone(),
59            created_at: record.created_at(),
60            updated_at: record.updated_at(),
61            parent_task_id: task_spec.parent_task_id().map(|id| id.to_string()),
62        }
63    }
64}
65
66/// Tasks list handler.
67///
68/// This handler responds to `GET /api/v1/tasks` with a deterministic, side-effect-free
69/// payload containing task summaries derived from authoritative projection state.
70#[tracing::instrument(skip_all)]
71pub async fn handle(state: State<super::RouterState>, raw_query: RawQuery) -> impl IntoResponse {
72    let pagination = match parse_pagination(raw_query.0.as_deref()) {
73        Ok(pagination) => pagination,
74        Err(error) => return pagination_error(error).into_response(),
75    };
76
77    let projection = match super::read_projection(&state) {
78        Ok(guard) => guard,
79        Err(response) => return *response,
80    };
81    let response = build_task_list_response(&projection, pagination);
82    Json(response).into_response()
83}
84
85/// Registers the tasks list route in the router builder.
86pub fn register_routes(
87    router: axum::Router<super::RouterState>,
88) -> axum::Router<super::RouterState> {
89    router.route("/api/v1/tasks", axum::routing::get(handle))
90}
91
92fn build_task_list_response(
93    projection: &actionqueue_storage::recovery::reducer::ReplayReducer,
94    pagination: Pagination,
95) -> TaskListResponse {
96    let mut tasks: Vec<TaskSummary> =
97        projection.task_records().map(TaskSummary::from_record).collect();
98    tasks.sort_by(compare_task_summaries);
99
100    let start = pagination.offset.min(tasks.len());
101    let end = start.saturating_add(pagination.limit).min(tasks.len());
102    let paged_tasks = tasks[start..end].to_vec();
103
104    TaskListResponse { tasks: paged_tasks, limit: pagination.limit, offset: pagination.offset }
105}
106
107fn compare_task_summaries(left: &TaskSummary, right: &TaskSummary) -> Ordering {
108    let priority_cmp = right.metadata.priority().cmp(&left.metadata.priority());
109    if priority_cmp != Ordering::Equal {
110        return priority_cmp;
111    }
112
113    let created_cmp = left.created_at.cmp(&right.created_at);
114    if created_cmp != Ordering::Equal {
115        return created_cmp;
116    }
117
118    left.id.cmp(&right.id)
119}
120
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use actionqueue_core::ids::TaskId;
    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
    use axum::http::StatusCode;
    use axum::response::IntoResponse;

    use super::*;
    use crate::http::pagination::{
        PaginationErrorDetails, PaginationErrorResponse, DEFAULT_LIMIT, MAX_LIMIT,
    };

    /// Shared fixture: `(task id, priority, created_at)` in the order the tasks are
    /// applied to the reducer. The WAL sequence number is the 1-based position.
    ///
    /// The set exercises every ordering key: one task with a higher priority, one
    /// with a lower priority, and three with equal priority, two of which also share
    /// a creation timestamp and differ only by id.
    const FIXTURE_TASKS: [(&str, i32, u64); 5] = [
        ("00000000-0000-0000-0000-000000000004", 6, 20),
        ("00000000-0000-0000-0000-000000000005", 5, 9),
        ("00000000-0000-0000-0000-000000000001", 5, 10),
        ("00000000-0000-0000-0000-000000000002", 5, 10),
        ("00000000-0000-0000-0000-000000000003", 4, 5),
    ];

    /// Builds a minimal valid task spec with the given id and priority.
    fn task_spec_with_priority(task_id: TaskId, priority: i32) -> TaskSpec {
        let metadata = TaskMetadata::new(vec![], priority, None);

        TaskSpec::new(
            task_id,
            TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
            RunPolicy::Once,
            TaskConstraints::default(),
            metadata,
        )
        .expect("task spec should be valid")
    }

    /// Applies a `TaskCreated` WAL event for `task_spec` to the reducer.
    fn apply_task(
        reducer: &mut ReplayReducer,
        sequence: u64,
        task_spec: TaskSpec,
        created_at: u64,
    ) {
        let event =
            WalEvent::new(sequence, WalEventType::TaskCreated { task_spec, timestamp: created_at });
        reducer.apply(&event).expect("task created event should apply")
    }

    /// Returns a reducer populated with every task in [`FIXTURE_TASKS`].
    fn reducer_with_fixture_tasks() -> ReplayReducer {
        let mut reducer = ReplayReducer::new();
        for (&(id, priority, created_at), sequence) in FIXTURE_TASKS.iter().zip(1u64..) {
            let task_id = TaskId::from_str(id).expect("fixture task id should parse");
            apply_task(
                &mut reducer,
                sequence,
                task_spec_with_priority(task_id, priority),
                created_at,
            );
        }
        reducer
    }

    /// Collects the task ids of a response in the order they were returned.
    fn response_ids(response: &TaskListResponse) -> Vec<String> {
        response.tasks.iter().map(|task| task.id.clone()).collect()
    }

    #[test]
    fn test_empty_list_response() {
        let reducer = ReplayReducer::new();
        let pagination = Pagination { limit: DEFAULT_LIMIT, offset: 0 };
        let response = build_task_list_response(&reducer, pagination);

        assert!(response.tasks.is_empty());
        assert_eq!(response.limit, DEFAULT_LIMIT);
        assert_eq!(response.offset, 0);
    }

    #[test]
    fn test_ordering_by_priority_created_at_task_id() {
        let reducer = reducer_with_fixture_tasks();

        let response =
            build_task_list_response(&reducer, Pagination { limit: DEFAULT_LIMIT, offset: 0 });

        assert_eq!(
            response_ids(&response),
            vec![
                "00000000-0000-0000-0000-000000000004",
                "00000000-0000-0000-0000-000000000005",
                "00000000-0000-0000-0000-000000000001",
                "00000000-0000-0000-0000-000000000002",
                "00000000-0000-0000-0000-000000000003",
            ]
        );
    }

    #[test]
    fn test_pagination_slicing() {
        let reducer = reducer_with_fixture_tasks();

        let response = build_task_list_response(&reducer, Pagination { limit: 2, offset: 1 });

        assert_eq!(
            response_ids(&response),
            vec!["00000000-0000-0000-0000-000000000005", "00000000-0000-0000-0000-000000000001",]
        );
        assert_eq!(response.limit, 2);
        assert_eq!(response.offset, 1);
    }

    #[test]
    fn test_invalid_pagination_limit_error_schema() {
        let error = parse_pagination(Some("limit=0")).expect_err("limit should be invalid");
        let response = pagination_error(error).into_response();
        assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);

        let payload = PaginationErrorResponse {
            error: "invalid_pagination",
            message: format!("limit must be between 1 and {MAX_LIMIT}"),
            details: PaginationErrorDetails { field: "limit" },
        };
        let json = serde_json::to_string(&payload).expect("error payload should serialize");
        assert_eq!(
            json,
            format!(
                r#"{{"error":"invalid_pagination","message":"limit must be between 1 and {MAX_LIMIT}","details":{{"field":"limit"}}}}"#
            )
        );
    }

    #[test]
    fn test_invalid_pagination_offset_error_schema() {
        let error = parse_pagination(Some("offset=-1")).expect_err("offset should be invalid");
        let response = pagination_error(error).into_response();
        assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);

        let payload = PaginationErrorResponse {
            error: "invalid_pagination",
            message: "offset must be a non-negative integer".to_string(),
            details: PaginationErrorDetails { field: "offset" },
        };
        let json = serde_json::to_string(&payload).expect("error payload should serialize");
        assert_eq!(
            json,
            r#"{"error":"invalid_pagination","message":"offset must be a non-negative integer","details":{"field":"offset"}}"#
        );
    }
}