Skip to main content

actionqueue_daemon/http/control/
mod.rs

1//! Feature-gated control route registration.
2//!
3//! This module centralizes control-surface route registration behind the
4//! daemon `enable_control` feature flag.
5//!
6//! # Control behavior contract
7//!
8//! - When control is disabled, no control routes are registered.
9//! - When control is enabled, task cancel is implemented in P6-011:
10//!   - `POST /api/v1/tasks/:task_id/cancel`
11//! - When control is enabled, run cancel is implemented in P6-012:
12//!   - `POST /api/v1/runs/:run_id/cancel`
13//! - Engine pause is implemented in P6-013:
14//!   - `POST /api/v1/engine/pause`
15//! - Engine resume is implemented in P6-014:
16//!   - `POST /api/v1/engine/resume`
17//!
18//! This routing boundary is intentionally centralized so control feature gating
19//! remains deterministic and testable.
20
21mod engine_pause;
22mod engine_resume;
23mod run_cancel;
24mod task_cancel;
25
26/// Shared typed error response for control handlers.
27#[derive(Debug, Clone, serde::Serialize)]
28pub(crate) struct ErrorResponse {
29    pub error: &'static str,
30    pub message: String,
31}
32
33/// Returns a 500 Internal Server Error response with a typed error body.
34pub(crate) fn internal_error_response(message: &str) -> axum::response::Response {
35    use axum::http::StatusCode;
36    use axum::response::IntoResponse;
37    use axum::Json;
38
39    (
40        StatusCode::INTERNAL_SERVER_ERROR,
41        Json(ErrorResponse { error: "internal_error", message: message.to_string() }),
42    )
43        .into_response()
44}
45
46/// Returns a 500 Internal Server Error for sequence overflow.
47pub(crate) fn sequence_overflow_response() -> axum::response::Response {
48    internal_error_response("control sequence overflow")
49}
50
51/// Returns a 500 Internal Server Error for a mutation authority failure.
52///
53/// The error variant is classified into a human-readable category for the
54/// response message, avoiding leaking internal error details to callers.
55pub(crate) fn internal_authority_error(
56    error: actionqueue_storage::mutation::authority::MutationAuthorityError<
57        actionqueue_storage::recovery::reducer::ReplayReducerError,
58    >,
59) -> axum::response::Response {
60    let message = match error {
61        actionqueue_storage::mutation::authority::MutationAuthorityError::Validation(_) => {
62            "authority validation failed"
63        }
64        actionqueue_storage::mutation::authority::MutationAuthorityError::Append(_) => {
65            "authority append failed"
66        }
67        actionqueue_storage::mutation::authority::MutationAuthorityError::PartialDurability {
68            ..
69        } => "authority partial durability failed",
70        actionqueue_storage::mutation::authority::MutationAuthorityError::Apply { .. } => {
71            "authority apply failed"
72        }
73    };
74    internal_error_response(message)
75}
76
77/// Registers control routes according to the `control_enabled` feature flag.
78///
79/// When disabled, the router is returned unchanged and control paths remain
80/// unreachable (HTTP 404 by route absence). When enabled, all four control
81/// paths are registered as `POST` routes with fully implemented handlers.
82pub fn register_routes(
83    router: axum::Router<super::RouterState>,
84    control_enabled: bool,
85) -> axum::Router<super::RouterState> {
86    if !control_enabled {
87        return router;
88    }
89
90    router
91        .route("/api/v1/tasks/:task_id/cancel", axum::routing::post(task_cancel::handle))
92        .route("/api/v1/runs/:run_id/cancel", axum::routing::post(run_cancel::handle))
93        .route("/api/v1/engine/pause", axum::routing::post(engine_pause::handle))
94        .route("/api/v1/engine/resume", axum::routing::post(engine_resume::handle))
95}
96
97#[cfg(test)]
98mod tests {
99    use std::sync::Arc;
100
101    use actionqueue_storage::mutation::authority::StorageMutationAuthority;
102    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
103    use actionqueue_storage::recovery::reducer::ReplayReducer;
104    use actionqueue_storage::wal::fs_writer::WalFsWriter;
105    use actionqueue_storage::wal::{InstrumentedWalWriter, WalAppendTelemetry};
106    use axum::body::Body;
107    use axum::http::{Method, Request, StatusCode};
108    use http_body_util::BodyExt;
109    use tower::Service;
110
111    use super::*;
112    use crate::time::clock::{MockClock, SharedDaemonClock};
113
114    const CONTROL_PATHS: [&str; 4] = [
115        "/api/v1/tasks/00000000-0000-0000-0000-000000000123/cancel",
116        "/api/v1/runs/00000000-0000-0000-0000-000000000456/cancel",
117        "/api/v1/engine/pause",
118        "/api/v1/engine/resume",
119    ];
120
121    fn test_control_authority() -> super::super::ControlMutationAuthority {
122        let unique = format!(
123            "actionqueue-daemon-control-test-{}-{}.wal",
124            std::process::id(),
125            std::time::SystemTime::now()
126                .duration_since(std::time::UNIX_EPOCH)
127                .expect("clock should be after epoch")
128                .as_nanos()
129        );
130        let wal_path = std::env::temp_dir().join(unique);
131        let wal_writer = WalFsWriter::new(wal_path).expect("test wal writer should initialize");
132        let wal_writer = InstrumentedWalWriter::new(wal_writer, WalAppendTelemetry::new());
133        let authority = StorageMutationAuthority::new(wal_writer, ReplayReducer::new());
134        std::sync::Arc::new(std::sync::Mutex::new(authority))
135    }
136
137    fn test_metrics_registry(enabled: bool) -> Arc<crate::metrics::registry::MetricsRegistry> {
138        let metrics_bind =
139            if enabled { Some(std::net::SocketAddr::from(([127, 0, 0, 1], 9090))) } else { None };
140        Arc::new(
141            crate::metrics::registry::MetricsRegistry::new(metrics_bind)
142                .expect("test metrics registry should initialize"),
143        )
144    }
145
146    fn test_clock() -> SharedDaemonClock {
147        Arc::new(MockClock::new(1_700_000_000))
148    }
149
150    fn test_router(control_enabled: bool) -> axum::Router<()> {
151        let state = if control_enabled {
152            std::sync::Arc::new(super::super::RouterStateInner::with_control_authority(
153                crate::bootstrap::RouterConfig { control_enabled, metrics_enabled: false },
154                Arc::new(std::sync::RwLock::new(ReplayReducer::new())),
155                crate::http::RouterObservability {
156                    metrics: test_metrics_registry(false),
157                    wal_append_telemetry: WalAppendTelemetry::new(),
158                    clock: test_clock(),
159                    recovery_observations: RecoveryObservations::zero(),
160                },
161                test_control_authority(),
162                crate::bootstrap::ReadyStatus::ready(),
163            ))
164        } else {
165            std::sync::Arc::new(super::super::RouterStateInner::new(
166                crate::bootstrap::RouterConfig { control_enabled, metrics_enabled: false },
167                Arc::new(std::sync::RwLock::new(ReplayReducer::new())),
168                crate::http::RouterObservability {
169                    metrics: test_metrics_registry(false),
170                    wal_append_telemetry: WalAppendTelemetry::new(),
171                    clock: test_clock(),
172                    recovery_observations: RecoveryObservations::zero(),
173                },
174                crate::bootstrap::ReadyStatus::ready(),
175            ))
176        };
177
178        register_routes(axum::Router::new(), control_enabled).with_state(state).with_state(())
179    }
180
181    async fn send_post(router: &mut axum::Router<()>, path: &str) -> axum::response::Response {
182        let request = Request::builder()
183            .method(Method::POST)
184            .uri(path)
185            .body(Body::empty())
186            .expect("request should build");
187        let mut service = router.as_service::<Body>();
188        service.call(request).await.expect("request should succeed")
189    }
190
191    async fn response_body_string(response: axum::response::Response) -> String {
192        let bytes = response.into_body().collect().await.expect("body should collect").to_bytes();
193        String::from_utf8(bytes.to_vec()).expect("response body should be utf-8")
194    }
195
196    #[tokio::test]
197    async fn disabled_gating_does_not_register_control_routes() {
198        let mut router = test_router(false);
199
200        for path in CONTROL_PATHS {
201            let response = send_post(&mut router, path).await;
202            assert_eq!(response.status(), StatusCode::NOT_FOUND);
203        }
204    }
205
206    #[tokio::test]
207    async fn enabled_gating_routes_engine_resume_to_real_handler() {
208        let mut router = test_router(true);
209        let response = send_post(&mut router, CONTROL_PATHS[3]).await;
210        assert_eq!(response.status(), StatusCode::OK);
211        assert_eq!(response_body_string(response).await, r#"{"status":"already_resumed"}"#);
212    }
213
214    #[tokio::test]
215    async fn enabled_gating_routes_engine_pause_to_real_handler() {
216        let mut router = test_router(true);
217        let response = send_post(&mut router, CONTROL_PATHS[2]).await;
218        assert_eq!(response.status(), StatusCode::OK);
219        assert_eq!(response_body_string(response).await, r#"{"status":"paused"}"#);
220    }
221
222    #[tokio::test]
223    async fn enabled_gating_routes_task_cancel_to_real_handler() {
224        let mut router = test_router(true);
225        let response = send_post(&mut router, CONTROL_PATHS[0]).await;
226        assert_eq!(response.status(), StatusCode::NOT_FOUND);
227        assert_eq!(
228            response_body_string(response).await,
229            r#"{"error":"task_not_found","message":"task not found","details":{"task_id":"00000000-0000-0000-0000-000000000123"}}"#
230        );
231    }
232
233    #[tokio::test]
234    async fn enabled_gating_routes_run_cancel_to_real_handler() {
235        let mut router = test_router(true);
236        let response = send_post(&mut router, CONTROL_PATHS[1]).await;
237        assert_eq!(response.status(), StatusCode::NOT_FOUND);
238        assert_eq!(
239            response_body_string(response).await,
240            r#"{"error":"run_not_found","message":"run not found","details":{"run_id":"00000000-0000-0000-0000-000000000456"}}"#
241        );
242    }
243}