Skip to main content

actionqueue_daemon/http/
run_get.rs

//! Single run retrieval route module.
//!
//! This module provides the single run endpoint (`GET /api/v1/runs/:run_id`)
//! for the daemon. The endpoint returns the run lifecycle and lineage details
//! derived from authoritative projection state. The response is read-only,
//! deterministic, and sourced strictly from the recovery projection.
//!
//! # Invariant boundaries
//!
//! The handler performs no I/O, reads neither the mutation authority nor the
//! WAL, and mutates no runtime state. It only reads from the authoritative
//! projection state.
12
13use std::str::FromStr;
14
15use actionqueue_core::ids::RunId;
16use actionqueue_core::mutation::AttemptResultKind;
17use axum::extract::Path;
18use axum::http::StatusCode;
19use axum::response::IntoResponse;
20use axum::Json;
21use serde::Serialize;
22
23/// Run get response payload.
24///
25/// This struct represents the stable schema for the run get endpoint response.
26#[derive(Debug, Clone, Serialize)]
27pub struct RunGetResponse {
28    /// Run identifier as a stable string.
29    pub run_id: String,
30    /// Task identifier as a stable string.
31    pub task_id: String,
32    /// Run state.
33    pub state: actionqueue_core::run::state::RunState,
34    /// Run creation timestamp derived from the WAL event.
35    pub created_at: u64,
36    /// Run scheduled timestamp derived from run instance.
37    pub scheduled_at: u64,
38    /// Number of attempts made for this run.
39    pub attempt_count: u32,
40    /// Current attempt identifier if any.
41    pub current_attempt_id: Option<String>,
42    /// Run state history entries derived from WAL.
43    pub state_history: Vec<RunStateHistoryEntry>,
44    /// Attempt lineage entries derived from WAL.
45    pub attempts: Vec<RunAttemptEntry>,
46    /// Lease metadata if a lease is active.
47    pub lease: Option<RunLeaseEntry>,
48    /// Block reason when the run is not Ready.
49    pub block_reason: Option<&'static str>,
50}
51
52/// Run state history entry for the response payload.
53#[derive(Debug, Clone, Serialize)]
54pub struct RunStateHistoryEntry {
55    /// Previous state, or null for initial Scheduled.
56    pub from: Option<actionqueue_core::run::state::RunState>,
57    /// New state.
58    pub to: actionqueue_core::run::state::RunState,
59    /// Timestamp associated with the transition.
60    pub timestamp: u64,
61}
62
63/// Run attempt entry for the response payload.
64#[derive(Debug, Clone, Serialize)]
65pub struct RunAttemptEntry {
66    /// Attempt identifier as a stable string.
67    pub attempt_id: String,
68    /// Attempt started timestamp.
69    pub started_at: u64,
70    /// Attempt finished timestamp if finished.
71    pub finished_at: Option<u64>,
72    /// Canonical attempt result taxonomy if finished.
73    pub result: Option<AttemptResultKind>,
74    /// Attempt error message if any.
75    pub error: Option<String>,
76    /// Opaque handler output bytes if the attempt produced output.
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub output: Option<Vec<u8>>,
79}
80
81/// Run lease entry for the response payload.
82#[derive(Debug, Clone, Serialize)]
83pub struct RunLeaseEntry {
84    /// Lease owner.
85    pub owner: String,
86    /// Lease expiry timestamp.
87    pub expiry: u64,
88    /// Lease acquisition timestamp.
89    pub acquired_at: u64,
90    /// Lease last update timestamp.
91    pub updated_at: u64,
92}
93
94impl RunGetResponse {
95    /// Builds a run get response from projection records.
96    fn from_record(
97        run_instance: &actionqueue_core::run::run_instance::RunInstance,
98        history: &[actionqueue_storage::recovery::reducer::RunStateHistoryEntry],
99        attempts: &[actionqueue_storage::recovery::reducer::AttemptHistoryEntry],
100        lease: Option<&actionqueue_storage::recovery::reducer::LeaseMetadata>,
101    ) -> Self {
102        let state_history = history
103            .iter()
104            .map(|entry| RunStateHistoryEntry {
105                from: entry.from(),
106                to: entry.to(),
107                timestamp: entry.timestamp(),
108            })
109            .collect();
110
111        let attempts = attempts
112            .iter()
113            .map(|entry| RunAttemptEntry {
114                attempt_id: entry.attempt_id().to_string(),
115                started_at: entry.started_at(),
116                finished_at: entry.finished_at(),
117                result: entry.result(),
118                error: entry.error().map(str::to_string),
119                output: entry.output().map(|b| b.to_vec()),
120            })
121            .collect();
122
123        let lease = lease.map(|metadata| RunLeaseEntry {
124            owner: metadata.owner().to_string(),
125            expiry: metadata.expiry(),
126            acquired_at: metadata.acquired_at(),
127            updated_at: metadata.updated_at(),
128        });
129
130        let block_reason = match run_instance.state() {
131            actionqueue_core::run::state::RunState::Ready => None,
132            actionqueue_core::run::state::RunState::Scheduled => Some("scheduled"),
133            actionqueue_core::run::state::RunState::Leased => Some("leased"),
134            actionqueue_core::run::state::RunState::Running => Some("running"),
135            actionqueue_core::run::state::RunState::RetryWait => Some("retry_wait"),
136            actionqueue_core::run::state::RunState::Suspended => Some("suspended"),
137            actionqueue_core::run::state::RunState::Completed
138            | actionqueue_core::run::state::RunState::Failed
139            | actionqueue_core::run::state::RunState::Canceled => Some("terminal"),
140        };
141
142        Self {
143            run_id: run_instance.id().to_string(),
144            task_id: run_instance.task_id().to_string(),
145            state: run_instance.state(),
146            created_at: run_instance.created_at(),
147            scheduled_at: run_instance.scheduled_at(),
148            attempt_count: run_instance.attempt_count(),
149            current_attempt_id: run_instance.current_attempt_id().map(|id| id.to_string()),
150            state_history,
151            attempts,
152            lease,
153            block_reason,
154        }
155    }
156}
157
158/// Invalid run ID error response.
159#[derive(Debug, Clone, Serialize)]
160struct InvalidRunIdResponse {
161    error: &'static str,
162    message: &'static str,
163    details: InvalidRunIdDetails,
164}
165
166#[derive(Debug, Clone, Serialize)]
167struct InvalidRunIdDetails {
168    run_id: String,
169}
170
171/// Run not found error response.
172#[derive(Debug, Clone, Serialize)]
173struct RunNotFoundResponse {
174    error: &'static str,
175    message: &'static str,
176    details: RunNotFoundDetails,
177}
178
179#[derive(Debug, Clone, Serialize)]
180struct RunNotFoundDetails {
181    run_id: String,
182}
183
184/// Run get handler.
185///
186/// This handler responds to `GET /api/v1/runs/:run_id` with a deterministic,
187/// side-effect-free payload containing the run details derived from
188/// authoritative projection state.
189#[tracing::instrument(skip(state))]
190pub async fn handle(
191    state: axum::extract::State<crate::http::RouterState>,
192    Path(run_id_str): Path<String>,
193) -> impl IntoResponse {
194    let run_id = match RunId::from_str(&run_id_str) {
195        Ok(id) => id,
196        Err(_) => {
197            return invalid_run_id_response(&run_id_str).into_response();
198        }
199    };
200
201    if run_id.as_uuid().is_nil() {
202        return invalid_run_id_response(&run_id_str).into_response();
203    }
204
205    let projection = match super::read_projection(&state) {
206        Ok(guard) => guard,
207        Err(response) => return (*response).into_response(),
208    };
209    let run_instance = match projection.get_run_instance(&run_id) {
210        Some(instance) => instance,
211        None => return run_not_found_response(&run_id_str).into_response(),
212    };
213
214    let history = projection.get_run_history(&run_id).unwrap_or(&[]);
215    let attempts = projection.get_attempt_history(&run_id).unwrap_or(&[]);
216    let lease = projection.get_lease_metadata(&run_id);
217
218    let response = RunGetResponse::from_record(run_instance, history, attempts, lease);
219    (StatusCode::OK, Json(response)).into_response()
220}
221
222/// Registers the run get route in the router builder.
223pub fn register_routes(
224    router: axum::Router<crate::http::RouterState>,
225) -> axum::Router<crate::http::RouterState> {
226    router.route("/api/v1/runs/:run_id", axum::routing::get(handle))
227}
228
229/// Creates an invalid run ID error response.
230fn invalid_run_id_response(run_id: &str) -> impl IntoResponse {
231    let response = InvalidRunIdResponse {
232        error: "invalid_run_id",
233        message: "invalid run id",
234        details: InvalidRunIdDetails { run_id: run_id.to_string() },
235    };
236    (StatusCode::BAD_REQUEST, Json(response)).into_response()
237}
238
239/// Creates a run not found error response.
240fn run_not_found_response(run_id: &str) -> impl IntoResponse {
241    let response = RunNotFoundResponse {
242        error: "run_not_found",
243        message: "run not found",
244        details: RunNotFoundDetails { run_id: run_id.to_string() },
245    };
246    (StatusCode::NOT_FOUND, Json(response)).into_response()
247}
248
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use actionqueue_core::ids::{AttemptId, RunId, TaskId};
    use actionqueue_core::mutation::AttemptResultKind;
    use actionqueue_core::run::run_instance::RunInstance;
    use actionqueue_core::run::state::RunState;
    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
    use actionqueue_storage::wal::WalAppendTelemetry;
    use axum::extract::{Path, State};
    use axum::http::StatusCode;
    use axum::response::IntoResponse;
    use http_body_util::BodyExt;

    use super::RunGetResponse;
    use crate::time::clock::{MockClock, SharedDaemonClock};

    /// Builds a run-once task spec with a small fixed payload.
    fn task_spec(task_id: TaskId) -> TaskSpec {
        let payload =
            TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream");
        TaskSpec::new(
            task_id,
            payload,
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("task spec should be valid")
    }

    /// Builds a scheduled run instance with both constructor timestamps set to 1000.
    fn run_instance_scheduled(run_id: RunId, task_id: TaskId) -> RunInstance {
        RunInstance::new_scheduled_with_id(run_id, task_id, 1000, 1000)
            .expect("run instance should be valid")
    }

    /// Wraps `event` in a WAL event with the given sequence and applies it.
    fn apply_event(reducer: &mut ReplayReducer, sequence: u64, event: WalEventType) {
        let wal_event = WalEvent::new(sequence, event);
        reducer.apply(&wal_event).expect("event should apply");
    }

    /// Applies the two events every projection test starts with: task
    /// creation (sequence 1) and run creation (sequence 2).
    fn seed_scheduled_run(reducer: &mut ReplayReducer, task_id: TaskId, run_id: RunId) {
        apply_event(
            reducer,
            1,
            WalEventType::TaskCreated { task_spec: task_spec(task_id), timestamp: 900 },
        );
        apply_event(
            reducer,
            2,
            WalEventType::RunCreated { run_instance: run_instance_scheduled(run_id, task_id) },
        );
    }

    /// Reads the records for `run_id` out of the reducer and builds the response from them.
    fn project(reducer: &ReplayReducer, run_id: RunId) -> RunGetResponse {
        let run_instance = reducer.get_run_instance(&run_id).expect("run should exist");
        let history = reducer.get_run_history(&run_id).expect("history should exist");
        let attempts = reducer.get_attempt_history(&run_id).expect("attempts should exist");
        let lease = reducer.get_lease_metadata(&run_id);

        RunGetResponse::from_record(run_instance, history, attempts, lease)
    }

    /// Wraps `reducer` in router state with control and metrics disabled.
    fn build_state(reducer: ReplayReducer) -> crate::http::RouterState {
        let metrics = Arc::new(
            crate::metrics::registry::MetricsRegistry::new(None)
                .expect("test metrics registry should initialize"),
        );
        let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
        let inner = crate::http::RouterStateInner::new(
            crate::bootstrap::RouterConfig { control_enabled: false, metrics_enabled: false },
            Arc::new(std::sync::RwLock::new(reducer)),
            crate::http::RouterObservability {
                metrics,
                wal_append_telemetry: WalAppendTelemetry::new(),
                clock,
                recovery_observations: RecoveryObservations::zero(),
            },
            crate::bootstrap::ReadyStatus::ready(),
        );
        Arc::new(inner)
    }

    /// Collects the whole response body and decodes it as UTF-8.
    async fn response_body_string(response: axum::response::Response) -> String {
        let collected = response.into_body().collect().await.expect("body should collect");
        String::from_utf8(collected.to_bytes().to_vec()).expect("response body should be utf-8")
    }

    /// Calls the handler against an empty projection with `raw_run_id` as the path segment.
    async fn call_handle(raw_run_id: &str) -> axum::response::Response {
        let state = build_state(ReplayReducer::new());
        super::handle(State(state), Path(raw_run_id.to_string())).await.into_response()
    }

    #[test]
    fn test_run_get_success_with_history_attempts_lease() {
        let mut reducer = ReplayReducer::new();

        let task_id = TaskId::from_str("11111111-1111-1111-1111-111111111111").unwrap();
        let run_id = RunId::from_str("22222222-2222-2222-2222-222222222222").unwrap();
        let attempt_id = AttemptId::from_str("33333333-3333-3333-3333-333333333333").unwrap();
        let attempt_id_2 = AttemptId::from_str("44444444-4444-4444-4444-444444444444").unwrap();

        seed_scheduled_run(&mut reducer, task_id, run_id);
        apply_event(
            &mut reducer,
            3,
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Scheduled,
                new_state: RunState::Ready,
                timestamp: 1001,
            },
        );
        apply_event(
            &mut reducer,
            4,
            WalEventType::LeaseAcquired {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 2000,
                timestamp: 1100,
            },
        );
        apply_event(
            &mut reducer,
            5,
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Ready,
                new_state: RunState::Leased,
                timestamp: 1101,
            },
        );
        apply_event(
            &mut reducer,
            6,
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Leased,
                new_state: RunState::Running,
                timestamp: 1200,
            },
        );
        apply_event(
            &mut reducer,
            7,
            WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 1201 },
        );
        apply_event(
            &mut reducer,
            8,
            WalEventType::AttemptFinished {
                run_id,
                attempt_id,
                result: AttemptResultKind::Failure,
                error: Some("boom".to_string()),
                output: None,
                timestamp: 1300,
            },
        );
        apply_event(
            &mut reducer,
            9,
            WalEventType::AttemptStarted { run_id, attempt_id: attempt_id_2, timestamp: 1350 },
        );
        apply_event(
            &mut reducer,
            10,
            WalEventType::LeaseHeartbeat {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 2100,
                timestamp: 1400,
            },
        );

        let response = project(&reducer, run_id);

        assert_eq!(response.run_id, run_id.to_string());
        assert_eq!(response.task_id, task_id.to_string());
        assert_eq!(response.state, RunState::Running);

        // The first history entry is the initial Scheduled state with no predecessor.
        let initial = response.state_history.first().unwrap();
        assert_eq!(initial.from, None);
        assert_eq!(initial.to, RunState::Scheduled);
        assert_eq!(initial.timestamp, 1000);

        assert_eq!(response.attempts.len(), 2);

        let finished = &response.attempts[0];
        assert_eq!(finished.attempt_id, attempt_id.to_string());
        assert_eq!(finished.started_at, 1201);
        assert_eq!(finished.finished_at, Some(1300));
        assert_eq!(finished.result, Some(AttemptResultKind::Failure));
        assert_eq!(finished.error, Some("boom".to_string()));

        let in_flight = &response.attempts[1];
        assert_eq!(in_flight.attempt_id, attempt_id_2.to_string());
        assert_eq!(in_flight.started_at, 1350);
        assert_eq!(in_flight.finished_at, None);
        assert_eq!(in_flight.result, None);
        assert_eq!(in_flight.error, None);

        // The heartbeat at 1400 is the latest lease update.
        assert_eq!(response.lease.as_ref().unwrap().updated_at, 1400);
        assert_eq!(response.block_reason, Some("running"));
    }

    #[test]
    fn test_run_get_invalid_id_returns_400() {
        let response = super::invalid_run_id_response("not-a-uuid").into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        let body = super::InvalidRunIdResponse {
            error: "invalid_run_id",
            message: "invalid run id",
            details: super::InvalidRunIdDetails { run_id: "not-a-uuid".to_string() },
        };
        let response_json = serde_json::to_string(&body).expect("should serialize response");

        assert_eq!(
            response_json,
            r#"{"error":"invalid_run_id","message":"invalid run id","details":{"run_id":"not-a-uuid"}}"#
        );
    }

    #[test]
    fn test_run_get_nil_id_returns_400() {
        let response =
            super::invalid_run_id_response("00000000-0000-0000-0000-000000000000").into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        let body = super::InvalidRunIdResponse {
            error: "invalid_run_id",
            message: "invalid run id",
            details: super::InvalidRunIdDetails {
                run_id: "00000000-0000-0000-0000-000000000000".to_string(),
            },
        };
        let response_json = serde_json::to_string(&body).expect("should serialize response");

        assert_eq!(
            response_json,
            r#"{"error":"invalid_run_id","message":"invalid run id","details":{"run_id":"00000000-0000-0000-0000-000000000000"}}"#
        );
    }

    #[test]
    fn test_run_get_not_found_returns_404() {
        let response =
            super::run_not_found_response("11111111-1111-1111-1111-111111111111").into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);

        let body = super::RunNotFoundResponse {
            error: "run_not_found",
            message: "run not found",
            details: super::RunNotFoundDetails {
                run_id: "11111111-1111-1111-1111-111111111111".to_string(),
            },
        };
        let response_json = serde_json::to_string(&body).expect("should serialize response");

        assert_eq!(
            response_json,
            r#"{"error":"run_not_found","message":"run not found","details":{"run_id":"11111111-1111-1111-1111-111111111111"}}"#
        );
    }

    #[test]
    fn test_block_reason_ready_is_null() {
        let mut reducer = ReplayReducer::new();

        let task_id = TaskId::from_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap();
        let run_id = RunId::from_str("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb").unwrap();

        seed_scheduled_run(&mut reducer, task_id, run_id);
        apply_event(
            &mut reducer,
            3,
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Scheduled,
                new_state: RunState::Ready,
                timestamp: 1001,
            },
        );

        let response = project(&reducer, run_id);
        assert_eq!(response.block_reason, None);
    }

    #[tokio::test]
    async fn test_run_get_handle_invalid_id_returns_400() {
        let response = call_handle("not-a-uuid").await;
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"invalid_run_id","message":"invalid run id","details":{"run_id":"not-a-uuid"}}"#
        );
    }

    #[tokio::test]
    async fn test_run_get_handle_nil_id_returns_400() {
        let response = call_handle("00000000-0000-0000-0000-000000000000").await;
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"invalid_run_id","message":"invalid run id","details":{"run_id":"00000000-0000-0000-0000-000000000000"}}"#
        );
    }

    #[tokio::test]
    async fn test_run_get_handle_not_found_returns_404() {
        let response = call_handle("11111111-1111-1111-1111-111111111111").await;
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"run_not_found","message":"run not found","details":{"run_id":"11111111-1111-1111-1111-111111111111"}}"#
        );
    }
}