use axum::{
extract::{Request, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use base64::Engine as _;
use serde_json::json;
use std::{collections::HashMap, sync::Arc};
use tracing::warn;
use crate::daemon::state::AppState;
use aegis_orchestrator_core::application::stimulus::StimulusError;
use aegis_orchestrator_core::domain::stimulus::{Stimulus, StimulusSource};
use aegis_orchestrator_core::presentation::webhook_guard::WebhookHmacGuard;
#[derive(Debug, serde::Deserialize)]
pub(crate) struct IngestStimulusBody {
pub(crate) content: String,
pub(crate) idempotency_key: Option<String>,
}
fn axum_headers_to_map(headers: &HeaderMap) -> HashMap<String, String> {
headers
.iter()
.filter_map(|(k, v)| {
v.to_str()
.ok()
.map(|val| (k.as_str().to_string(), val.to_string()))
})
.collect()
}
fn stimulus_error_response(e: StimulusError) -> (StatusCode, axum::Json<serde_json::Value>) {
let status = match e.http_status() {
409 => StatusCode::CONFLICT,
422 => StatusCode::UNPROCESSABLE_ENTITY,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
let body = json!({
"error": e.error_code(),
"message": e.to_string(),
});
(status, Json(body))
}
fn workflow_execution_logs_location(workflow_execution_id: &str) -> String {
format!("/v1/workflows/executions/{workflow_execution_id}/logs")
}
pub(crate) async fn ingest_stimulus_handler(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(body): Json<IngestStimulusBody>,
) -> impl IntoResponse {
let stimulus_service = match &state.stimulus_service {
Some(svc) => svc.clone(),
None => {
warn!("StimulusService not configured; rejecting POST /v1/stimuli");
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "stimulus_service_unavailable", "message": "Stimulus service is not configured" })),
).into_response();
}
};
let stimulus = Stimulus::new(StimulusSource::HttpApi, body.content)
.with_headers(axum_headers_to_map(&headers));
let stimulus = if let Some(key) = body.idempotency_key {
stimulus.with_idempotency_key(key)
} else {
stimulus
};
match stimulus_service.ingest(stimulus).await {
Ok(resp) => {
let location = workflow_execution_logs_location(&resp.workflow_execution_id);
let mut response_headers = HeaderMap::new();
if let Ok(loc_val) = location.parse() {
response_headers.insert(axum::http::header::LOCATION, loc_val);
}
(
StatusCode::ACCEPTED,
response_headers,
Json(json!({
"stimulus_id": resp.stimulus_id.to_string(),
"workflow_execution_id": resp.workflow_execution_id,
})),
)
.into_response()
}
Err(e) => {
let (status, body) = stimulus_error_response(e);
(status, body).into_response()
}
}
}
pub(crate) async fn webhook_handler(
State(state): State<Arc<AppState>>,
req: Request,
) -> impl IntoResponse {
let headers = req.headers().clone();
let guard =
match WebhookHmacGuard::from_request(req, state.webhook_secret_provider.as_ref()).await {
Ok(g) => g,
Err(e) => return e.into_response(),
};
let source = guard.source.clone();
let stimulus_service = match &state.stimulus_service {
Some(svc) => svc.clone(),
None => {
warn!(source = %source, "StimulusService not configured; rejecting webhook");
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({ "error": "stimulus_service_unavailable", "message": "Stimulus service is not configured" })),
).into_response();
}
};
let content = String::from_utf8(guard.body.to_vec())
.unwrap_or_else(|_| base64::engine::general_purpose::STANDARD.encode(&guard.body));
let stimulus = Stimulus::new(
StimulusSource::Webhook {
source_name: source.clone(),
},
content,
)
.with_headers(axum_headers_to_map(&headers));
match stimulus_service.ingest(stimulus).await {
Ok(resp) => {
let location = workflow_execution_logs_location(&resp.workflow_execution_id);
let mut response_headers = HeaderMap::new();
if let Ok(loc_val) = location.parse() {
response_headers.insert(axum::http::header::LOCATION, loc_val);
}
(
StatusCode::ACCEPTED,
response_headers,
Json(json!({
"stimulus_id": resp.stimulus_id.to_string(),
"workflow_execution_id": resp.workflow_execution_id,
})),
)
.into_response()
}
Err(e) => {
let (status, body) = stimulus_error_response(e);
(status, body).into_response()
}
}
}