Skip to main content

alien_commands/server/
axum_handlers.rs

1use std::sync::Arc;
2
3use axum::{
4    extract::{Path, State},
5    http::StatusCode,
6    response::{IntoResponse, Response},
7    routing::{get, post, put},
8    Json, Router,
9};
10use tracing::error;
11
12use alien_error::AlienError;
13
14use crate::{
15    error::{Error, ErrorData},
16    server::CommandServer,
17    types::*,
18};
19
20/// Trait to extract CommandServer from any state type
21pub trait HasCommandServer {
22    fn command_server(&self) -> &Arc<CommandServer>;
23}
24
25impl HasCommandServer for Arc<CommandServer> {
26    fn command_server(&self) -> &Arc<CommandServer> {
27        self
28    }
29}
30
31/// Create an Axum router with all command endpoints using a generic state type
32pub fn create_axum_router<S>() -> Router<S>
33where
34    S: HasCommandServer + Clone + Send + Sync + 'static,
35{
36    Router::new()
37        .route("/commands", post(create_command::<S>))
38        .route(
39            "/commands/{command_id}/upload-complete",
40            post(upload_complete::<S>),
41        )
42        .route("/commands/{command_id}/response", put(submit_response::<S>))
43        .route("/commands/{command_id}", get(get_command_status::<S>))
44        .route(
45            "/commands/{command_id}/payload",
46            get(get_command_payload::<S>).put(store_command_payload::<S>),
47        )
48        .route("/commands/leases", post(acquire_leases::<S>))
49        .route(
50            "/commands/leases/{lease_id}/release",
51            post(release_lease::<S>),
52        )
53}
54
55/// Create a new command
56#[cfg_attr(feature = "openapi", utoipa::path(
57    post,
58    path = "/commands",
59    request_body = CreateCommandRequest,
60    responses(
61        (status = 200, description = "Command created successfully", body = CreateCommandResponse),
62        (status = 400, description = "Invalid command", body = ErrorResponse),
63        (status = 500, description = "Internal server error", body = ErrorResponse),
64    ),
65    operation_id = "create_command",
66    tag = "commands"
67))]
68async fn create_command<S>(
69    State(state): State<S>,
70    Json(request): Json<CreateCommandRequest>,
71) -> Result<Json<CreateCommandResponse>, ErrorResponse>
72where
73    S: HasCommandServer,
74{
75    let response = state.command_server().create_command(request).await?;
76    Ok(Json(response))
77}
78
79/// Mark upload as complete
80#[cfg_attr(feature = "openapi", utoipa::path(
81    post,
82    path = "/commands/{command_id}/upload-complete",
83    params(
84        ("command_id" = String, Path, description = "Command identifier")
85    ),
86    request_body = UploadCompleteRequest,
87    responses(
88        (status = 200, description = "Upload marked complete", body = UploadCompleteResponse),
89        (status = 400, description = "Invalid command or state", body = ErrorResponse),
90        (status = 404, description = "Command not found", body = ErrorResponse),
91        (status = 500, description = "Internal server error", body = ErrorResponse),
92    ),
93    operation_id = "upload_complete",
94    tag = "commands"
95))]
96async fn upload_complete<S>(
97    State(state): State<S>,
98    Path(command_id): Path<String>,
99    Json(upload_request): Json<UploadCompleteRequest>,
100) -> Result<Json<UploadCompleteResponse>, ErrorResponse>
101where
102    S: HasCommandServer,
103{
104    let response = state
105        .command_server()
106        .upload_complete(&command_id, upload_request)
107        .await?;
108    Ok(Json(response))
109}
110
111/// Get command status
112#[cfg_attr(feature = "openapi", utoipa::path(
113    get,
114    path = "/commands/{command_id}",
115    params(
116        ("command_id" = String, Path, description = "Command identifier")
117    ),
118    responses(
119        (status = 200, description = "Command status", body = CommandStatusResponse),
120        (status = 404, description = "Command not found", body = ErrorResponse),
121        (status = 500, description = "Internal server error", body = ErrorResponse),
122    ),
123    operation_id = "get_command_status",
124    tag = "commands"
125))]
126async fn get_command_status<S>(
127    State(state): State<S>,
128    Path(command_id): Path<String>,
129) -> Result<Json<CommandStatusResponse>, ErrorResponse>
130where
131    S: HasCommandServer,
132{
133    let response = state
134        .command_server()
135        .get_command_status(&command_id)
136        .await?;
137    Ok(Json(response))
138}
139
140/// Payload response containing params and response data from KV
141#[derive(Debug, serde::Serialize, serde::Deserialize)]
142#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
143#[serde(rename_all = "camelCase")]
144pub struct CommandPayloadResponse {
145    pub command_id: String,
146    #[serde(skip_serializing_if = "Option::is_none")]
147    pub params: Option<BodySpec>,
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub response: Option<CommandResponse>,
150}
151
152/// Get command payload (params and response) from KV
153///
154/// Returns the raw params and response data stored in the manager's KV store.
155/// Returns 404 if neither params nor response exist for this command.
156#[cfg_attr(feature = "openapi", utoipa::path(
157    get,
158    path = "/commands/{command_id}/payload",
159    params(
160        ("command_id" = String, Path, description = "Command identifier")
161    ),
162    responses(
163        (status = 200, description = "Command payload data", body = CommandPayloadResponse),
164        (status = 404, description = "Command payload not found", body = ErrorResponse),
165        (status = 500, description = "Internal server error", body = ErrorResponse),
166    ),
167    operation_id = "get_command_payload",
168    tag = "commands"
169))]
170async fn get_command_payload<S>(
171    State(state): State<S>,
172    Path(command_id): Path<String>,
173) -> Result<Json<CommandPayloadResponse>, ErrorResponse>
174where
175    S: HasCommandServer,
176{
177    let params = state.command_server().get_params(&command_id).await?;
178    let response = state.command_server().get_response(&command_id).await?;
179
180    // If neither params nor response exist, the command payload doesn't exist in this AM
181    if params.is_none() && response.is_none() {
182        return Err(AlienError::new(ErrorData::CommandNotFound {
183            command_id: command_id.clone(),
184        })
185        .into());
186    }
187
188    Ok(Json(CommandPayloadResponse {
189        command_id,
190        params,
191        response,
192    }))
193}
194
195/// Request to store payload data directly in KV by command_id.
196///
197/// This bypasses the normal command lifecycle (create → dispatch → respond)
198/// and writes params/response directly into KV. Used by the demo service
199/// to populate payload data for commands created outside the command flow.
200#[derive(Debug, serde::Serialize, serde::Deserialize)]
201#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
202#[serde(rename_all = "camelCase")]
203pub struct StorePayloadRequest {
204    #[serde(skip_serializing_if = "Option::is_none")]
205    pub params: Option<BodySpec>,
206    #[serde(skip_serializing_if = "Option::is_none")]
207    pub response: Option<CommandResponse>,
208}
209
210/// Store command payload data (params and/or response) directly into KV.
211///
212/// Bypasses the command registry — useful for populating demo data or
213/// migrating payload data. Does not validate command existence or state.
214#[cfg_attr(feature = "openapi", utoipa::path(
215    put,
216    path = "/commands/{command_id}/payload",
217    params(
218        ("command_id" = String, Path, description = "Command identifier")
219    ),
220    request_body = StorePayloadRequest,
221    responses(
222        (status = 200, description = "Payload stored successfully"),
223        (status = 400, description = "Invalid request", body = ErrorResponse),
224        (status = 500, description = "Internal server error", body = ErrorResponse),
225    ),
226    operation_id = "store_command_payload",
227    tag = "commands"
228))]
229async fn store_command_payload<S>(
230    State(state): State<S>,
231    Path(command_id): Path<String>,
232    Json(request): Json<StorePayloadRequest>,
233) -> Result<StatusCode, ErrorResponse>
234where
235    S: HasCommandServer,
236{
237    if let Some(params) = &request.params {
238        state
239            .command_server()
240            .store_params(&command_id, params)
241            .await?;
242    }
243
244    if let Some(response) = &request.response {
245        state
246            .command_server()
247            .store_response(&command_id, response)
248            .await?;
249    }
250
251    Ok(StatusCode::OK)
252}
253
254/// Submit response from deployment
255#[cfg_attr(feature = "openapi", utoipa::path(
256    put,
257    path = "/commands/{command_id}/response",
258    params(
259        ("command_id" = String, Path, description = "Command identifier")
260    ),
261    request_body = SubmitResponseRequest,
262    responses(
263        (status = 200, description = "Response submitted successfully"),
264        (status = 400, description = "Invalid command or state", body = ErrorResponse),
265        (status = 404, description = "Command not found", body = ErrorResponse),
266        (status = 500, description = "Internal server error", body = ErrorResponse),
267    ),
268    operation_id = "submit_response",
269    tag = "commands"
270))]
271async fn submit_response<S>(
272    State(state): State<S>,
273    Path(command_id): Path<String>,
274    Json(request): Json<SubmitResponseRequest>,
275) -> Result<StatusCode, ErrorResponse>
276where
277    S: HasCommandServer,
278{
279    state
280        .command_server()
281        .submit_command_response(&command_id, request.response)
282        .await?;
283    Ok(StatusCode::OK)
284}
285
286/// Acquire leases for polling deployments
287#[cfg_attr(feature = "openapi", utoipa::path(
288    post,
289    path = "/commands/leases",
290    request_body = LeaseRequest,
291    responses(
292        (status = 200, description = "Leases acquired", body = LeaseResponse),
293        (status = 500, description = "Internal server error", body = ErrorResponse),
294    ),
295    operation_id = "acquire_leases",
296    tag = "leases"
297))]
298async fn acquire_leases<S>(
299    State(state): State<S>,
300    Json(lease_request): Json<LeaseRequest>,
301) -> Result<Json<LeaseResponse>, ErrorResponse>
302where
303    S: HasCommandServer,
304{
305    let response = state
306        .command_server()
307        .acquire_lease(&lease_request.deployment_id, &lease_request)
308        .await?;
309    Ok(Json(response))
310}
311
312/// Release a lease
313#[cfg_attr(feature = "openapi", utoipa::path(
314    post,
315    path = "/commands/leases/{lease_id}/release",
316    params(
317        ("lease_id" = String, Path, description = "Lease identifier")
318    ),
319    responses(
320        (status = 200, description = "Lease released successfully"),
321        (status = 404, description = "Lease not found", body = ErrorResponse),
322        (status = 500, description = "Internal server error", body = ErrorResponse),
323    ),
324    operation_id = "release_lease",
325    tag = "leases"
326))]
327async fn release_lease<S>(
328    State(state): State<S>,
329    Path(lease_id): Path<String>,
330) -> Result<StatusCode, ErrorResponse>
331where
332    S: HasCommandServer,
333{
334    state
335        .command_server()
336        .release_lease_by_id(&lease_id)
337        .await?;
338    Ok(StatusCode::OK)
339}
340
341// Error handling
342
343/// Error response wrapper for API endpoints
344#[derive(Debug, serde::Serialize)]
345#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
346struct ErrorResponse {
347    pub code: String,
348    pub message: String,
349    #[serde(skip_serializing_if = "Option::is_none")]
350    pub details: Option<String>,
351}
352
353impl From<Error> for ErrorResponse {
354    fn from(error: Error) -> Self {
355        ErrorResponse {
356            code: error.code.clone(),
357            message: error.message.clone(),
358            details: None,
359        }
360    }
361}
362
363impl IntoResponse for ErrorResponse {
364    fn into_response(self) -> Response {
365        let status = match self.code.as_str() {
366            "INVALID_COMMAND" | "INVALID_STATE_TRANSITION" | "INVALID_ENVELOPE" => {
367                StatusCode::BAD_REQUEST
368            }
369            "COMMAND_NOT_FOUND" | "LEASE_NOT_FOUND" => StatusCode::NOT_FOUND,
370            "COMMAND_EXPIRED" => StatusCode::GONE,
371            "CONFLICT" => StatusCode::CONFLICT,
372            "OPERATION_NOT_SUPPORTED" => StatusCode::NOT_IMPLEMENTED,
373            "STORAGE_OPERATION_FAILED"
374            | "KV_OPERATION_FAILED"
375            | "TRANSPORT_DISPATCH_FAILED"
376            | "AGENT_ERROR"
377            | "COMMANDS_ERROR"
378            | "SERIALIZATION_FAILED"
379            | "HTTP_OPERATION_FAILED" => StatusCode::INTERNAL_SERVER_ERROR,
380            _ => StatusCode::INTERNAL_SERVER_ERROR,
381        };
382
383        let body = match serde_json::to_string(&self) {
384            Ok(json) => json,
385            Err(e) => {
386                error!("Failed to serialize error response: {}", e);
387                r#"{"code":"COMMANDS_ERROR","message":"Serialization error"}"#.to_string()
388            }
389        };
390
391        (status, body).into_response()
392    }
393}
394
395#[cfg(feature = "openapi")]
396mod openapi {
397    use super::*;
398    use utoipa::OpenApi;
399
400    #[derive(OpenApi)]
401    #[openapi(
402        paths(
403            create_command,
404            upload_complete,
405            get_command_status,
406            get_command_payload,
407            store_command_payload,
408            submit_response,
409            acquire_leases,
410            release_lease,
411        ),
412        components(
413            schemas(
414                CreateCommandRequest,
415                CreateCommandResponse,
416                UploadCompleteRequest,
417                UploadCompleteResponse,
418                CommandStatusResponse,
419                CommandPayloadResponse,
420                StorePayloadRequest,
421                SubmitResponseRequest,
422                CommandResponse,
423                LeaseRequest,
424                LeaseResponse,
425                ReleaseRequest,
426                ErrorResponse,
427                // Re-export common types
428                BodySpec,
429                CommandState,
430                StorageUpload,
431                ResponseHandling,
432                Envelope,
433                LeaseInfo,
434            )
435        ),
436        tags(
437            (name = "commands", description = "Command management"),
438            (name = "leases", description = "Lease management for polling deployments")
439        ),
440        info(
441            title = "Commands API",
442            description = "Alien Commands API",
443            version = "1.0.0"
444        ),
445    )]
446    pub struct ApiDoc;
447}
448
449#[cfg(feature = "openapi")]
450pub use openapi::ApiDoc;