Skip to main content

actionqueue_daemon/http/
task_get.rs

1//! Single task retrieval route module.
2//!
3//! This module provides the single task endpoint (`GET /api/v1/tasks/:task_id`)
4//! for the daemon. The endpoint returns a task's full specification (including
5//! payload and content_type) plus authoritative timestamps. The response is
6//! read-only, deterministic, and sourced strictly from the recovery projection.
7//!
8//! # Invariant boundaries
9//!
10//! The handler performs no IO, reads no mutation authority or WAL, and mutates
11//! no runtime state. It only reads from the authoritative projection state.
12
13use std::str::FromStr;
14
15use actionqueue_core::ids::TaskId;
16use axum::extract::Path;
17use axum::http::StatusCode;
18use axum::response::IntoResponse;
19use axum::Json;
20use serde::Serialize;
21
/// Task get response payload.
///
/// This struct represents the stable schema for the task get endpoint response.
/// It is an owned snapshot built by `TaskGetResponse::from_record` and is
/// serialized with serde, one key per field under the field's own name.
#[derive(Debug, Clone, Serialize)]
pub struct TaskGetResponse {
    /// Task identifier as a stable string (the task id's `to_string()` form).
    pub id: String,
    /// Task payload bytes (serde `Vec<u8>` JSON array of numbers).
    pub payload: Vec<u8>,
    /// Optional payload content-type hint; `None` when the task spec carries none.
    pub content_type: Option<String>,
    /// Task run policy, cloned from the task spec.
    pub run_policy: actionqueue_core::task::run_policy::RunPolicy,
    /// Task constraints, cloned from the task spec.
    pub constraints: actionqueue_core::task::constraints::TaskConstraints,
    /// Task metadata including priority, cloned from the task spec.
    pub metadata: actionqueue_core::task::metadata::TaskMetadata,
    /// Parent task identifier, if this task is a child in a workflow hierarchy.
    ///
    /// The key is omitted from the serialized output entirely when this is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_task_id: Option<String>,
    /// Task creation timestamp derived from the WAL event.
    pub created_at: u64,
    /// Task update timestamp (null until update events exist).
    ///
    /// Unlike `parent_task_id`, this key is always present and serializes as
    /// `null` when `None`.
    pub updated_at: Option<u64>,
}
47
48impl TaskGetResponse {
49    /// Builds a task get response from a projection record.
50    fn from_record(record: &actionqueue_storage::recovery::reducer::TaskRecord) -> Self {
51        let task_spec = record.task_spec();
52        Self {
53            id: task_spec.id().to_string(),
54            payload: task_spec.payload().to_vec(),
55            content_type: task_spec.content_type().map(str::to_string),
56            run_policy: task_spec.run_policy().clone(),
57            constraints: task_spec.constraints().clone(),
58            metadata: task_spec.metadata().clone(),
59            parent_task_id: task_spec.parent_task_id().map(|id| id.to_string()),
60            created_at: record.created_at(),
61            updated_at: record.updated_at(),
62        }
63    }
64}
65
/// Invalid task ID error response.
///
/// JSON body sent by `invalid_task_id_response` together with
/// `400 Bad Request`. Keys serialize in field order: `error`, `message`,
/// `details`.
#[derive(Debug, Clone, Serialize)]
struct InvalidTaskIdResponse {
    /// Fixed error code; `invalid_task_id_response` sets it to `"invalid_task_id"`.
    error: &'static str,
    /// Fixed human-readable text; `invalid_task_id_response` sets it to `"invalid task id"`.
    message: &'static str,
    /// Echo of the rejected identifier.
    details: InvalidTaskIdDetails,
}
73
/// Detail object nested under `details` in [`InvalidTaskIdResponse`].
#[derive(Debug, Clone, Serialize)]
struct InvalidTaskIdDetails {
    /// The task id string exactly as it was received, before any parsing.
    task_id: String,
}
78
/// Task not found error response.
///
/// JSON body sent by `task_not_found_response` together with
/// `404 Not Found`. Keys serialize in field order: `error`, `message`,
/// `details`.
#[derive(Debug, Clone, Serialize)]
struct TaskNotFoundResponse {
    /// Fixed error code; `task_not_found_response` sets it to `"task_not_found"`.
    error: &'static str,
    /// Fixed human-readable text; `task_not_found_response` sets it to `"task not found"`.
    message: &'static str,
    /// Echo of the identifier that had no matching record.
    details: TaskNotFoundDetails,
}
86
/// Detail object nested under `details` in [`TaskNotFoundResponse`].
#[derive(Debug, Clone, Serialize)]
struct TaskNotFoundDetails {
    /// The task id string exactly as it was received in the request path.
    task_id: String,
}
91
92/// Task get handler.
93///
94/// This handler responds to `GET /api/v1/tasks/:task_id` with a deterministic,
95/// side-effect-free payload containing the task specification derived from
96/// authoritative projection state.
97#[tracing::instrument(skip(state))]
98pub async fn handle(
99    state: axum::extract::State<crate::http::RouterState>,
100    Path(task_id_str): Path<String>,
101) -> impl IntoResponse {
102    // Parse the task ID
103    let task_id = match TaskId::from_str(&task_id_str) {
104        Ok(id) => id,
105        Err(_) => {
106            return invalid_task_id_response(&task_id_str).into_response();
107        }
108    };
109
110    // Check for nil UUID
111    if task_id.is_nil() {
112        return invalid_task_id_response(&task_id_str).into_response();
113    }
114
115    // Lookup the task record in the projection
116    let projection = match super::read_projection(&state) {
117        Ok(guard) => guard,
118        Err(response) => return (*response).into_response(),
119    };
120    let record = projection.get_task_record(&task_id);
121
122    // Handle not found
123    let Some(record) = record else {
124        return task_not_found_response(&task_id_str).into_response();
125    };
126
127    // Build and return the response
128    let response = TaskGetResponse::from_record(record);
129    (StatusCode::OK, Json(response)).into_response()
130}
131
132/// Registers the task get route in the router builder.
133pub fn register_routes(
134    router: axum::Router<crate::http::RouterState>,
135) -> axum::Router<crate::http::RouterState> {
136    router.route("/api/v1/tasks/:task_id", axum::routing::get(handle))
137}
138
139/// Creates an invalid task ID error response.
140fn invalid_task_id_response(task_id: &str) -> impl IntoResponse {
141    let response = InvalidTaskIdResponse {
142        error: "invalid_task_id",
143        message: "invalid task id",
144        details: InvalidTaskIdDetails { task_id: task_id.to_string() },
145    };
146    (StatusCode::BAD_REQUEST, Json(response)).into_response()
147}
148
149/// Creates a task not found error response.
150fn task_not_found_response(task_id: &str) -> impl IntoResponse {
151    let response = TaskNotFoundResponse {
152        error: "task_not_found",
153        message: "task not found",
154        details: TaskNotFoundDetails { task_id: task_id.to_string() },
155    };
156    (StatusCode::NOT_FOUND, Json(response)).into_response()
157}
158
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
    use actionqueue_storage::wal::WalAppendTelemetry;
    use axum::extract::{Path, State};
    use axum::http::StatusCode;
    use axum::response::IntoResponse;
    use http_body_util::BodyExt;

    use super::TaskGetResponse;
    use crate::time::clock::{MockClock, SharedDaemonClock};

    /// Builds a run-once task spec carrying `payload` with the
    /// `application/octet-stream` content type and default constraints/metadata.
    fn task_spec_with_payload(task_id: actionqueue_core::ids::TaskId, payload: &[u8]) -> TaskSpec {
        TaskSpec::new(
            task_id,
            TaskPayload::with_content_type(payload.to_vec(), "application/octet-stream"),
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("task spec should be valid")
    }

    /// Applies a `TaskCreated` WAL event for `task_spec` to the reducer.
    fn apply_task(
        reducer: &mut ReplayReducer,
        sequence: u64,
        task_spec: TaskSpec,
        created_at: u64,
    ) {
        let event =
            WalEvent::new(sequence, WalEventType::TaskCreated { task_spec, timestamp: created_at });
        reducer.apply(&event).expect("task created event should apply");
    }

    /// Wraps a reducer in a router state suitable for calling the handler
    /// directly (control and metrics disabled, mock clock, ready status).
    fn build_state(reducer: ReplayReducer) -> crate::http::RouterState {
        let metrics = Arc::new(
            crate::metrics::registry::MetricsRegistry::new(None)
                .expect("test metrics registry should initialize"),
        );
        let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
        let state = crate::http::RouterStateInner::new(
            crate::bootstrap::RouterConfig { control_enabled: false, metrics_enabled: false },
            Arc::new(std::sync::RwLock::new(reducer)),
            crate::http::RouterObservability {
                metrics,
                wal_append_telemetry: WalAppendTelemetry::new(),
                clock,
                recovery_observations: RecoveryObservations::zero(),
            },
            crate::bootstrap::ReadyStatus::ready(),
        );
        Arc::new(state)
    }

    /// Collects a response body and decodes it as UTF-8.
    async fn response_body_string(response: axum::response::Response) -> String {
        let bytes = response.into_body().collect().await.expect("body should collect").to_bytes();
        String::from_utf8(bytes.to_vec()).expect("response body should be utf-8")
    }

    #[test]
    fn test_task_get_response_from_record() {
        let mut reducer = ReplayReducer::new();

        let task_id =
            actionqueue_core::ids::TaskId::from_str("00000000-0000-0000-0000-000000000001")
                .unwrap();
        let task_spec = task_spec_with_payload(task_id, b"test payload");
        apply_task(&mut reducer, 1, task_spec, 100);

        let record = reducer.get_task_record(&task_id).expect("task should exist");
        let response = TaskGetResponse::from_record(record);

        assert_eq!(response.id, "00000000-0000-0000-0000-000000000001");
        assert_eq!(response.payload, b"test payload");
        assert_eq!(response.content_type, Some("application/octet-stream".to_string()));
        assert_eq!(response.created_at, 100);
        assert!(response.updated_at.is_none());
    }

    #[test]
    fn test_invalid_task_id_response_schema() {
        let response = super::invalid_task_id_response("test-id").into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        // Pins the serialized key order of the body struct; the body actually
        // emitted by the helper is covered by the handler tests below.
        let response_json = serde_json::to_string(&super::InvalidTaskIdResponse {
            error: "invalid_task_id",
            message: "invalid task id",
            details: super::InvalidTaskIdDetails { task_id: "test-id".to_string() },
        })
        .expect("should serialize response");

        assert_eq!(
            response_json,
            r#"{"error":"invalid_task_id","message":"invalid task id","details":{"task_id":"test-id"}}"#
        );
    }

    #[test]
    fn test_task_not_found_response_schema() {
        let response = super::task_not_found_response("test-id").into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);

        // Pins the serialized key order of the body struct; the body actually
        // emitted by the helper is covered by the handler tests below.
        let response_json = serde_json::to_string(&super::TaskNotFoundResponse {
            error: "task_not_found",
            message: "task not found",
            details: super::TaskNotFoundDetails { task_id: "test-id".to_string() },
        })
        .expect("should serialize response");

        assert_eq!(
            response_json,
            r#"{"error":"task_not_found","message":"task not found","details":{"task_id":"test-id"}}"#
        );
    }

    #[tokio::test]
    async fn test_task_get_handle_invalid_id_returns_400() {
        let state = build_state(ReplayReducer::new());
        let response =
            super::handle(State(state), Path("not-a-uuid".to_string())).await.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"invalid_task_id","message":"invalid task id","details":{"task_id":"not-a-uuid"}}"#
        );
    }

    #[tokio::test]
    async fn test_task_get_handle_nil_id_returns_400() {
        let state = build_state(ReplayReducer::new());
        let response =
            super::handle(State(state), Path("00000000-0000-0000-0000-000000000000".to_string()))
                .await
                .into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"invalid_task_id","message":"invalid task id","details":{"task_id":"00000000-0000-0000-0000-000000000000"}}"#
        );
    }

    #[tokio::test]
    async fn test_task_get_handle_not_found_returns_404() {
        let state = build_state(ReplayReducer::new());
        let response =
            super::handle(State(state), Path("00000000-0000-0000-0000-000000000999".to_string()))
                .await
                .into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"task_not_found","message":"task not found","details":{"task_id":"00000000-0000-0000-0000-000000000999"}}"#
        );
    }

    /// Success path through the handler: a task present in the projection is
    /// returned with `200 OK` and a JSON body carrying the record's fields.
    #[tokio::test]
    async fn test_task_get_handle_existing_task_returns_200() {
        let mut reducer = ReplayReducer::new();
        let task_id =
            actionqueue_core::ids::TaskId::from_str("00000000-0000-0000-0000-000000000001")
                .unwrap();
        apply_task(&mut reducer, 1, task_spec_with_payload(task_id, &[1, 2, 3]), 100);

        let state = build_state(reducer);
        let response =
            super::handle(State(state), Path("00000000-0000-0000-0000-000000000001".to_string()))
                .await
                .into_response();
        assert_eq!(response.status(), StatusCode::OK);

        // Parse instead of string-comparing so the test does not depend on how
        // the policy/constraints/metadata types choose to serialize.
        let body: serde_json::Value = serde_json::from_str(&response_body_string(response).await)
            .expect("response body should be valid JSON");
        assert_eq!(body["id"], serde_json::json!("00000000-0000-0000-0000-000000000001"));
        assert_eq!(body["payload"], serde_json::json!([1, 2, 3]));
        assert_eq!(body["content_type"], serde_json::json!("application/octet-stream"));
        assert_eq!(body["created_at"], serde_json::json!(100));
        // `updated_at` has no skip attribute, so the key must be present as null.
        assert_eq!(body.get("updated_at"), Some(&serde_json::Value::Null));
    }
}