actionqueue_daemon/http/control/
mod.rs1mod engine_pause;
22mod engine_resume;
23mod run_cancel;
24mod task_cancel;
25
26#[derive(Debug, Clone, serde::Serialize)]
28pub(crate) struct ErrorResponse {
29 pub error: &'static str,
30 pub message: String,
31}
32
33pub(crate) fn internal_error_response(message: &str) -> axum::response::Response {
35 use axum::http::StatusCode;
36 use axum::response::IntoResponse;
37 use axum::Json;
38
39 (
40 StatusCode::INTERNAL_SERVER_ERROR,
41 Json(ErrorResponse { error: "internal_error", message: message.to_string() }),
42 )
43 .into_response()
44}
45
46pub(crate) fn sequence_overflow_response() -> axum::response::Response {
48 internal_error_response("control sequence overflow")
49}
50
51pub(crate) fn internal_authority_error(
56 error: actionqueue_storage::mutation::authority::MutationAuthorityError<
57 actionqueue_storage::recovery::reducer::ReplayReducerError,
58 >,
59) -> axum::response::Response {
60 let message = match error {
61 actionqueue_storage::mutation::authority::MutationAuthorityError::Validation(_) => {
62 "authority validation failed"
63 }
64 actionqueue_storage::mutation::authority::MutationAuthorityError::Append(_) => {
65 "authority append failed"
66 }
67 actionqueue_storage::mutation::authority::MutationAuthorityError::PartialDurability {
68 ..
69 } => "authority partial durability failed",
70 actionqueue_storage::mutation::authority::MutationAuthorityError::Apply { .. } => {
71 "authority apply failed"
72 }
73 };
74 internal_error_response(message)
75}
76
77pub fn register_routes(
83 router: axum::Router<super::RouterState>,
84 control_enabled: bool,
85) -> axum::Router<super::RouterState> {
86 if !control_enabled {
87 return router;
88 }
89
90 router
91 .route("/api/v1/tasks/:task_id/cancel", axum::routing::post(task_cancel::handle))
92 .route("/api/v1/runs/:run_id/cancel", axum::routing::post(run_cancel::handle))
93 .route("/api/v1/engine/pause", axum::routing::post(engine_pause::handle))
94 .route("/api/v1/engine/resume", axum::routing::post(engine_resume::handle))
95}
96
/// Router-level tests for control-route gating.
///
/// Each test builds a fresh router through [`register_routes`] and sends
/// body-less `POST` requests to the control paths, asserting on status code
/// and (for the enabled case) the exact response body.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use actionqueue_storage::mutation::authority::StorageMutationAuthority;
    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::fs_writer::WalFsWriter;
    use actionqueue_storage::wal::{InstrumentedWalWriter, WalAppendTelemetry};
    use axum::body::Body;
    use axum::http::{Method, Request, StatusCode};
    use http_body_util::BodyExt;
    use tower::Service;

    use super::*;
    use crate::time::clock::{MockClock, SharedDaemonClock};

    /// Task-cancel path with a fixed task id used by the tests.
    const TASK_CANCEL_PATH: &str = "/api/v1/tasks/00000000-0000-0000-0000-000000000123/cancel";
    /// Run-cancel path with a fixed run id used by the tests.
    const RUN_CANCEL_PATH: &str = "/api/v1/runs/00000000-0000-0000-0000-000000000456/cancel";
    /// Engine pause path.
    const ENGINE_PAUSE_PATH: &str = "/api/v1/engine/pause";
    /// Engine resume path.
    const ENGINE_RESUME_PATH: &str = "/api/v1/engine/resume";

    /// Every control path, in the order: task cancel, run cancel, pause, resume.
    const CONTROL_PATHS: [&str; 4] =
        [TASK_CANCEL_PATH, RUN_CANCEL_PATH, ENGINE_PAUSE_PATH, ENGINE_RESUME_PATH];

    /// Creates a control mutation authority backed by a fresh WAL file in the
    /// system temp directory.
    ///
    /// The file name combines the process id, the current wall-clock
    /// nanoseconds and a process-wide counter. The counter is what guarantees
    /// uniqueness: tests in this module may run concurrently on threads of the
    /// same process, and two calls can observe the same clock reading on
    /// platforms with a coarse timer.
    ///
    /// The WAL file is not removed afterwards.
    fn test_control_authority() -> super::super::ControlMutationAuthority {
        use std::sync::atomic::{AtomicU64, Ordering};

        static NEXT_WAL_ID: AtomicU64 = AtomicU64::new(0);

        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("clock should be after epoch")
            .as_nanos();
        // Relaxed is sufficient: only the uniqueness of the returned value
        // matters, not ordering relative to other memory operations.
        let serial = NEXT_WAL_ID.fetch_add(1, Ordering::Relaxed);
        let file_name = format!(
            "actionqueue-daemon-control-test-{}-{}-{}.wal",
            std::process::id(),
            nanos,
            serial
        );

        let wal_path = std::env::temp_dir().join(file_name);
        let fs_writer = WalFsWriter::new(wal_path).expect("test wal writer should initialize");
        let instrumented = InstrumentedWalWriter::new(fs_writer, WalAppendTelemetry::new());
        let authority = StorageMutationAuthority::new(instrumented, ReplayReducer::new());
        Arc::new(std::sync::Mutex::new(authority))
    }

    /// Builds a metrics registry; when `enabled`, it is configured with the
    /// bind address `127.0.0.1:9090`, otherwise with no bind address.
    fn test_metrics_registry(enabled: bool) -> Arc<crate::metrics::registry::MetricsRegistry> {
        let metrics_bind = enabled.then(|| std::net::SocketAddr::from(([127, 0, 0, 1], 9090)));
        let registry = crate::metrics::registry::MetricsRegistry::new(metrics_bind)
            .expect("test metrics registry should initialize");
        Arc::new(registry)
    }

    /// Returns a mock clock fixed at `1_700_000_000`.
    fn test_clock() -> SharedDaemonClock {
        Arc::new(MockClock::new(1_700_000_000))
    }

    /// Builds a stateless router with control routes gated by `control_enabled`.
    ///
    /// When control is enabled the router state also carries a control
    /// authority from [`test_control_authority`]; otherwise the state is built
    /// without one. Metrics are disabled in both cases.
    fn test_router(control_enabled: bool) -> axum::Router<()> {
        let config = crate::bootstrap::RouterConfig { control_enabled, metrics_enabled: false };
        let projection = Arc::new(std::sync::RwLock::new(ReplayReducer::new()));
        let observability = crate::http::RouterObservability {
            metrics: test_metrics_registry(false),
            wal_append_telemetry: WalAppendTelemetry::new(),
            clock: test_clock(),
            recovery_observations: RecoveryObservations::zero(),
        };

        let inner = if control_enabled {
            super::super::RouterStateInner::with_control_authority(
                config,
                projection,
                observability,
                test_control_authority(),
                crate::bootstrap::ReadyStatus::ready(),
            )
        } else {
            super::super::RouterStateInner::new(
                config,
                projection,
                observability,
                crate::bootstrap::ReadyStatus::ready(),
            )
        };
        let state = Arc::new(inner);

        register_routes(axum::Router::new(), control_enabled).with_state(state).with_state(())
    }

    /// Sends an empty-body `POST` to `path` through `router` and returns the response.
    async fn send_post(router: &mut axum::Router<()>, path: &str) -> axum::response::Response {
        let request = Request::builder()
            .method(Method::POST)
            .uri(path)
            .body(Body::empty())
            .expect("request should build");
        let mut service = router.as_service::<Body>();
        service.call(request).await.expect("request should succeed")
    }

    /// Collects the whole response body and decodes it as UTF-8.
    async fn response_body_string(response: axum::response::Response) -> String {
        let collected = response.into_body().collect().await.expect("body should collect");
        let bytes = collected.to_bytes();
        String::from_utf8(bytes.to_vec()).expect("response body should be utf-8")
    }

    /// With control disabled, every control path must answer 404.
    #[tokio::test]
    async fn disabled_gating_does_not_register_control_routes() {
        let mut router = test_router(false);

        for path in CONTROL_PATHS {
            let response = send_post(&mut router, path).await;
            assert_eq!(response.status(), StatusCode::NOT_FOUND);
        }
    }

    /// Resume on a fresh engine is expected to report `already_resumed`.
    #[tokio::test]
    async fn enabled_gating_routes_engine_resume_to_real_handler() {
        let mut router = test_router(true);
        let response = send_post(&mut router, ENGINE_RESUME_PATH).await;
        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(response_body_string(response).await, r#"{"status":"already_resumed"}"#);
    }

    /// Pause on a fresh engine is expected to report `paused`.
    #[tokio::test]
    async fn enabled_gating_routes_engine_pause_to_real_handler() {
        let mut router = test_router(true);
        let response = send_post(&mut router, ENGINE_PAUSE_PATH).await;
        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(response_body_string(response).await, r#"{"status":"paused"}"#);
    }

    /// Cancelling an unknown task is expected to yield the handler's own
    /// `task_not_found` body, which distinguishes it from an unrouted 404.
    #[tokio::test]
    async fn enabled_gating_routes_task_cancel_to_real_handler() {
        let mut router = test_router(true);
        let response = send_post(&mut router, TASK_CANCEL_PATH).await;
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"task_not_found","message":"task not found","details":{"task_id":"00000000-0000-0000-0000-000000000123"}}"#
        );
    }

    /// Cancelling an unknown run is expected to yield the handler's own
    /// `run_not_found` body, which distinguishes it from an unrouted 404.
    #[tokio::test]
    async fn enabled_gating_routes_run_cancel_to_real_handler() {
        let mut router = test_router(true);
        let response = send_post(&mut router, RUN_CANCEL_PATH).await;
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
        assert_eq!(
            response_body_string(response).await,
            r#"{"error":"run_not_found","message":"run not found","details":{"run_id":"00000000-0000-0000-0000-000000000456"}}"#
        );
    }
}