1use axum::extract::{Path, Query, State};
2use axum::response::IntoResponse;
3use axum::Json;
4use serde::Deserialize;
5
6use crate::error::{EnvoyError, Result};
7use crate::event::{self, EventSeverity, EventType};
8use crate::http::state::SharedState;
9use crate::http::ws::broadcast_to_project;
10pub(crate) async fn ingest_hook_event(
13 State(state): State<SharedState>,
14 Json(req): Json<event::HookEventRequest>,
15) -> Result<impl IntoResponse> {
16 let severity = if req.exit_code == 2 {
17 EventSeverity::Blocking
18 } else if req.exit_code != 0 {
19 EventSeverity::Warning
20 } else {
21 EventSeverity::Info
22 };
23 let state_fb = state.clone();
24 let event = tokio::task::spawn_blocking(move || {
25 let engine = state_fb.engine.lock();
26 let event = state_fb.event_bus.ingest(
27 engine.graph(),
28 req.project.clone(),
29 EventType::HookResult,
30 severity,
31 format!("hook:{}", req.hook_name),
32 format!("Hook {} exited {}", req.hook_name, req.exit_code),
33 serde_json::json!({
34 "hook_name": req.hook_name,
35 "exit_code": req.exit_code,
36 "output_preview": req.output.chars().take(200).collect::<String>(),
37 }),
38 )?;
39 let _ = state_fb.audit_store.log_event_ingested(
40 engine.graph(),
41 &req.project,
42 &format!("hook:{}", req.hook_name),
43 EventType::HookResult,
44 );
45 Ok::<_, crate::error::EnvoyError>(event)
46 })
47 .await
48 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
49 broadcast_to_project(
50 &state,
51 &event.project,
52 "hook_event",
53 &serde_json::to_value(&event).unwrap_or_default(),
54 )
55 .await;
56 Ok((axum::http::StatusCode::CREATED, Json(event)))
57}
58
59pub(crate) async fn ingest_gate_event(
60 State(state): State<SharedState>,
61 Json(req): Json<event::GateEventRequest>,
62) -> Result<impl IntoResponse> {
63 let severity = if req.gates_passed < req.gates_total {
64 EventSeverity::Warning
65 } else {
66 EventSeverity::Info
67 };
68 let state_fb = state.clone();
69 let event = tokio::task::spawn_blocking(move || {
70 let engine = state_fb.engine.lock();
71 let event = state_fb.event_bus.ingest(
72 engine.graph(),
73 req.project.clone(),
74 EventType::GateResult,
75 severity,
76 "gate:quality".into(),
77 format!("{}/{} passed", req.gates_passed, req.gates_total),
78 serde_json::json!({
79 "gates_passed": req.gates_passed,
80 "gates_total": req.gates_total,
81 "failures": req.failures,
82 }),
83 )?;
84 let _ = state_fb.audit_store.log_event_ingested(
85 engine.graph(),
86 &req.project,
87 "gate:quality",
88 EventType::GateResult,
89 );
90 Ok::<_, crate::error::EnvoyError>(event)
91 })
92 .await
93 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
94 broadcast_to_project(
95 &state,
96 &event.project,
97 "gate_event",
98 &serde_json::to_value(&event).unwrap_or_default(),
99 )
100 .await;
101 Ok((axum::http::StatusCode::CREATED, Json(event)))
102}
103
104pub(crate) async fn ingest_ci_event(
105 State(state): State<SharedState>,
106 Json(req): Json<event::CiEventRequest>,
107) -> Result<impl IntoResponse> {
108 let severity = match req.conclusion.as_deref() {
109 Some("success") => EventSeverity::Info,
110 Some("failure") => EventSeverity::Blocking,
111 _ => EventSeverity::Info,
112 };
113 let state_fb = state.clone();
114 let event = tokio::task::spawn_blocking(move || {
115 let engine = state_fb.engine.lock();
116 let event = state_fb.event_bus.ingest(
117 engine.graph(),
118 req.project.clone(),
119 EventType::CiStatus,
120 severity,
121 "ci:github".into(),
122 format!(
123 "CI {}: {}",
124 req.run_id,
125 req.conclusion.as_deref().unwrap_or("in_progress")
126 ),
127 serde_json::json!({
128 "run_id": req.run_id,
129 "status": req.status,
130 "conclusion": req.conclusion,
131 "head_branch": req.head_branch,
132 "display_title": req.display_title,
133 }),
134 )?;
135 let _ = state_fb.audit_store.log_event_ingested(
136 engine.graph(),
137 &req.project,
138 "ci:github",
139 EventType::CiStatus,
140 );
141 Ok::<_, crate::error::EnvoyError>(event)
142 })
143 .await
144 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
145 broadcast_to_project(
146 &state,
147 &event.project,
148 "ci_event",
149 &serde_json::to_value(&event).unwrap_or_default(),
150 )
151 .await;
152 Ok((axum::http::StatusCode::CREATED, Json(event)))
153}
154
155pub(crate) async fn ingest_doc_event(
156 State(state): State<SharedState>,
157 Json(req): Json<event::DocEventRequest>,
158) -> Result<impl IntoResponse> {
159 let severity = if req.last_updated_seconds > 86400 {
160 EventSeverity::Warning
161 } else {
162 EventSeverity::Info
163 };
164 let state_fb = state.clone();
165 let event = tokio::task::spawn_blocking(move || {
166 let engine = state_fb.engine.lock();
167 let event = state_fb.event_bus.ingest(
168 engine.graph(),
169 req.project.clone(),
170 EventType::DocSync,
171 severity,
172 "doc:wiki".into(),
173 format!("Docs last updated {}s ago", req.last_updated_seconds),
174 serde_json::json!({
175 "doc_files": req.doc_files,
176 "last_updated_seconds": req.last_updated_seconds,
177 }),
178 )?;
179 let _ = state_fb.audit_store.log_event_ingested(
180 engine.graph(),
181 &req.project,
182 "doc:wiki",
183 EventType::DocSync,
184 );
185 Ok::<_, crate::error::EnvoyError>(event)
186 })
187 .await
188 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
189 broadcast_to_project(
190 &state,
191 &event.project,
192 "doc_event",
193 &serde_json::to_value(&event).unwrap_or_default(),
194 )
195 .await;
196 Ok((axum::http::StatusCode::CREATED, Json(event)))
197}
198
199pub(crate) async fn ingest_verify_event(
200 State(state): State<SharedState>,
201 Json(req): Json<event::VerifyEventRequest>,
202) -> Result<impl IntoResponse> {
203 let severity = if req.failed > 0 {
204 EventSeverity::Warning
205 } else {
206 EventSeverity::Info
207 };
208 let state_fb = state.clone();
209 let event = tokio::task::spawn_blocking(move || {
210 let engine = state_fb.engine.lock();
211 let event = state_fb.event_bus.ingest(
212 engine.graph(),
213 req.project.clone(),
214 EventType::TaskVerify,
215 severity,
216 format!("verify:{}", req.task_type),
217 format!(
218 "Deliverable verify: {}/{} passed for {}",
219 req.passed,
220 req.passed + req.failed,
221 req.task_type
222 ),
223 serde_json::json!({
224 "agent_id": req.agent_id,
225 "task_type": req.task_type,
226 "claimed_files": req.claimed_files,
227 "passed": req.passed,
228 "failed": req.failed,
229 "failures": req.failures,
230 }),
231 )?;
232 let _ = state_fb.audit_store.log_event_ingested(
233 engine.graph(),
234 &req.project,
235 &format!("verify:{}", req.task_type),
236 EventType::TaskVerify,
237 );
238 Ok::<_, crate::error::EnvoyError>(event)
239 })
240 .await
241 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
242 broadcast_to_project(
243 &state,
244 &event.project,
245 "verify_event",
246 &serde_json::to_value(&event).unwrap_or_default(),
247 )
248 .await;
249 Ok((axum::http::StatusCode::CREATED, Json(event)))
250}
251
252#[derive(Debug, Deserialize)]
253pub struct EventQueryParams {
254 project: String,
255 #[serde(default)]
256 since: Option<String>,
257 #[serde(default)]
258 limit: Option<i64>,
259}
260
261pub(crate) async fn query_events(
262 State(state): State<SharedState>,
263 Query(params): Query<EventQueryParams>,
264) -> Result<impl IntoResponse> {
265 let state_fb = state.clone();
266 let project = params.project.clone();
267 let since = params.since.clone();
268 let limit = params.limit.unwrap_or(50).min(100);
269 let events = tokio::task::spawn_blocking(move || {
270 let engine = state_fb.engine.lock();
271 state_fb
272 .event_bus
273 .query(engine.graph(), &project, since.as_deref(), Some(limit))
274 })
275 .await
276 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
277 Ok(Json(serde_json::json!({
278 "events": events,
279 "count": events.len(),
280 })))
281}
282
283#[derive(Debug, serde::Deserialize)]
284pub struct AuditQueryParams {
285 #[serde(default)]
286 agent_id: Option<String>,
287 #[serde(default)]
288 operation: Option<String>,
289 #[serde(default)]
290 task_id: Option<String>,
291 #[serde(default)]
292 since: Option<String>,
293 #[serde(default)]
294 limit: Option<i64>,
295}
296
297pub(crate) async fn query_audit(
298 State(state): State<SharedState>,
299 Query(params): Query<AuditQueryParams>,
300) -> Result<impl IntoResponse> {
301 let state_fb = state.clone();
302 let limit = params.limit.unwrap_or(50).min(100);
303 let events = tokio::task::spawn_blocking(move || {
304 let engine = state_fb.engine.lock();
305 state_fb.audit_store.query(
306 engine.graph(),
307 params.agent_id.as_deref(),
308 params.operation.as_deref(),
309 params.task_id.as_deref(),
310 params.since.as_deref(),
311 Some(limit),
312 )
313 })
314 .await
315 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
316 Ok(Json(serde_json::json!({
317 "events": events,
318 "count": events.len(),
319 })))
320}
321
322pub(crate) async fn query_task_audit(
323 State(state): State<SharedState>,
324 Path(task_id): Path<String>,
325) -> Result<impl IntoResponse> {
326 let state_fb = state.clone();
327 let events = tokio::task::spawn_blocking(move || {
328 let engine = state_fb.engine.lock();
329 state_fb
330 .audit_store
331 .query(engine.graph(), None, None, Some(&task_id), None, Some(50))
332 })
333 .await
334 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
335 Ok(Json(serde_json::json!({
336 "events": events,
337 "count": events.len(),
338 })))
339}