1use std::cmp::Ordering;
9
10use axum::extract::{RawQuery, State};
11use axum::response::IntoResponse;
12use axum::Json;
13use serde::Serialize;
14
15use super::pagination::{pagination_error, parse_pagination, Pagination};
16
/// JSON body returned by the `GET /api/v1/tasks` endpoint: one page of
/// task summaries plus the pagination parameters that produced the page.
#[derive(Debug, Clone, Serialize)]
pub struct TaskListResponse {
    /// Tasks for the requested page, ordered by descending priority, then
    /// ascending `created_at`, then ascending id (see `compare_task_summaries`).
    pub tasks: Vec<TaskSummary>,
    /// Page size that was applied, echoed back from the request.
    pub limit: usize,
    /// Page start offset that was applied, echoed back from the request.
    pub offset: usize,
}
29
/// Serializable view of a single stored task, built from a
/// `TaskRecord` for inclusion in list responses.
#[derive(Debug, Clone, Serialize)]
pub struct TaskSummary {
    /// Task id rendered as a string.
    pub id: String,
    /// Run policy cloned from the stored task spec.
    pub run_policy: actionqueue_core::task::run_policy::RunPolicy,
    /// Constraints cloned from the stored task spec.
    pub constraints: actionqueue_core::task::constraints::TaskConstraints,
    /// Metadata cloned from the stored task spec; its priority also drives
    /// the list ordering.
    pub metadata: actionqueue_core::task::metadata::TaskMetadata,
    /// Creation timestamp taken from the record.
    pub created_at: u64,
    /// Last-update timestamp taken from the record, if any.
    pub updated_at: Option<u64>,
    /// Parent task id, rendered as a string; the field is omitted from the
    /// JSON entirely when the task has no parent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_task_id: Option<String>,
}
49
50impl TaskSummary {
51 fn from_record(record: &actionqueue_storage::recovery::reducer::TaskRecord) -> Self {
53 let task_spec = record.task_spec();
54 Self {
55 id: task_spec.id().to_string(),
56 run_policy: task_spec.run_policy().clone(),
57 constraints: task_spec.constraints().clone(),
58 metadata: task_spec.metadata().clone(),
59 created_at: record.created_at(),
60 updated_at: record.updated_at(),
61 parent_task_id: task_spec.parent_task_id().map(|id| id.to_string()),
62 }
63 }
64}
65
66#[tracing::instrument(skip_all)]
71pub async fn handle(state: State<super::RouterState>, raw_query: RawQuery) -> impl IntoResponse {
72 let pagination = match parse_pagination(raw_query.0.as_deref()) {
73 Ok(pagination) => pagination,
74 Err(error) => return pagination_error(error).into_response(),
75 };
76
77 let projection = match super::read_projection(&state) {
78 Ok(guard) => guard,
79 Err(response) => return *response,
80 };
81 let response = build_task_list_response(&projection, pagination);
82 Json(response).into_response()
83}
84
85pub fn register_routes(
87 router: axum::Router<super::RouterState>,
88) -> axum::Router<super::RouterState> {
89 router.route("/api/v1/tasks", axum::routing::get(handle))
90}
91
92fn build_task_list_response(
93 projection: &actionqueue_storage::recovery::reducer::ReplayReducer,
94 pagination: Pagination,
95) -> TaskListResponse {
96 let mut tasks: Vec<TaskSummary> =
97 projection.task_records().map(TaskSummary::from_record).collect();
98 tasks.sort_by(compare_task_summaries);
99
100 let start = pagination.offset.min(tasks.len());
101 let end = start.saturating_add(pagination.limit).min(tasks.len());
102 let paged_tasks = tasks[start..end].to_vec();
103
104 TaskListResponse { tasks: paged_tasks, limit: pagination.limit, offset: pagination.offset }
105}
106
107fn compare_task_summaries(left: &TaskSummary, right: &TaskSummary) -> Ordering {
108 let priority_cmp = right.metadata.priority().cmp(&left.metadata.priority());
109 if priority_cmp != Ordering::Equal {
110 return priority_cmp;
111 }
112
113 let created_cmp = left.created_at.cmp(&right.created_at);
114 if created_cmp != Ordering::Equal {
115 return created_cmp;
116 }
117
118 left.id.cmp(&right.id)
119}
120
121#[cfg(test)]
122mod tests {
123 use std::str::FromStr;
124
125 use actionqueue_core::ids::TaskId;
126 use actionqueue_core::task::constraints::TaskConstraints;
127 use actionqueue_core::task::metadata::TaskMetadata;
128 use actionqueue_core::task::run_policy::RunPolicy;
129 use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
130 use actionqueue_storage::recovery::reducer::ReplayReducer;
131 use actionqueue_storage::wal::event::{WalEvent, WalEventType};
132 use axum::http::StatusCode;
133 use axum::response::IntoResponse;
134
135 use super::*;
136 use crate::http::pagination::{
137 PaginationErrorDetails, PaginationErrorResponse, DEFAULT_LIMIT, MAX_LIMIT,
138 };
139
140 fn task_spec_with_priority(task_id: TaskId, priority: i32) -> TaskSpec {
141 let metadata = TaskMetadata::new(vec![], priority, None);
142
143 TaskSpec::new(
144 task_id,
145 TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
146 RunPolicy::Once,
147 TaskConstraints::default(),
148 metadata,
149 )
150 .expect("task spec should be valid")
151 }
152
153 fn apply_task(
154 reducer: &mut ReplayReducer,
155 sequence: u64,
156 task_spec: TaskSpec,
157 created_at: u64,
158 ) {
159 let event =
160 WalEvent::new(sequence, WalEventType::TaskCreated { task_spec, timestamp: created_at });
161 reducer.apply(&event).expect("task created event should apply")
162 }
163
164 #[test]
165 fn test_empty_list_response() {
166 let reducer = ReplayReducer::new();
167 let pagination = Pagination { limit: DEFAULT_LIMIT, offset: 0 };
168 let response = build_task_list_response(&reducer, pagination);
169
170 assert!(response.tasks.is_empty());
171 assert_eq!(response.limit, DEFAULT_LIMIT);
172 assert_eq!(response.offset, 0);
173 }
174
175 #[test]
176 fn test_ordering_by_priority_created_at_task_id() {
177 let mut reducer = ReplayReducer::new();
178
179 let task_high = task_spec_with_priority(
180 TaskId::from_str("00000000-0000-0000-0000-000000000004").unwrap(),
181 6,
182 );
183 let task_mid_early = task_spec_with_priority(
184 TaskId::from_str("00000000-0000-0000-0000-000000000005").unwrap(),
185 5,
186 );
187 let task_mid_a = task_spec_with_priority(
188 TaskId::from_str("00000000-0000-0000-0000-000000000001").unwrap(),
189 5,
190 );
191 let task_mid_b = task_spec_with_priority(
192 TaskId::from_str("00000000-0000-0000-0000-000000000002").unwrap(),
193 5,
194 );
195 let task_low = task_spec_with_priority(
196 TaskId::from_str("00000000-0000-0000-0000-000000000003").unwrap(),
197 4,
198 );
199
200 apply_task(&mut reducer, 1, task_high, 20);
201 apply_task(&mut reducer, 2, task_mid_early, 9);
202 apply_task(&mut reducer, 3, task_mid_a, 10);
203 apply_task(&mut reducer, 4, task_mid_b, 10);
204 apply_task(&mut reducer, 5, task_low, 5);
205
206 let response =
207 build_task_list_response(&reducer, Pagination { limit: DEFAULT_LIMIT, offset: 0 });
208
209 let ids: Vec<String> = response.tasks.iter().map(|task| task.id.clone()).collect();
210 assert_eq!(
211 ids,
212 vec![
213 "00000000-0000-0000-0000-000000000004",
214 "00000000-0000-0000-0000-000000000005",
215 "00000000-0000-0000-0000-000000000001",
216 "00000000-0000-0000-0000-000000000002",
217 "00000000-0000-0000-0000-000000000003",
218 ]
219 );
220 }
221
222 #[test]
223 fn test_pagination_slicing() {
224 let mut reducer = ReplayReducer::new();
225
226 let task_high = task_spec_with_priority(
227 TaskId::from_str("00000000-0000-0000-0000-000000000004").unwrap(),
228 6,
229 );
230 let task_mid_early = task_spec_with_priority(
231 TaskId::from_str("00000000-0000-0000-0000-000000000005").unwrap(),
232 5,
233 );
234 let task_mid_a = task_spec_with_priority(
235 TaskId::from_str("00000000-0000-0000-0000-000000000001").unwrap(),
236 5,
237 );
238 let task_mid_b = task_spec_with_priority(
239 TaskId::from_str("00000000-0000-0000-0000-000000000002").unwrap(),
240 5,
241 );
242 let task_low = task_spec_with_priority(
243 TaskId::from_str("00000000-0000-0000-0000-000000000003").unwrap(),
244 4,
245 );
246
247 apply_task(&mut reducer, 1, task_high, 20);
248 apply_task(&mut reducer, 2, task_mid_early, 9);
249 apply_task(&mut reducer, 3, task_mid_a, 10);
250 apply_task(&mut reducer, 4, task_mid_b, 10);
251 apply_task(&mut reducer, 5, task_low, 5);
252
253 let response = build_task_list_response(&reducer, Pagination { limit: 2, offset: 1 });
254
255 let ids: Vec<String> = response.tasks.iter().map(|task| task.id.clone()).collect();
256 assert_eq!(
257 ids,
258 vec!["00000000-0000-0000-0000-000000000005", "00000000-0000-0000-0000-000000000001",]
259 );
260 assert_eq!(response.limit, 2);
261 assert_eq!(response.offset, 1);
262 }
263
264 #[test]
265 fn test_invalid_pagination_limit_error_schema() {
266 let error = parse_pagination(Some("limit=0")).expect_err("limit should be invalid");
267 let response = pagination_error(error).into_response();
268 assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);
269
270 let payload = PaginationErrorResponse {
271 error: "invalid_pagination",
272 message: format!("limit must be between 1 and {MAX_LIMIT}"),
273 details: PaginationErrorDetails { field: "limit" },
274 };
275 let json = serde_json::to_string(&payload).expect("error payload should serialize");
276 assert_eq!(
277 json,
278 format!(
279 r#"{{"error":"invalid_pagination","message":"limit must be between 1 and {MAX_LIMIT}","details":{{"field":"limit"}}}}"#
280 )
281 );
282 }
283
284 #[test]
285 fn test_invalid_pagination_offset_error_schema() {
286 let error = parse_pagination(Some("offset=-1")).expect_err("offset should be invalid");
287 let response = pagination_error(error).into_response();
288 assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);
289
290 let payload = PaginationErrorResponse {
291 error: "invalid_pagination",
292 message: "offset must be a non-negative integer".to_string(),
293 details: PaginationErrorDetails { field: "offset" },
294 };
295 let json = serde_json::to_string(&payload).expect("error payload should serialize");
296 assert_eq!(
297 json,
298 r#"{"error":"invalid_pagination","message":"offset must be a non-negative integer","details":{"field":"offset"}}"#
299 );
300 }
301}