Skip to main content

envoy/http/handlers/
audit.rs

1use axum::extract::{Path, Query, State};
2use axum::response::IntoResponse;
3use axum::Json;
4use serde::Deserialize;
5
6use crate::error::{EnvoyError, Result};
7use crate::event::{self, EventSeverity, EventType};
8use crate::http::state::SharedState;
9use crate::http::ws::broadcast_to_project;
10// ── Event handlers ──
11
12pub(crate) async fn ingest_hook_event(
13    State(state): State<SharedState>,
14    Json(req): Json<event::HookEventRequest>,
15) -> Result<impl IntoResponse> {
16    let severity = if req.exit_code == 2 {
17        EventSeverity::Blocking
18    } else if req.exit_code != 0 {
19        EventSeverity::Warning
20    } else {
21        EventSeverity::Info
22    };
23    let state_fb = state.clone();
24    let event = tokio::task::spawn_blocking(move || {
25        let engine = state_fb.engine.lock();
26        let event = state_fb.event_bus.ingest(
27            engine.graph(),
28            req.project.clone(),
29            EventType::HookResult,
30            severity,
31            format!("hook:{}", req.hook_name),
32            format!("Hook {} exited {}", req.hook_name, req.exit_code),
33            serde_json::json!({
34                "hook_name": req.hook_name,
35                "exit_code": req.exit_code,
36                "output_preview": req.output.chars().take(200).collect::<String>(),
37            }),
38        )?;
39        let _ = state_fb.audit_store.log_event_ingested(
40            engine.graph(),
41            &req.project,
42            &format!("hook:{}", req.hook_name),
43            EventType::HookResult,
44        );
45        Ok::<_, crate::error::EnvoyError>(event)
46    })
47    .await
48    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
49    broadcast_to_project(
50        &state,
51        &event.project,
52        "hook_event",
53        &serde_json::to_value(&event).unwrap_or_default(),
54    )
55    .await;
56    Ok((axum::http::StatusCode::CREATED, Json(event)))
57}
58
59pub(crate) async fn ingest_gate_event(
60    State(state): State<SharedState>,
61    Json(req): Json<event::GateEventRequest>,
62) -> Result<impl IntoResponse> {
63    let severity = if req.gates_passed < req.gates_total {
64        EventSeverity::Warning
65    } else {
66        EventSeverity::Info
67    };
68    let state_fb = state.clone();
69    let event = tokio::task::spawn_blocking(move || {
70        let engine = state_fb.engine.lock();
71        let event = state_fb.event_bus.ingest(
72            engine.graph(),
73            req.project.clone(),
74            EventType::GateResult,
75            severity,
76            "gate:quality".into(),
77            format!("{}/{} passed", req.gates_passed, req.gates_total),
78            serde_json::json!({
79                "gates_passed": req.gates_passed,
80                "gates_total": req.gates_total,
81                "failures": req.failures,
82            }),
83        )?;
84        let _ = state_fb.audit_store.log_event_ingested(
85            engine.graph(),
86            &req.project,
87            "gate:quality",
88            EventType::GateResult,
89        );
90        Ok::<_, crate::error::EnvoyError>(event)
91    })
92    .await
93    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
94    broadcast_to_project(
95        &state,
96        &event.project,
97        "gate_event",
98        &serde_json::to_value(&event).unwrap_or_default(),
99    )
100    .await;
101    Ok((axum::http::StatusCode::CREATED, Json(event)))
102}
103
104pub(crate) async fn ingest_ci_event(
105    State(state): State<SharedState>,
106    Json(req): Json<event::CiEventRequest>,
107) -> Result<impl IntoResponse> {
108    let severity = match req.conclusion.as_deref() {
109        Some("success") => EventSeverity::Info,
110        Some("failure") => EventSeverity::Blocking,
111        _ => EventSeverity::Info,
112    };
113    let state_fb = state.clone();
114    let event = tokio::task::spawn_blocking(move || {
115        let engine = state_fb.engine.lock();
116        let event = state_fb.event_bus.ingest(
117            engine.graph(),
118            req.project.clone(),
119            EventType::CiStatus,
120            severity,
121            "ci:github".into(),
122            format!(
123                "CI {}: {}",
124                req.run_id,
125                req.conclusion.as_deref().unwrap_or("in_progress")
126            ),
127            serde_json::json!({
128                "run_id": req.run_id,
129                "status": req.status,
130                "conclusion": req.conclusion,
131                "head_branch": req.head_branch,
132                "display_title": req.display_title,
133            }),
134        )?;
135        let _ = state_fb.audit_store.log_event_ingested(
136            engine.graph(),
137            &req.project,
138            "ci:github",
139            EventType::CiStatus,
140        );
141        Ok::<_, crate::error::EnvoyError>(event)
142    })
143    .await
144    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
145    broadcast_to_project(
146        &state,
147        &event.project,
148        "ci_event",
149        &serde_json::to_value(&event).unwrap_or_default(),
150    )
151    .await;
152    Ok((axum::http::StatusCode::CREATED, Json(event)))
153}
154
155pub(crate) async fn ingest_doc_event(
156    State(state): State<SharedState>,
157    Json(req): Json<event::DocEventRequest>,
158) -> Result<impl IntoResponse> {
159    let severity = if req.last_updated_seconds > 86400 {
160        EventSeverity::Warning
161    } else {
162        EventSeverity::Info
163    };
164    let state_fb = state.clone();
165    let event = tokio::task::spawn_blocking(move || {
166        let engine = state_fb.engine.lock();
167        let event = state_fb.event_bus.ingest(
168            engine.graph(),
169            req.project.clone(),
170            EventType::DocSync,
171            severity,
172            "doc:wiki".into(),
173            format!("Docs last updated {}s ago", req.last_updated_seconds),
174            serde_json::json!({
175                "doc_files": req.doc_files,
176                "last_updated_seconds": req.last_updated_seconds,
177            }),
178        )?;
179        let _ = state_fb.audit_store.log_event_ingested(
180            engine.graph(),
181            &req.project,
182            "doc:wiki",
183            EventType::DocSync,
184        );
185        Ok::<_, crate::error::EnvoyError>(event)
186    })
187    .await
188    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
189    broadcast_to_project(
190        &state,
191        &event.project,
192        "doc_event",
193        &serde_json::to_value(&event).unwrap_or_default(),
194    )
195    .await;
196    Ok((axum::http::StatusCode::CREATED, Json(event)))
197}
198
199pub(crate) async fn ingest_verify_event(
200    State(state): State<SharedState>,
201    Json(req): Json<event::VerifyEventRequest>,
202) -> Result<impl IntoResponse> {
203    let severity = if req.failed > 0 {
204        EventSeverity::Warning
205    } else {
206        EventSeverity::Info
207    };
208    let state_fb = state.clone();
209    let event = tokio::task::spawn_blocking(move || {
210        let engine = state_fb.engine.lock();
211        let event = state_fb.event_bus.ingest(
212            engine.graph(),
213            req.project.clone(),
214            EventType::TaskVerify,
215            severity,
216            format!("verify:{}", req.task_type),
217            format!(
218                "Deliverable verify: {}/{} passed for {}",
219                req.passed,
220                req.passed + req.failed,
221                req.task_type
222            ),
223            serde_json::json!({
224                "agent_id": req.agent_id,
225                "task_type": req.task_type,
226                "claimed_files": req.claimed_files,
227                "passed": req.passed,
228                "failed": req.failed,
229                "failures": req.failures,
230            }),
231        )?;
232        let _ = state_fb.audit_store.log_event_ingested(
233            engine.graph(),
234            &req.project,
235            &format!("verify:{}", req.task_type),
236            EventType::TaskVerify,
237        );
238        Ok::<_, crate::error::EnvoyError>(event)
239    })
240    .await
241    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
242    broadcast_to_project(
243        &state,
244        &event.project,
245        "verify_event",
246        &serde_json::to_value(&event).unwrap_or_default(),
247    )
248    .await;
249    Ok((axum::http::StatusCode::CREATED, Json(event)))
250}
251
252#[derive(Debug, Deserialize)]
253pub struct EventQueryParams {
254    project: String,
255    #[serde(default)]
256    since: Option<String>,
257    #[serde(default)]
258    limit: Option<i64>,
259}
260
261pub(crate) async fn query_events(
262    State(state): State<SharedState>,
263    Query(params): Query<EventQueryParams>,
264) -> Result<impl IntoResponse> {
265    let state_fb = state.clone();
266    let project = params.project.clone();
267    let since = params.since.clone();
268    let limit = params.limit.unwrap_or(50).min(100);
269    let events = tokio::task::spawn_blocking(move || {
270        let engine = state_fb.engine.lock();
271        state_fb
272            .event_bus
273            .query(engine.graph(), &project, since.as_deref(), Some(limit))
274    })
275    .await
276    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
277    Ok(Json(serde_json::json!({
278        "events": events,
279        "count": events.len(),
280    })))
281}
282
283#[derive(Debug, serde::Deserialize)]
284pub struct AuditQueryParams {
285    #[serde(default)]
286    agent_id: Option<String>,
287    #[serde(default)]
288    operation: Option<String>,
289    #[serde(default)]
290    task_id: Option<String>,
291    #[serde(default)]
292    since: Option<String>,
293    #[serde(default)]
294    limit: Option<i64>,
295}
296
297pub(crate) async fn query_audit(
298    State(state): State<SharedState>,
299    Query(params): Query<AuditQueryParams>,
300) -> Result<impl IntoResponse> {
301    let state_fb = state.clone();
302    let limit = params.limit.unwrap_or(50).min(100);
303    let events = tokio::task::spawn_blocking(move || {
304        let engine = state_fb.engine.lock();
305        state_fb.audit_store.query(
306            engine.graph(),
307            params.agent_id.as_deref(),
308            params.operation.as_deref(),
309            params.task_id.as_deref(),
310            params.since.as_deref(),
311            Some(limit),
312        )
313    })
314    .await
315    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
316    Ok(Json(serde_json::json!({
317        "events": events,
318        "count": events.len(),
319    })))
320}
321
322pub(crate) async fn query_task_audit(
323    State(state): State<SharedState>,
324    Path(task_id): Path<String>,
325) -> Result<impl IntoResponse> {
326    let state_fb = state.clone();
327    let events = tokio::task::spawn_blocking(move || {
328        let engine = state_fb.engine.lock();
329        state_fb
330            .audit_store
331            .query(engine.graph(), None, None, Some(&task_id), None, Some(50))
332    })
333    .await
334    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
335    Ok(Json(serde_json::json!({
336        "events": events,
337        "count": events.len(),
338    })))
339}