use std::sync::Arc;
use axum::extract::{Extension, Path, State};
use axum::response::sse::{Event, Sse};
use axum::response::IntoResponse;
use futures::StreamExt;
use uuid::Uuid;
use aegis_orchestrator_core::domain::agent::AgentId;
use aegis_orchestrator_core::domain::execution::ExecutionId;
use aegis_orchestrator_core::domain::iam::UserIdentity;
use aegis_orchestrator_core::presentation::keycloak_auth::ScopeGuard;
use crate::daemon::handlers::tenant_id_from_identity;
use crate::daemon::state::AppState;
pub(crate) use crate::daemon::handlers::DEFAULT_MAX_EXECUTION_LIST_LIMIT;
#[derive(serde::Deserialize)]
pub(crate) struct ListExecutionsQuery {
pub(crate) agent_id: Option<Uuid>,
pub(crate) workflow_name: Option<String>,
pub(crate) limit: Option<usize>,
}
pub(crate) async fn get_execution_handler(
State(state): State<Arc<AppState>>,
scope_guard: ScopeGuard,
identity: Option<Extension<UserIdentity>>,
Path(execution_id): Path<Uuid>,
) -> Result<
impl axum::response::IntoResponse,
(axum::http::StatusCode, axum::Json<serde_json::Value>),
> {
scope_guard.require("execution:read")?;
let tenant_id = tenant_id_from_identity(identity.as_ref().map(|identity| &identity.0));
match state
.execution_service
.get_execution_for_tenant(&tenant_id, ExecutionId(execution_id))
.await
{
Ok(exec) => Ok(axum::Json(serde_json::json!({
"id": exec.id.0,
"agent_id": exec.agent_id.0,
"status": format!("{:?}", exec.status),
}))),
Err(e) => Ok(axum::Json(serde_json::json!({"error": e.to_string()}))),
}
}
pub(crate) async fn cancel_execution_handler(
State(state): State<Arc<AppState>>,
scope_guard: ScopeGuard,
identity: Option<Extension<UserIdentity>>,
Path(execution_id): Path<Uuid>,
) -> Result<
impl axum::response::IntoResponse,
(axum::http::StatusCode, axum::Json<serde_json::Value>),
> {
scope_guard.require("execution:cancel")?;
let tenant_id = tenant_id_from_identity(identity.as_ref().map(|identity| &identity.0));
match state
.execution_service
.cancel_execution_for_tenant(&tenant_id, ExecutionId(execution_id))
.await
{
Ok(_) => Ok(axum::Json(serde_json::json!({"success": true}))),
Err(e) => Ok(axum::Json(serde_json::json!({"error": e.to_string()}))),
}
}
pub(crate) async fn stream_events_handler(
State(state): State<Arc<AppState>>,
scope_guard: ScopeGuard,
identity: Option<Extension<UserIdentity>>,
Path(execution_id): Path<Uuid>,
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> axum::response::Response {
if let Err(e) = scope_guard.require("execution:stream") {
return e.into_response();
}
let follow = params.get("follow").map(|v| v != "false").unwrap_or(true);
let verbose = params.get("verbose").map(|v| v == "true").unwrap_or(false);
let exec_id = aegis_orchestrator_core::domain::execution::ExecutionId(execution_id);
let _tenant_id = tenant_id_from_identity(identity.as_ref().map(|identity| &identity.0));
let activity_service = state.correlated_activity_stream_service.clone();
let stream = async_stream::stream! {
if follow {
let mut activity_stream = activity_service.stream_execution_activity(exec_id, verbose).await?;
while let Some(activity) = activity_stream.next().await {
let payload = serde_json::to_string(&activity?)?;
yield Ok::<_, anyhow::Error>(Event::default().data(payload));
}
} else {
for activity in activity_service.execution_history(exec_id, verbose).await? {
let payload = serde_json::to_string(&activity)?;
yield Ok::<_, anyhow::Error>(Event::default().data(payload));
}
}
};
Sse::new(stream)
.keep_alive(axum::response::sse::KeepAlive::default())
.into_response()
}
pub(crate) async fn delete_execution_handler(
State(state): State<Arc<AppState>>,
scope_guard: ScopeGuard,
identity: Option<Extension<UserIdentity>>,
Path(execution_id): Path<Uuid>,
) -> Result<
impl axum::response::IntoResponse,
(axum::http::StatusCode, axum::Json<serde_json::Value>),
> {
scope_guard.require("execution:remove")?;
let tenant_id = tenant_id_from_identity(identity.as_ref().map(|identity| &identity.0));
match state
.execution_service
.delete_execution_for_tenant(&tenant_id, ExecutionId(execution_id))
.await
{
Ok(_) => Ok(axum::Json(serde_json::json!({"success": true}))),
Err(e) => Ok(axum::Json(serde_json::json!({"error": e.to_string()}))),
}
}
pub(crate) async fn list_executions_handler(
State(state): State<Arc<AppState>>,
scope_guard: ScopeGuard,
identity: Option<Extension<UserIdentity>>,
axum::extract::Query(query): axum::extract::Query<ListExecutionsQuery>,
) -> Result<
impl axum::response::IntoResponse,
(axum::http::StatusCode, axum::Json<serde_json::Value>),
> {
scope_guard.require("execution:list")?;
let agent_id = query.agent_id.map(AgentId);
let max_limit = state
.config
.spec
.max_execution_list_limit
.unwrap_or(DEFAULT_MAX_EXECUTION_LIST_LIMIT);
let limit = query.limit.unwrap_or(20).min(max_limit);
let tenant_id = tenant_id_from_identity(identity.as_ref().map(|identity| &identity.0));
let workflow_id = if let Some(ref wf_name) = query.workflow_name {
match state
.workflow_repo
.find_by_name_visible(&tenant_id, wf_name)
.await
{
Ok(Some(wf)) => Some(wf.id),
Ok(None) => {
return Ok(axum::Json(
serde_json::json!({"error": format!("Workflow '{}' not found", wf_name)}),
));
}
Err(e) => {
return Ok(axum::Json(serde_json::json!({"error": e.to_string()})));
}
}
} else {
None
};
match state
.execution_service
.list_executions_for_tenant(&tenant_id, agent_id, workflow_id, limit)
.await
{
Ok(executions) => {
let json_executions: Vec<serde_json::Value> = executions
.into_iter()
.map(|exec| {
serde_json::json!({
"id": exec.id.0,
"agent_id": exec.agent_id.0,
"status": format!("{:?}", exec.status),
"started_at": exec.started_at,
"ended_at": exec.ended_at
})
})
.collect();
Ok(axum::Json(serde_json::json!(json_executions)))
}
Err(e) => Ok(axum::Json(serde_json::json!({"error": e.to_string()}))),
}
}