1use std::cmp::Ordering;
9
10use axum::extract::{RawQuery, State};
11use axum::response::IntoResponse;
12use axum::Json;
13use serde::Serialize;
14
15use super::pagination::{pagination_error, parse_pagination, Pagination};
16
/// Response body serialized by the runs list handler (`GET /api/v1/runs`).
///
/// Holds one page of run summaries together with the pagination values that
/// were used to select it.
#[derive(Debug, Clone, Serialize)]
pub struct RunListResponse {
    /// Run summaries contained in this page.
    pub runs: Vec<RunSummary>,
    /// Page-size limit that was requested for this page.
    pub limit: usize,
    /// Offset that was requested for this page.
    pub offset: usize,
}
29
/// Serializable summary of a single run, as listed by the runs endpoint.
#[derive(Debug, Clone, Serialize)]
pub struct RunSummary {
    /// Run identifier rendered as a string.
    pub run_id: String,
    /// Identifier of the task this run belongs to, rendered as a string.
    pub task_id: String,
    /// State reported by the run instance.
    pub state: actionqueue_core::run::state::RunState,
    /// Creation timestamp reported by the run instance (unit not visible here — TODO confirm).
    pub created_at: u64,
    /// Scheduled timestamp reported by the run instance (unit not visible here — TODO confirm).
    pub scheduled_at: u64,
    /// Attempt count reported by the run instance.
    pub attempt_count: u32,
    /// Concurrency key taken from the task's constraints; `None` when no
    /// constraints were supplied or they carry no key.
    pub concurrency_key: Option<String>,
}
48
49impl RunSummary {
50 fn from_run_instance(
52 run_instance: &actionqueue_core::run::run_instance::RunInstance,
53 task_constraints: Option<&actionqueue_core::task::constraints::TaskConstraints>,
54 ) -> Self {
55 let concurrency_key = task_constraints
56 .and_then(|constraints| constraints.concurrency_key().map(str::to_string));
57 Self {
58 run_id: run_instance.id().to_string(),
59 task_id: run_instance.task_id().to_string(),
60 state: run_instance.state(),
61 created_at: run_instance.created_at(),
62 scheduled_at: run_instance.scheduled_at(),
63 attempt_count: run_instance.attempt_count(),
64 concurrency_key,
65 }
66 }
67}
68
69#[tracing::instrument(skip_all)]
74pub async fn handle(state: State<super::RouterState>, raw_query: RawQuery) -> impl IntoResponse {
75 let pagination = match parse_pagination(raw_query.0.as_deref()) {
76 Ok(pagination) => pagination,
77 Err(error) => return pagination_error(error).into_response(),
78 };
79
80 let projection = match super::read_projection(&state) {
81 Ok(guard) => guard,
82 Err(response) => return *response,
83 };
84 let response = build_runs_list_response(&projection, pagination);
85 Json(response).into_response()
86}
87
88pub fn register_routes(
90 router: axum::Router<super::RouterState>,
91) -> axum::Router<super::RouterState> {
92 router.route("/api/v1/runs", axum::routing::get(handle))
93}
94
95fn build_runs_list_response(
96 projection: &actionqueue_storage::recovery::reducer::ReplayReducer,
97 pagination: Pagination,
98) -> RunListResponse {
99 let mut summaries: Vec<RunSummary> = projection
100 .run_instances()
101 .map(|run_instance| {
102 let task_id = run_instance.task_id();
103 let task_spec = projection.get_task(&task_id);
104 let task_constraints = task_spec.map(|spec| spec.constraints());
105 RunSummary::from_run_instance(run_instance, task_constraints)
106 })
107 .collect();
108 summaries.sort_by(compare_run_summaries);
109
110 let start = pagination.offset.min(summaries.len());
111 let end = start.saturating_add(pagination.limit).min(summaries.len());
112 let paged_runs = summaries[start..end].to_vec();
113
114 RunListResponse { runs: paged_runs, limit: pagination.limit, offset: pagination.offset }
115}
116
117fn compare_run_summaries(left: &RunSummary, right: &RunSummary) -> Ordering {
118 let scheduled_at_cmp = left.scheduled_at.cmp(&right.scheduled_at);
119 if scheduled_at_cmp != Ordering::Equal {
120 return scheduled_at_cmp;
121 }
122
123 left.run_id.cmp(&right.run_id)
124}
125
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use actionqueue_core::ids::{RunId, TaskId};
    use actionqueue_core::run::run_instance::RunInstance;
    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
    use axum::http::StatusCode;
    use axum::response::IntoResponse;

    use super::*;
    use crate::http::pagination::{
        parse_limit, parse_offset, PaginationError, PaginationErrorDetails, PaginationErrorResponse,
    };

    /// Builds a run-once task spec with the given id and optional concurrency key.
    fn task_spec_with_concurrency_key(task_id: TaskId, concurrency_key: Option<&str>) -> TaskSpec {
        let constraints = TaskConstraints::new(1, None, concurrency_key.map(|s| s.to_string()))
            .expect("constraints should be valid");
        let metadata = TaskMetadata::default();

        TaskSpec::new(
            task_id,
            TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
            RunPolicy::Once,
            constraints,
            metadata,
        )
        .expect("task spec should be valid")
    }

    /// Applies a `TaskCreated` WAL event for `task_spec` to the reducer.
    fn apply_task(
        reducer: &mut ReplayReducer,
        sequence: u64,
        task_spec: TaskSpec,
        created_at: u64,
    ) {
        let event =
            WalEvent::new(sequence, WalEventType::TaskCreated { task_spec, timestamp: created_at });
        reducer.apply(&event).expect("task created event should apply");
    }

    /// Applies a `RunCreated` WAL event for `run_instance` to the reducer.
    fn apply_run(reducer: &mut ReplayReducer, sequence: u64, run_instance: RunInstance) {
        let event = WalEvent::new(sequence, WalEventType::RunCreated { run_instance });
        reducer.apply(&event).expect("run created event should apply");
    }

    /// Creates a scheduled run instance with explicit ids and timestamps.
    fn run_instance_scheduled(
        run_id: RunId,
        task_id: TaskId,
        scheduled_at: u64,
        created_at: u64,
    ) -> RunInstance {
        RunInstance::new_scheduled_with_id(run_id, task_id, scheduled_at, created_at)
            .expect("run instance should be valid")
    }

    /// An empty projection yields an empty page that still echoes the pagination values.
    #[test]
    fn test_empty_runs_list_response() {
        let reducer = ReplayReducer::new();
        let pagination = Pagination { limit: 100, offset: 0 };
        let response = build_runs_list_response(&reducer, pagination);

        assert!(response.runs.is_empty());
        assert_eq!(response.limit, 100);
        assert_eq!(response.offset, 0);
    }

    /// Runs are ordered by `scheduled_at` first and by `run_id` on ties.
    #[test]
    fn test_ordering_by_scheduled_at_then_run_id() {
        let mut reducer = ReplayReducer::new();

        let task_id_1 = TaskId::from_str("11111111-1111-1111-1111-111111111111").unwrap();
        let task_id_2 = TaskId::from_str("22222222-2222-2222-2222-222222222222").unwrap();
        let task_id_3 = TaskId::from_str("33333333-3333-3333-3333-333333333333").unwrap();

        let task_spec_1 = task_spec_with_concurrency_key(task_id_1, Some("key-1"));
        let task_spec_2 = task_spec_with_concurrency_key(task_id_2, Some("key-2"));
        let task_spec_3 = task_spec_with_concurrency_key(task_id_3, Some("key-3"));

        // The specs are not used again, so they are moved rather than cloned.
        apply_task(&mut reducer, 1, task_spec_1, 1000);
        apply_task(&mut reducer, 2, task_spec_2, 1001);
        apply_task(&mut reducer, 3, task_spec_3, 1002);

        let run_id_1 = RunId::from_str("11111111-1111-1111-1111-111111111110").unwrap();
        let run_id_2 = RunId::from_str("22222222-2222-2222-2222-222222222220").unwrap();
        let run_id_3 = RunId::from_str("33333333-3333-3333-3333-333333333330").unwrap();

        // run_1 and run_2 share scheduled_at = 1000, so run_id decides their order.
        let run_1 = run_instance_scheduled(run_id_1, task_id_1, 1000, 1500);
        let run_2 = run_instance_scheduled(run_id_2, task_id_2, 1000, 1501);
        let run_3 = run_instance_scheduled(run_id_3, task_id_3, 2000, 1502);

        apply_run(&mut reducer, 4, run_1);
        apply_run(&mut reducer, 5, run_2);
        apply_run(&mut reducer, 6, run_3);

        let pagination = Pagination { limit: 100, offset: 0 };
        let response = build_runs_list_response(&reducer, pagination);

        assert_eq!(response.runs.len(), 3);
        assert_eq!(response.runs[0].run_id, run_id_1.to_string());
        assert_eq!(response.runs[1].run_id, run_id_2.to_string());
        assert_eq!(response.runs[2].run_id, run_id_3.to_string());
    }

    /// `limit` and `offset` select the expected window, including a short final page.
    #[test]
    fn test_pagination_slicing() {
        let mut reducer = ReplayReducer::new();

        let task_id = TaskId::from_str("11111111-1111-1111-1111-111111111111").unwrap();
        let task_spec = task_spec_with_concurrency_key(task_id, Some("key"));

        apply_task(&mut reducer, 1, task_spec, 1000);

        // Five run ids whose last UUID group is the zero-padded index 1..=5.
        let run_ids: Vec<RunId> = (1..=5)
            .map(|i| {
                let uuid_str = format!("00000000-0000-0000-0000-{:012}", i);
                RunId::from_str(&uuid_str).unwrap()
            })
            .collect();

        // scheduled_at grows with the index, so sorted order equals `run_ids` order.
        for (i, run_id) in run_ids.iter().enumerate() {
            let run = run_instance_scheduled(*run_id, task_id, 1000 + i as u64, 2000 + i as u64);
            apply_run(&mut reducer, 2 + i as u64, run);
        }

        // Window in the middle of the list.
        let pagination = Pagination { limit: 2, offset: 1 };
        let response = build_runs_list_response(&reducer, pagination);

        assert_eq!(response.runs.len(), 2);
        assert_eq!(response.limit, 2);
        assert_eq!(response.offset, 1);
        assert_eq!(response.runs[0].run_id, run_ids[1].to_string());
        assert_eq!(response.runs[1].run_id, run_ids[2].to_string());

        // Window that runs past the end is truncated to the remaining entries.
        let pagination = Pagination { limit: 3, offset: 3 };
        let response = build_runs_list_response(&reducer, pagination);

        assert_eq!(response.runs.len(), 2);
        assert_eq!(response.limit, 3);
        assert_eq!(response.offset, 3);
        assert_eq!(response.runs[0].run_id, run_ids[3].to_string());
        assert_eq!(response.runs[1].run_id, run_ids[4].to_string());
    }

    /// Out-of-range limits are rejected and the limit error has the expected status and schema.
    ///
    /// NOTE: the JSON shape is checked by serializing `PaginationErrorResponse`
    /// directly; the body of `response` itself is not inspected.
    #[test]
    fn test_invalid_pagination_limit_error_schema() {
        let result = parse_limit("0");
        assert!(matches!(result, Err(ref e) if e.field == "limit"));

        let result = parse_limit("1001");
        assert!(matches!(result, Err(ref e) if e.field == "limit"));

        let error = PaginationError::new("limit", "limit must be between 1 and 1000");
        let error_message = error.message.clone();
        let error_field = error.field;
        let response = pagination_error(error).into_response();
        let status = response.status();

        let response_json = serde_json::to_string(&PaginationErrorResponse {
            error: "invalid_pagination",
            message: error_message,
            details: PaginationErrorDetails { field: error_field },
        })
        .expect("should serialize response");

        assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
        assert_eq!(
            response_json,
            r#"{"error":"invalid_pagination","message":"limit must be between 1 and 1000","details":{"field":"limit"}}"#
        );
    }

    /// A negative offset is rejected and the offset error has the expected status and schema.
    ///
    /// NOTE: the JSON shape is checked by serializing `PaginationErrorResponse`
    /// directly; the body of `response` itself is not inspected.
    #[test]
    fn test_invalid_pagination_offset_error_schema() {
        let result = parse_offset("-1");
        assert!(matches!(result, Err(ref e) if e.field == "offset"));

        let error = PaginationError::new("offset", "offset must be a non-negative integer");
        let error_message = error.message.clone();
        let error_field = error.field;
        let response = pagination_error(error).into_response();
        let status = response.status();

        let response_json = serde_json::to_string(&PaginationErrorResponse {
            error: "invalid_pagination",
            message: error_message,
            details: PaginationErrorDetails { field: error_field },
        })
        .expect("should serialize response");

        assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
        assert_eq!(
            response_json,
            r#"{"error":"invalid_pagination","message":"offset must be a non-negative integer","details":{"field":"offset"}}"#
        );
    }

    /// The concurrency key comes from the task's constraints and is `None` both
    /// for a task without a key and for a run whose task is not in the projection.
    #[test]
    fn test_concurrency_key_resolution_and_missing_task() {
        let mut reducer = ReplayReducer::new();

        let task_id_with_key = TaskId::from_str("11111111-1111-1111-1111-111111111111").unwrap();
        let task_spec_with_key = task_spec_with_concurrency_key(task_id_with_key, Some("my-key"));

        let task_id_no_key = TaskId::from_str("22222222-2222-2222-2222-222222222222").unwrap();
        let task_spec_no_key = task_spec_with_concurrency_key(task_id_no_key, None);

        apply_task(&mut reducer, 1, task_spec_with_key, 1000);
        apply_task(&mut reducer, 2, task_spec_no_key, 1001);

        let run_id_with_key = RunId::from_str("11111111-1111-1111-1111-111111111110").unwrap();
        let run_with_key = run_instance_scheduled(run_id_with_key, task_id_with_key, 500, 1500);

        let run_id_no_key = RunId::from_str("22222222-2222-2222-2222-222222222220").unwrap();
        let run_no_key = run_instance_scheduled(run_id_no_key, task_id_no_key, 600, 1501);

        // No TaskCreated event is applied for this task id.
        let missing_task_id = TaskId::from_str("33333333-3333-3333-3333-333333333333").unwrap();
        let run_id_missing_task = RunId::from_str("33333333-3333-3333-3333-333333333330").unwrap();
        let run_missing_task =
            run_instance_scheduled(run_id_missing_task, missing_task_id, 700, 1502);

        apply_run(&mut reducer, 3, run_with_key);
        apply_run(&mut reducer, 4, run_no_key);
        apply_run(&mut reducer, 5, run_missing_task);

        let pagination = Pagination { limit: 100, offset: 0 };
        let response = build_runs_list_response(&reducer, pagination);

        assert_eq!(response.runs.len(), 3);

        // scheduled_at values 500 < 600 < 700 fix the order of the three runs.
        assert_eq!(response.runs[0].run_id, run_id_with_key.to_string());
        assert_eq!(response.runs[0].concurrency_key, Some("my-key".to_string()));

        assert_eq!(response.runs[1].run_id, run_id_no_key.to_string());
        assert_eq!(response.runs[1].concurrency_key, None);

        assert_eq!(response.runs[2].run_id, run_id_missing_task.to_string());
        assert_eq!(response.runs[2].concurrency_key, None);
    }
}