1use std::str::FromStr;
14
15use actionqueue_core::ids::TaskId;
16use axum::extract::Path;
17use axum::http::StatusCode;
18use axum::response::IntoResponse;
19use axum::Json;
20use serde::Serialize;
21
22#[derive(Debug, Clone, Serialize)]
26pub struct TaskGetResponse {
27 pub id: String,
29 pub payload: Vec<u8>,
31 pub content_type: Option<String>,
33 pub run_policy: actionqueue_core::task::run_policy::RunPolicy,
35 pub constraints: actionqueue_core::task::constraints::TaskConstraints,
37 pub metadata: actionqueue_core::task::metadata::TaskMetadata,
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub parent_task_id: Option<String>,
42 pub created_at: u64,
44 pub updated_at: Option<u64>,
46}
47
48impl TaskGetResponse {
49 fn from_record(record: &actionqueue_storage::recovery::reducer::TaskRecord) -> Self {
51 let task_spec = record.task_spec();
52 Self {
53 id: task_spec.id().to_string(),
54 payload: task_spec.payload().to_vec(),
55 content_type: task_spec.content_type().map(str::to_string),
56 run_policy: task_spec.run_policy().clone(),
57 constraints: task_spec.constraints().clone(),
58 metadata: task_spec.metadata().clone(),
59 parent_task_id: task_spec.parent_task_id().map(|id| id.to_string()),
60 created_at: record.created_at(),
61 updated_at: record.updated_at(),
62 }
63 }
64}
65
66#[derive(Debug, Clone, Serialize)]
68struct InvalidTaskIdResponse {
69 error: &'static str,
70 message: &'static str,
71 details: InvalidTaskIdDetails,
72}
73
74#[derive(Debug, Clone, Serialize)]
75struct InvalidTaskIdDetails {
76 task_id: String,
77}
78
79#[derive(Debug, Clone, Serialize)]
81struct TaskNotFoundResponse {
82 error: &'static str,
83 message: &'static str,
84 details: TaskNotFoundDetails,
85}
86
87#[derive(Debug, Clone, Serialize)]
88struct TaskNotFoundDetails {
89 task_id: String,
90}
91
92#[tracing::instrument(skip(state))]
98pub async fn handle(
99 state: axum::extract::State<crate::http::RouterState>,
100 Path(task_id_str): Path<String>,
101) -> impl IntoResponse {
102 let task_id = match TaskId::from_str(&task_id_str) {
104 Ok(id) => id,
105 Err(_) => {
106 return invalid_task_id_response(&task_id_str).into_response();
107 }
108 };
109
110 if task_id.is_nil() {
112 return invalid_task_id_response(&task_id_str).into_response();
113 }
114
115 let projection = match super::read_projection(&state) {
117 Ok(guard) => guard,
118 Err(response) => return (*response).into_response(),
119 };
120 let record = projection.get_task_record(&task_id);
121
122 let Some(record) = record else {
124 return task_not_found_response(&task_id_str).into_response();
125 };
126
127 let response = TaskGetResponse::from_record(record);
129 (StatusCode::OK, Json(response)).into_response()
130}
131
132pub fn register_routes(
134 router: axum::Router<crate::http::RouterState>,
135) -> axum::Router<crate::http::RouterState> {
136 router.route("/api/v1/tasks/:task_id", axum::routing::get(handle))
137}
138
139fn invalid_task_id_response(task_id: &str) -> impl IntoResponse {
141 let response = InvalidTaskIdResponse {
142 error: "invalid_task_id",
143 message: "invalid task id",
144 details: InvalidTaskIdDetails { task_id: task_id.to_string() },
145 };
146 (StatusCode::BAD_REQUEST, Json(response)).into_response()
147}
148
149fn task_not_found_response(task_id: &str) -> impl IntoResponse {
151 let response = TaskNotFoundResponse {
152 error: "task_not_found",
153 message: "task not found",
154 details: TaskNotFoundDetails { task_id: task_id.to_string() },
155 };
156 (StatusCode::NOT_FOUND, Json(response)).into_response()
157}
158
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
    use actionqueue_storage::wal::WalAppendTelemetry;
    use axum::extract::{Path, State};
    use axum::http::StatusCode;
    use axum::response::IntoResponse;
    use http_body_util::BodyExt;

    use super::TaskGetResponse;
    use crate::time::clock::{MockClock, SharedDaemonClock};

    /// Builds a run-once task spec carrying `payload` as `application/octet-stream`.
    fn task_spec_with_payload(task_id: actionqueue_core::ids::TaskId, payload: &[u8]) -> TaskSpec {
        TaskSpec::new(
            task_id,
            TaskPayload::with_content_type(payload.to_vec(), "application/octet-stream"),
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("task spec should be valid")
    }

    /// Applies a `TaskCreated` WAL event for `task_spec` to `reducer`.
    fn apply_task(
        reducer: &mut ReplayReducer,
        sequence: u64,
        task_spec: TaskSpec,
        created_at: u64,
    ) {
        let event =
            WalEvent::new(sequence, WalEventType::TaskCreated { task_spec, timestamp: created_at });
        reducer.apply(&event).expect("task created event should apply");
    }

    /// Wraps `reducer` in a router state with control and metrics disabled.
    fn build_state(reducer: ReplayReducer) -> crate::http::RouterState {
        let metrics = Arc::new(
            crate::metrics::registry::MetricsRegistry::new(None)
                .expect("test metrics registry should initialize"),
        );
        let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
        let state = crate::http::RouterStateInner::new(
            crate::bootstrap::RouterConfig { control_enabled: false, metrics_enabled: false },
            Arc::new(std::sync::RwLock::new(reducer)),
            crate::http::RouterObservability {
                metrics,
                wal_append_telemetry: WalAppendTelemetry::new(),
                clock,
                recovery_observations: RecoveryObservations::zero(),
            },
            crate::bootstrap::ReadyStatus::ready(),
        );
        Arc::new(state)
    }

    /// Collects the whole response body and returns it as a UTF-8 string.
    async fn response_body_string(response: axum::response::Response) -> String {
        let bytes = response.into_body().collect().await.expect("body should collect").to_bytes();
        String::from_utf8(bytes.to_vec()).expect("response body should be utf-8")
    }

    #[test]
    fn test_task_get_response_from_record() {
        let mut reducer = ReplayReducer::new();

        let task_id =
            actionqueue_core::ids::TaskId::from_str("00000000-0000-0000-0000-000000000001")
                .unwrap();
        let task_spec = task_spec_with_payload(task_id, b"test payload");
        apply_task(&mut reducer, 1, task_spec, 100);

        let record = reducer.get_task_record(&task_id).expect("task should exist");
        let response = TaskGetResponse::from_record(record);

        assert_eq!(response.id, "00000000-0000-0000-0000-000000000001");
        assert_eq!(response.payload, b"test payload");
        assert_eq!(response.content_type, Some("application/octet-stream".to_string()));
        assert_eq!(response.created_at, 100);
        assert!(response.updated_at.is_none());
    }

    // The schema tests read the body the helper actually emits, so a change in the
    // helper (not just in the struct definition) is caught.
    #[tokio::test]
    async fn test_invalid_task_id_response_schema() {
        let response = super::invalid_task_id_response("test-id").into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"invalid_task_id","message":"invalid task id","details":{"task_id":"test-id"}}"#
        );
    }

    #[tokio::test]
    async fn test_task_not_found_response_schema() {
        let response = super::task_not_found_response("test-id").into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"task_not_found","message":"task not found","details":{"task_id":"test-id"}}"#
        );
    }

    #[tokio::test]
    async fn test_task_get_handle_invalid_id_returns_400() {
        let state = build_state(ReplayReducer::new());
        let response =
            super::handle(State(state), Path("not-a-uuid".to_string())).await.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"invalid_task_id","message":"invalid task id","details":{"task_id":"not-a-uuid"}}"#
        );
    }

    #[tokio::test]
    async fn test_task_get_handle_nil_id_returns_400() {
        let state = build_state(ReplayReducer::new());
        let response =
            super::handle(State(state), Path("00000000-0000-0000-0000-000000000000".to_string()))
                .await
                .into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"invalid_task_id","message":"invalid task id","details":{"task_id":"00000000-0000-0000-0000-000000000000"}}"#
        );
    }

    #[tokio::test]
    async fn test_task_get_handle_not_found_returns_404() {
        let state = build_state(ReplayReducer::new());
        let response =
            super::handle(State(state), Path("00000000-0000-0000-0000-000000000999".to_string()))
                .await
                .into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"task_not_found","message":"task not found","details":{"task_id":"00000000-0000-0000-0000-000000000999"}}"#
        );
    }

    #[tokio::test]
    async fn test_task_get_handle_existing_task_returns_200() {
        let mut reducer = ReplayReducer::new();
        let task_id =
            actionqueue_core::ids::TaskId::from_str("00000000-0000-0000-0000-000000000001")
                .expect("task id literal should parse");
        apply_task(&mut reducer, 1, task_spec_with_payload(task_id, b"test payload"), 100);

        let state = build_state(reducer);
        let response =
            super::handle(State(state), Path("00000000-0000-0000-0000-000000000001".to_string()))
                .await
                .into_response();
        assert_eq!(response.status(), StatusCode::OK);

        // Only fields whose serialized form is fixed by this module are checked; the
        // exact encoding of run policy, constraints and metadata belongs to other crates.
        let body: serde_json::Value = serde_json::from_str(&response_body_string(response).await)
            .expect("response body should be valid JSON");
        assert_eq!(body["id"].as_str(), Some("00000000-0000-0000-0000-000000000001"));
        assert_eq!(body["content_type"].as_str(), Some("application/octet-stream"));
        assert_eq!(body["created_at"].as_u64(), Some(100));
    }
}