1use std::str::FromStr;
14
15use actionqueue_core::ids::RunId;
16use actionqueue_core::mutation::AttemptResultKind;
17use axum::extract::Path;
18use axum::http::StatusCode;
19use axum::response::IntoResponse;
20use axum::Json;
21use serde::Serialize;
22
23#[derive(Debug, Clone, Serialize)]
27pub struct RunGetResponse {
28 pub run_id: String,
30 pub task_id: String,
32 pub state: actionqueue_core::run::state::RunState,
34 pub created_at: u64,
36 pub scheduled_at: u64,
38 pub attempt_count: u32,
40 pub current_attempt_id: Option<String>,
42 pub state_history: Vec<RunStateHistoryEntry>,
44 pub attempts: Vec<RunAttemptEntry>,
46 pub lease: Option<RunLeaseEntry>,
48 pub block_reason: Option<&'static str>,
50}
51
52#[derive(Debug, Clone, Serialize)]
54pub struct RunStateHistoryEntry {
55 pub from: Option<actionqueue_core::run::state::RunState>,
57 pub to: actionqueue_core::run::state::RunState,
59 pub timestamp: u64,
61}
62
63#[derive(Debug, Clone, Serialize)]
65pub struct RunAttemptEntry {
66 pub attempt_id: String,
68 pub started_at: u64,
70 pub finished_at: Option<u64>,
72 pub result: Option<AttemptResultKind>,
74 pub error: Option<String>,
76 #[serde(skip_serializing_if = "Option::is_none")]
78 pub output: Option<Vec<u8>>,
79}
80
81#[derive(Debug, Clone, Serialize)]
83pub struct RunLeaseEntry {
84 pub owner: String,
86 pub expiry: u64,
88 pub acquired_at: u64,
90 pub updated_at: u64,
92}
93
94impl RunGetResponse {
95 fn from_record(
97 run_instance: &actionqueue_core::run::run_instance::RunInstance,
98 history: &[actionqueue_storage::recovery::reducer::RunStateHistoryEntry],
99 attempts: &[actionqueue_storage::recovery::reducer::AttemptHistoryEntry],
100 lease: Option<&actionqueue_storage::recovery::reducer::LeaseMetadata>,
101 ) -> Self {
102 let state_history = history
103 .iter()
104 .map(|entry| RunStateHistoryEntry {
105 from: entry.from(),
106 to: entry.to(),
107 timestamp: entry.timestamp(),
108 })
109 .collect();
110
111 let attempts = attempts
112 .iter()
113 .map(|entry| RunAttemptEntry {
114 attempt_id: entry.attempt_id().to_string(),
115 started_at: entry.started_at(),
116 finished_at: entry.finished_at(),
117 result: entry.result(),
118 error: entry.error().map(str::to_string),
119 output: entry.output().map(|b| b.to_vec()),
120 })
121 .collect();
122
123 let lease = lease.map(|metadata| RunLeaseEntry {
124 owner: metadata.owner().to_string(),
125 expiry: metadata.expiry(),
126 acquired_at: metadata.acquired_at(),
127 updated_at: metadata.updated_at(),
128 });
129
130 let block_reason = match run_instance.state() {
131 actionqueue_core::run::state::RunState::Ready => None,
132 actionqueue_core::run::state::RunState::Scheduled => Some("scheduled"),
133 actionqueue_core::run::state::RunState::Leased => Some("leased"),
134 actionqueue_core::run::state::RunState::Running => Some("running"),
135 actionqueue_core::run::state::RunState::RetryWait => Some("retry_wait"),
136 actionqueue_core::run::state::RunState::Suspended => Some("suspended"),
137 actionqueue_core::run::state::RunState::Completed
138 | actionqueue_core::run::state::RunState::Failed
139 | actionqueue_core::run::state::RunState::Canceled => Some("terminal"),
140 };
141
142 Self {
143 run_id: run_instance.id().to_string(),
144 task_id: run_instance.task_id().to_string(),
145 state: run_instance.state(),
146 created_at: run_instance.created_at(),
147 scheduled_at: run_instance.scheduled_at(),
148 attempt_count: run_instance.attempt_count(),
149 current_attempt_id: run_instance.current_attempt_id().map(|id| id.to_string()),
150 state_history,
151 attempts,
152 lease,
153 block_reason,
154 }
155 }
156}
157
158#[derive(Debug, Clone, Serialize)]
160struct InvalidRunIdResponse {
161 error: &'static str,
162 message: &'static str,
163 details: InvalidRunIdDetails,
164}
165
166#[derive(Debug, Clone, Serialize)]
167struct InvalidRunIdDetails {
168 run_id: String,
169}
170
171#[derive(Debug, Clone, Serialize)]
173struct RunNotFoundResponse {
174 error: &'static str,
175 message: &'static str,
176 details: RunNotFoundDetails,
177}
178
179#[derive(Debug, Clone, Serialize)]
180struct RunNotFoundDetails {
181 run_id: String,
182}
183
184#[tracing::instrument(skip(state))]
190pub async fn handle(
191 state: axum::extract::State<crate::http::RouterState>,
192 Path(run_id_str): Path<String>,
193) -> impl IntoResponse {
194 let run_id = match RunId::from_str(&run_id_str) {
195 Ok(id) => id,
196 Err(_) => {
197 return invalid_run_id_response(&run_id_str).into_response();
198 }
199 };
200
201 if run_id.as_uuid().is_nil() {
202 return invalid_run_id_response(&run_id_str).into_response();
203 }
204
205 let projection = match super::read_projection(&state) {
206 Ok(guard) => guard,
207 Err(response) => return (*response).into_response(),
208 };
209 let run_instance = match projection.get_run_instance(&run_id) {
210 Some(instance) => instance,
211 None => return run_not_found_response(&run_id_str).into_response(),
212 };
213
214 let history = projection.get_run_history(&run_id).unwrap_or(&[]);
215 let attempts = projection.get_attempt_history(&run_id).unwrap_or(&[]);
216 let lease = projection.get_lease_metadata(&run_id);
217
218 let response = RunGetResponse::from_record(run_instance, history, attempts, lease);
219 (StatusCode::OK, Json(response)).into_response()
220}
221
222pub fn register_routes(
224 router: axum::Router<crate::http::RouterState>,
225) -> axum::Router<crate::http::RouterState> {
226 router.route("/api/v1/runs/:run_id", axum::routing::get(handle))
227}
228
229fn invalid_run_id_response(run_id: &str) -> impl IntoResponse {
231 let response = InvalidRunIdResponse {
232 error: "invalid_run_id",
233 message: "invalid run id",
234 details: InvalidRunIdDetails { run_id: run_id.to_string() },
235 };
236 (StatusCode::BAD_REQUEST, Json(response)).into_response()
237}
238
239fn run_not_found_response(run_id: &str) -> impl IntoResponse {
241 let response = RunNotFoundResponse {
242 error: "run_not_found",
243 message: "run not found",
244 details: RunNotFoundDetails { run_id: run_id.to_string() },
245 };
246 (StatusCode::NOT_FOUND, Json(response)).into_response()
247}
248
249#[cfg(test)]
250mod tests {
251 use std::str::FromStr;
252 use std::sync::Arc;
253
254 use actionqueue_core::ids::{AttemptId, RunId, TaskId};
255 use actionqueue_core::mutation::AttemptResultKind;
256 use actionqueue_core::run::run_instance::RunInstance;
257 use actionqueue_core::run::state::RunState;
258 use actionqueue_core::task::constraints::TaskConstraints;
259 use actionqueue_core::task::metadata::TaskMetadata;
260 use actionqueue_core::task::run_policy::RunPolicy;
261 use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
262 use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
263 use actionqueue_storage::recovery::reducer::ReplayReducer;
264 use actionqueue_storage::wal::event::{WalEvent, WalEventType};
265 use actionqueue_storage::wal::WalAppendTelemetry;
266 use axum::extract::{Path, State};
267 use axum::http::StatusCode;
268 use axum::response::IntoResponse;
269 use http_body_util::BodyExt;
270
271 use super::RunGetResponse;
272 use crate::time::clock::{MockClock, SharedDaemonClock};
273
274 fn task_spec(task_id: TaskId) -> TaskSpec {
275 TaskSpec::new(
276 task_id,
277 TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
278 RunPolicy::Once,
279 TaskConstraints::default(),
280 TaskMetadata::default(),
281 )
282 .expect("task spec should be valid")
283 }
284
285 fn run_instance_scheduled(run_id: RunId, task_id: TaskId) -> RunInstance {
286 RunInstance::new_scheduled_with_id(run_id, task_id, 1000, 1000)
287 .expect("run instance should be valid")
288 }
289
290 fn apply_event(reducer: &mut ReplayReducer, sequence: u64, event: WalEventType) {
291 let event = WalEvent::new(sequence, event);
292 reducer.apply(&event).expect("event should apply");
293 }
294
295 fn build_state(reducer: ReplayReducer) -> crate::http::RouterState {
296 let metrics = std::sync::Arc::new(
297 crate::metrics::registry::MetricsRegistry::new(None)
298 .expect("test metrics registry should initialize"),
299 );
300 let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
301 let state = crate::http::RouterStateInner::new(
302 crate::bootstrap::RouterConfig { control_enabled: false, metrics_enabled: false },
303 std::sync::Arc::new(std::sync::RwLock::new(reducer)),
304 crate::http::RouterObservability {
305 metrics,
306 wal_append_telemetry: WalAppendTelemetry::new(),
307 clock,
308 recovery_observations: RecoveryObservations::zero(),
309 },
310 crate::bootstrap::ReadyStatus::ready(),
311 );
312 std::sync::Arc::new(state)
313 }
314
315 async fn response_body_string(response: axum::response::Response) -> String {
316 let bytes = response.into_body().collect().await.expect("body should collect").to_bytes();
317 String::from_utf8(bytes.to_vec()).expect("response body should be utf-8")
318 }
319
320 #[test]
321 fn test_run_get_success_with_history_attempts_lease() {
322 let mut reducer = ReplayReducer::new();
323
324 let task_id = TaskId::from_str("11111111-1111-1111-1111-111111111111").unwrap();
325 let run_id = RunId::from_str("22222222-2222-2222-2222-222222222222").unwrap();
326 let attempt_id = AttemptId::from_str("33333333-3333-3333-3333-333333333333").unwrap();
327 let attempt_id_2 = AttemptId::from_str("44444444-4444-4444-4444-444444444444").unwrap();
328
329 apply_event(
330 &mut reducer,
331 1,
332 WalEventType::TaskCreated { task_spec: task_spec(task_id), timestamp: 900 },
333 );
334 apply_event(
335 &mut reducer,
336 2,
337 WalEventType::RunCreated { run_instance: run_instance_scheduled(run_id, task_id) },
338 );
339 apply_event(
340 &mut reducer,
341 3,
342 WalEventType::RunStateChanged {
343 run_id,
344 previous_state: RunState::Scheduled,
345 new_state: RunState::Ready,
346 timestamp: 1001,
347 },
348 );
349 apply_event(
350 &mut reducer,
351 4,
352 WalEventType::LeaseAcquired {
353 run_id,
354 owner: "worker-1".to_string(),
355 expiry: 2000,
356 timestamp: 1100,
357 },
358 );
359 apply_event(
360 &mut reducer,
361 5,
362 WalEventType::RunStateChanged {
363 run_id,
364 previous_state: RunState::Ready,
365 new_state: RunState::Leased,
366 timestamp: 1101,
367 },
368 );
369 apply_event(
370 &mut reducer,
371 6,
372 WalEventType::RunStateChanged {
373 run_id,
374 previous_state: RunState::Leased,
375 new_state: RunState::Running,
376 timestamp: 1200,
377 },
378 );
379 apply_event(
380 &mut reducer,
381 7,
382 WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 1201 },
383 );
384 apply_event(
385 &mut reducer,
386 8,
387 WalEventType::AttemptFinished {
388 run_id,
389 attempt_id,
390 result: AttemptResultKind::Failure,
391 error: Some("boom".to_string()),
392 output: None,
393 timestamp: 1300,
394 },
395 );
396 apply_event(
397 &mut reducer,
398 9,
399 WalEventType::AttemptStarted { run_id, attempt_id: attempt_id_2, timestamp: 1350 },
400 );
401 apply_event(
402 &mut reducer,
403 10,
404 WalEventType::LeaseHeartbeat {
405 run_id,
406 owner: "worker-1".to_string(),
407 expiry: 2100,
408 timestamp: 1400,
409 },
410 );
411
412 let run_instance = reducer.get_run_instance(&run_id).expect("run should exist");
413 let history = reducer.get_run_history(&run_id).expect("history should exist");
414 let attempts = reducer.get_attempt_history(&run_id).expect("attempts should exist");
415 let lease = reducer.get_lease_metadata(&run_id);
416
417 let response = RunGetResponse::from_record(run_instance, history, attempts, lease);
418
419 assert_eq!(response.run_id, run_id.to_string());
420 assert_eq!(response.task_id, task_id.to_string());
421 assert_eq!(response.state, RunState::Running);
422 assert_eq!(response.state_history.first().unwrap().from, None);
423 assert_eq!(response.state_history.first().unwrap().to, RunState::Scheduled);
424 assert_eq!(response.state_history.first().unwrap().timestamp, 1000);
425 assert_eq!(response.attempts.len(), 2);
426 assert_eq!(response.attempts[0].attempt_id, attempt_id.to_string());
427 assert_eq!(response.attempts[0].started_at, 1201);
428 assert_eq!(response.attempts[0].finished_at, Some(1300));
429 assert_eq!(response.attempts[0].result, Some(AttemptResultKind::Failure));
430 assert_eq!(response.attempts[0].error, Some("boom".to_string()));
431 assert_eq!(response.attempts[1].attempt_id, attempt_id_2.to_string());
432 assert_eq!(response.attempts[1].started_at, 1350);
433 assert_eq!(response.attempts[1].finished_at, None);
434 assert_eq!(response.attempts[1].result, None);
435 assert_eq!(response.attempts[1].error, None);
436 assert_eq!(response.lease.as_ref().unwrap().updated_at, 1400);
437 assert_eq!(response.block_reason, Some("running"));
438 }
439
440 #[test]
441 fn test_run_get_invalid_id_returns_400() {
442 let response = super::invalid_run_id_response("not-a-uuid").into_response();
443 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
444
445 let response_json = serde_json::to_string(&super::InvalidRunIdResponse {
446 error: "invalid_run_id",
447 message: "invalid run id",
448 details: super::InvalidRunIdDetails { run_id: "not-a-uuid".to_string() },
449 })
450 .expect("should serialize response");
451
452 assert_eq!(
453 response_json,
454 r#"{"error":"invalid_run_id","message":"invalid run id","details":{"run_id":"not-a-uuid"}}"#
455 );
456 }
457
458 #[test]
459 fn test_run_get_nil_id_returns_400() {
460 let response =
461 super::invalid_run_id_response("00000000-0000-0000-0000-000000000000").into_response();
462 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
463
464 let response_json = serde_json::to_string(&super::InvalidRunIdResponse {
465 error: "invalid_run_id",
466 message: "invalid run id",
467 details: super::InvalidRunIdDetails {
468 run_id: "00000000-0000-0000-0000-000000000000".to_string(),
469 },
470 })
471 .expect("should serialize response");
472
473 assert_eq!(
474 response_json,
475 r#"{"error":"invalid_run_id","message":"invalid run id","details":{"run_id":"00000000-0000-0000-0000-000000000000"}}"#
476 );
477 }
478
479 #[test]
480 fn test_run_get_not_found_returns_404() {
481 let response =
482 super::run_not_found_response("11111111-1111-1111-1111-111111111111").into_response();
483 assert_eq!(response.status(), StatusCode::NOT_FOUND);
484
485 let response_json = serde_json::to_string(&super::RunNotFoundResponse {
486 error: "run_not_found",
487 message: "run not found",
488 details: super::RunNotFoundDetails {
489 run_id: "11111111-1111-1111-1111-111111111111".to_string(),
490 },
491 })
492 .expect("should serialize response");
493
494 assert_eq!(
495 response_json,
496 r#"{"error":"run_not_found","message":"run not found","details":{"run_id":"11111111-1111-1111-1111-111111111111"}}"#
497 );
498 }
499
500 #[test]
501 fn test_block_reason_ready_is_null() {
502 let mut reducer = ReplayReducer::new();
503
504 let task_id = TaskId::from_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap();
505 let run_id = RunId::from_str("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb").unwrap();
506
507 apply_event(
508 &mut reducer,
509 1,
510 WalEventType::TaskCreated { task_spec: task_spec(task_id), timestamp: 900 },
511 );
512 apply_event(
513 &mut reducer,
514 2,
515 WalEventType::RunCreated { run_instance: run_instance_scheduled(run_id, task_id) },
516 );
517 apply_event(
518 &mut reducer,
519 3,
520 WalEventType::RunStateChanged {
521 run_id,
522 previous_state: RunState::Scheduled,
523 new_state: RunState::Ready,
524 timestamp: 1001,
525 },
526 );
527
528 let run_instance = reducer.get_run_instance(&run_id).expect("run should exist");
529 let history = reducer.get_run_history(&run_id).expect("history should exist");
530 let attempts = reducer.get_attempt_history(&run_id).expect("attempts should exist");
531 let lease = reducer.get_lease_metadata(&run_id);
532
533 let response = RunGetResponse::from_record(run_instance, history, attempts, lease);
534 assert_eq!(response.block_reason, None);
535 }
536
537 #[tokio::test]
538 async fn test_run_get_handle_invalid_id_returns_400() {
539 let state = build_state(ReplayReducer::new());
540 let response =
541 super::handle(State(state), Path("not-a-uuid".to_string())).await.into_response();
542 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
543 assert_eq!(
544 response_body_string(response).await,
545 r#"{"error":"invalid_run_id","message":"invalid run id","details":{"run_id":"not-a-uuid"}}"#
546 );
547 }
548
549 #[tokio::test]
550 async fn test_run_get_handle_nil_id_returns_400() {
551 let state = build_state(ReplayReducer::new());
552 let response =
553 super::handle(State(state), Path("00000000-0000-0000-0000-000000000000".to_string()))
554 .await
555 .into_response();
556 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
557 assert_eq!(
558 response_body_string(response).await,
559 r#"{"error":"invalid_run_id","message":"invalid run id","details":{"run_id":"00000000-0000-0000-0000-000000000000"}}"#
560 );
561 }
562
563 #[tokio::test]
564 async fn test_run_get_handle_not_found_returns_404() {
565 let state = build_state(ReplayReducer::new());
566 let response =
567 super::handle(State(state), Path("11111111-1111-1111-1111-111111111111".to_string()))
568 .await
569 .into_response();
570 assert_eq!(response.status(), StatusCode::NOT_FOUND);
571 assert_eq!(
572 response_body_string(response).await,
573 r#"{"error":"run_not_found","message":"run not found","details":{"run_id":"11111111-1111-1111-1111-111111111111"}}"#
574 );
575 }
576}