1use crate::async_database::flatten_err;
2use crate::db::open_conn;
3use crate::state::InnerState;
4use anyhow::Result;
5use rusqlite::{Connection, params};
6use serde_json::Value;
7use std::path::Path;
8use tracing::{debug, error, info, warn};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum ObservedStepScope {
13 Task,
15 Item,
17}
18
19pub fn observed_step_scope_from_payload(payload: &Value) -> Option<ObservedStepScope> {
21 match payload["step_scope"].as_str() {
22 Some("task") => Some(ObservedStepScope::Task),
23 Some("item") => Some(ObservedStepScope::Item),
24 _ => None,
25 }
26}
27
28pub fn observed_step_scope_label(scope: Option<ObservedStepScope>) -> &'static str {
30 match scope {
31 Some(ObservedStepScope::Task) => "task",
32 Some(ObservedStepScope::Item) => "item",
33 None => "unspecified",
34 }
35}
36
37pub trait EventSink: Send + Sync {
40 fn emit(&self, task_id: &str, task_item_id: Option<&str>, event_type: &str, payload: Value);
42}
43
44pub struct NoopSink;
46
47impl EventSink for NoopSink {
48 fn emit(
49 &self,
50 _task_id: &str,
51 _task_item_id: Option<&str>,
52 _event_type: &str,
53 _payload: Value,
54 ) {
55 }
56}
57
58pub struct TracingEventSink;
60
61impl TracingEventSink {
62 pub fn new() -> Self {
64 Self
65 }
66}
67
68impl Default for TracingEventSink {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl EventSink for TracingEventSink {
75 fn emit(&self, task_id: &str, task_item_id: Option<&str>, event_type: &str, payload: Value) {
76 let payload_text = payload.to_string();
77 match event_type {
78 "task_failed" => error!(
79 task_id,
80 task_item_id,
81 event_type,
82 payload = %payload_text,
83 "workflow event"
84 ),
85 "step_timeout" | "auto_rollback_failed" => warn!(
86 task_id,
87 task_item_id,
88 event_type,
89 payload = %payload_text,
90 "workflow event"
91 ),
92 "step_started" | "step_finished" | "task_completed" | "task_paused" => info!(
93 task_id,
94 task_item_id,
95 event_type,
96 payload = %payload_text,
97 "workflow event"
98 ),
99 _ => debug!(
100 task_id,
101 task_item_id,
102 event_type,
103 payload = %payload_text,
104 "workflow event"
105 ),
106 }
107 }
108}
109
110pub async fn insert_event(
112 state: &InnerState,
113 task_id: &str,
114 task_item_id: Option<&str>,
115 event_type: &str,
116 payload: Value,
117) -> Result<()> {
118 state
119 .db_writer
120 .insert_event(
121 task_id,
122 task_item_id,
123 event_type,
124 &serde_json::to_string(&payload)?,
125 )
126 .await
127}
128
129#[derive(Debug)]
131pub struct StepEvent {
132 pub event_type: String,
134 pub step: Option<String>,
136 pub step_scope: Option<ObservedStepScope>,
138 pub task_item_id: Option<String>,
140 pub agent_id: Option<String>,
142 pub success: Option<bool>,
144 pub duration_ms: Option<u64>,
146 pub confidence: Option<f64>,
148 pub reason: Option<String>,
150 pub elapsed_secs: Option<u64>,
152 pub stdout_bytes: Option<u64>,
154 pub stderr_bytes: Option<u64>,
156 pub stdout_delta_bytes: Option<u64>,
158 pub stderr_delta_bytes: Option<u64>,
160 pub stagnant_heartbeats: Option<u32>,
162 pub pid: Option<u32>,
164 pub pid_alive: Option<bool>,
166 pub output_state: Option<String>,
168 pub created_at: String,
170}
171
172pub fn query_latest_step_log_paths(
175 db_path: &Path,
176 task_id: &str,
177) -> Result<Option<(String, String, String)>> {
178 let conn = open_conn(db_path)?;
179 query_latest_step_log_paths_with_conn(&conn, task_id)
180}
181
182fn query_latest_step_log_paths_with_conn(
183 conn: &Connection,
184 task_id: &str,
185) -> Result<Option<(String, String, String)>> {
186 let result: Option<(String,)> = conn
187 .query_row(
188 "SELECT payload_json FROM events
189 WHERE task_id = ?1 AND event_type IN ('step_spawned', 'step_started')
190 ORDER BY id DESC LIMIT 1",
191 params![task_id],
192 |row| Ok((row.get::<_, String>(0)?,)),
193 )
194 .ok();
195
196 match result {
197 Some((payload_json,)) => {
198 let v: Value = serde_json::from_str(&payload_json).unwrap_or_default();
199 let phase = v["phase"]
200 .as_str()
201 .or_else(|| v["step"].as_str())
202 .unwrap_or("")
203 .to_string();
204 let stdout = v["stdout_path"].as_str().unwrap_or("").to_string();
205 let stderr = v["stderr_path"].as_str().unwrap_or("").to_string();
206 if phase.is_empty() || stdout.is_empty() {
207 Ok(None)
208 } else {
209 Ok(Some((phase, stdout, stderr)))
210 }
211 }
212 None => Ok(None),
213 }
214}
215
216pub fn query_step_events(db_path: &Path, task_id: &str) -> Result<Vec<StepEvent>> {
218 let conn = open_conn(db_path)?;
219 query_step_events_with_conn(&conn, task_id)
220}
221
222fn query_step_events_with_conn(conn: &Connection, task_id: &str) -> Result<Vec<StepEvent>> {
223 let mut stmt = conn.prepare(
224 "SELECT event_type, payload_json, created_at, task_item_id, step, step_scope FROM events
225 WHERE task_id = ?1
226 AND event_type IN ('step_started', 'step_finished', 'step_skipped', 'step_heartbeat', 'step_spawned', 'step_timeout', 'cycle_started', 'sandbox_denied', 'sandbox_resource_exceeded', 'sandbox_network_blocked', 'daemon_pid_kill_blocked')
227 ORDER BY id ASC",
228 )?;
229 let rows = stmt.query_map(params![task_id], |row| {
230 let event_type: String = row.get(0)?;
231 let payload_json: String = row.get(1)?;
232 let created_at: String = row.get(2)?;
233 let task_item_id: Option<String> = row.get(3)?;
234 let col_step: Option<String> = row.get(4)?;
235 let col_step_scope: Option<String> = row.get(5)?;
236 Ok((
237 event_type,
238 payload_json,
239 created_at,
240 task_item_id,
241 col_step,
242 col_step_scope,
243 ))
244 })?;
245
246 let mut events = Vec::new();
247 for row in rows {
248 let (event_type, payload_json, created_at, task_item_id, col_step, col_step_scope) = row?;
249 let v: Value = serde_json::from_str(&payload_json).unwrap_or_default();
250
251 let step = col_step.or_else(|| {
253 v["step"]
254 .as_str()
255 .or_else(|| v["phase"].as_str())
256 .map(String::from)
257 });
258 let step_scope = if let Some(ref scope_str) = col_step_scope {
259 match scope_str.as_str() {
260 "task" => Some(ObservedStepScope::Task),
261 "item" => Some(ObservedStepScope::Item),
262 _ => None,
263 }
264 } else {
265 observed_step_scope_from_payload(&v)
266 };
267
268 events.push(StepEvent {
269 event_type,
270 step,
271 step_scope,
272 task_item_id,
273 agent_id: v["agent_id"].as_str().map(String::from),
274 success: v["success"].as_bool(),
275 duration_ms: v["duration_ms"].as_u64(),
276 confidence: v["confidence"].as_f64(),
277 reason: v["reason"].as_str().map(String::from),
278 elapsed_secs: v["elapsed_secs"].as_u64(),
279 stdout_bytes: v["stdout_bytes"].as_u64(),
280 stderr_bytes: v["stderr_bytes"].as_u64(),
281 stdout_delta_bytes: v["stdout_delta_bytes"].as_u64(),
282 stderr_delta_bytes: v["stderr_delta_bytes"].as_u64(),
283 stagnant_heartbeats: v["stagnant_heartbeats"].as_u64().map(|v| v as u32),
284 pid: v["pid"].as_u64().map(|p| p as u32),
285 pid_alive: v["pid_alive"].as_bool(),
286 output_state: v["output_state"].as_str().map(String::from),
287 created_at,
288 });
289 }
290 Ok(events)
291}
292
293pub async fn query_latest_step_log_paths_async(
295 state: &InnerState,
296 task_id: &str,
297) -> Result<Option<(String, String, String)>> {
298 let task_id = task_id.to_owned();
299 state
300 .async_database
301 .reader()
302 .call(move |conn| {
303 query_latest_step_log_paths_with_conn(conn, &task_id)
304 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
305 })
306 .await
307 .map_err(flatten_err)
308}
309
310pub async fn query_step_events_async(state: &InnerState, task_id: &str) -> Result<Vec<StepEvent>> {
312 let task_id = task_id.to_owned();
313 state
314 .async_database
315 .reader()
316 .call(move |conn| {
317 query_step_events_with_conn(conn, &task_id)
318 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
319 })
320 .await
321 .map_err(flatten_err)
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327
328 #[test]
329 fn noop_sink_does_not_panic() {
330 let sink = NoopSink;
331 sink.emit(
332 "task1",
333 Some("item1"),
334 "step_started",
335 serde_json::json!({}),
336 );
337 sink.emit(
338 "task1",
339 None,
340 "task_completed",
341 serde_json::json!({"status": "ok"}),
342 );
343 }
344
345 #[tokio::test]
346 async fn insert_event_and_query_roundtrip() {
347 let mut fixture = crate::test_utils::TestState::new();
348 let state = fixture.build();
349
350 insert_event(
352 &state,
353 "task1",
354 Some("item1"),
355 "step_started",
356 serde_json::json!({"step": "qa", "agent_id": "qa_agent"}),
357 )
358 .await
359 .expect("insert step_started event");
360
361 insert_event(
362 &state,
363 "task1",
364 Some("item1"),
365 "step_finished",
366 serde_json::json!({"step": "qa", "success": true, "duration_ms": 1500}),
367 )
368 .await
369 .expect("insert step_finished event");
370
371 insert_event(
372 &state,
373 "task1",
374 None,
375 "cycle_started",
376 serde_json::json!({"cycle": 1}),
377 )
378 .await
379 .expect("insert cycle_started event");
380
381 let events = query_step_events(&state.db_path, "task1").expect("query roundtrip events");
383 assert_eq!(events.len(), 3);
384
385 assert_eq!(events[0].event_type, "step_started");
386 assert_eq!(events[0].step.as_deref(), Some("qa"));
387 assert_eq!(events[0].task_item_id.as_deref(), Some("item1"));
388 assert_eq!(events[0].agent_id.as_deref(), Some("qa_agent"));
389
390 assert_eq!(events[1].event_type, "step_finished");
391 assert_eq!(events[1].success, Some(true));
392 assert_eq!(events[1].duration_ms, Some(1500));
393
394 assert_eq!(events[2].event_type, "cycle_started");
395 }
396
397 #[test]
398 fn query_step_events_empty_for_unknown_task() {
399 let mut fixture = crate::test_utils::TestState::new();
400 let state = fixture.build();
401
402 let events =
403 query_step_events(&state.db_path, "nonexistent_task").expect("query empty events");
404 assert!(events.is_empty());
405 }
406
407 #[test]
408 fn observed_step_scope_parses_known_values() {
409 assert_eq!(
410 observed_step_scope_from_payload(&serde_json::json!({"step_scope": "task"})),
411 Some(ObservedStepScope::Task)
412 );
413 assert_eq!(
414 observed_step_scope_from_payload(&serde_json::json!({"step_scope": "item"})),
415 Some(ObservedStepScope::Item)
416 );
417 assert_eq!(
418 observed_step_scope_from_payload(&serde_json::json!({})),
419 None
420 );
421 }
422
423 #[test]
424 fn observed_step_scope_label_returns_unspecified_for_none() {
425 assert_eq!(observed_step_scope_label(None), "unspecified");
426 assert_eq!(
427 observed_step_scope_label(Some(ObservedStepScope::Task)),
428 "task"
429 );
430 assert_eq!(
431 observed_step_scope_label(Some(ObservedStepScope::Item)),
432 "item"
433 );
434 }
435
436 #[test]
437 fn query_latest_step_log_paths_returns_none_when_empty() {
438 let mut fixture = crate::test_utils::TestState::new();
439 let state = fixture.build();
440
441 let result =
442 query_latest_step_log_paths(&state.db_path, "task1").expect("query latest log paths");
443 assert!(result.is_none());
444 }
445
446 #[tokio::test]
447 async fn query_latest_step_log_paths_returns_paths() {
448 let mut fixture = crate::test_utils::TestState::new();
449 let state = fixture.build();
450
451 insert_event(
452 &state,
453 "task1",
454 Some("item1"),
455 "step_spawned",
456 serde_json::json!({
457 "phase": "qa",
458 "stdout_path": "/tmp/stdout.log",
459 "stderr_path": "/tmp/stderr.log"
460 }),
461 )
462 .await
463 .expect("insert step_spawned event");
464
465 let result = query_latest_step_log_paths(&state.db_path, "task1")
466 .expect("query latest spawned log paths");
467 assert!(result.is_some());
468 let (phase, stdout, stderr) = result.expect("spawned log paths should exist");
469 assert_eq!(phase, "qa");
470 assert_eq!(stdout, "/tmp/stdout.log");
471 assert_eq!(stderr, "/tmp/stderr.log");
472 }
473
474 #[tokio::test]
475 async fn query_latest_step_log_paths_empty_phase_returns_none() {
476 let mut fixture = crate::test_utils::TestState::new();
477 let state = fixture.build();
478
479 insert_event(
480 &state,
481 "task1",
482 Some("item1"),
483 "step_started",
484 serde_json::json!({"stdout_path": "/tmp/out.log"}),
485 )
486 .await
487 .expect("insert step_started log event");
488
489 let result = query_latest_step_log_paths(&state.db_path, "task1")
490 .expect("query empty phase log paths");
491 assert!(result.is_none());
492 }
493
494 #[tokio::test]
495 async fn query_step_events_parses_step_scope_and_task_item_id() {
496 let mut fixture = crate::test_utils::TestState::new();
497 let state = fixture.build();
498
499 insert_event(
500 &state,
501 "task1",
502 Some("item1"),
503 "step_started",
504 serde_json::json!({"step": "qa", "step_scope": "item"}),
505 )
506 .await
507 .expect("insert scoped step_started event");
508
509 let events = query_step_events(&state.db_path, "task1").expect("query scoped events");
510 assert_eq!(events.len(), 1);
511 assert_eq!(events[0].step_scope, Some(ObservedStepScope::Item));
512 assert_eq!(events[0].task_item_id.as_deref(), Some("item1"));
513 }
514
515 #[test]
516 fn tracing_event_sink_does_not_panic_on_all_event_types() {
517 let sink = TracingEventSink::new();
518 sink.emit(
520 "t1",
521 None,
522 "task_failed",
523 serde_json::json!({"error": "boom"}),
524 );
525 sink.emit("t1", None, "step_timeout", serde_json::json!({"secs": 60}));
527 sink.emit("t1", None, "auto_rollback_failed", serde_json::json!({}));
528 sink.emit("t1", Some("i1"), "step_started", serde_json::json!({}));
530 sink.emit("t1", None, "step_finished", serde_json::json!({}));
531 sink.emit("t1", None, "task_completed", serde_json::json!({}));
532 sink.emit("t1", None, "task_paused", serde_json::json!({}));
533 sink.emit("t1", None, "step_heartbeat", serde_json::json!({}));
535 sink.emit("t1", None, "custom_event", serde_json::json!({}));
536 }
537
538 #[test]
539 fn tracing_event_sink_default_impl() {
540 let sink = TracingEventSink;
541 sink.emit("t1", None, "task_completed", serde_json::json!({}));
542 }
543
544 #[test]
545 fn observed_step_scope_from_payload_unknown_value() {
546 assert_eq!(
547 observed_step_scope_from_payload(&serde_json::json!({"step_scope": "unknown"})),
548 None
549 );
550 }
551
552 #[tokio::test]
553 async fn query_latest_step_log_paths_prefers_step_key_over_phase() {
554 let mut fixture = crate::test_utils::TestState::new();
555 let state = fixture.build();
556
557 insert_event(
558 &state,
559 "task1",
560 Some("item1"),
561 "step_started",
562 serde_json::json!({
563 "step": "implement",
564 "stdout_path": "/tmp/out.log",
565 "stderr_path": "/tmp/err.log"
566 }),
567 )
568 .await
569 .expect("insert step_started event");
570
571 let result = query_latest_step_log_paths(&state.db_path, "task1")
572 .expect("query log paths with step key");
573 assert!(result.is_some());
574 let (phase, stdout, _) = result.expect("log paths should exist");
575 assert_eq!(phase, "implement");
576 assert_eq!(stdout, "/tmp/out.log");
577 }
578
579 #[tokio::test]
580 async fn query_step_events_uses_promoted_column_scope() {
581 let mut fixture = crate::test_utils::TestState::new();
582 let state = fixture.build();
583
584 insert_event(
586 &state,
587 "task1",
588 Some("item1"),
589 "step_started",
590 serde_json::json!({"step": "qa", "step_scope": "task"}),
591 )
592 .await
593 .expect("insert step_started event");
594
595 let events = query_step_events(&state.db_path, "task1").expect("query events");
596 assert_eq!(events.len(), 1);
597 assert_eq!(events[0].step_scope, Some(ObservedStepScope::Task));
599 }
600
601 #[tokio::test]
602 async fn step_event_parses_all_optional_fields() {
603 let mut fixture = crate::test_utils::TestState::new();
604 let state = fixture.build();
605
606 insert_event(
607 &state,
608 "task1",
609 None,
610 "step_heartbeat",
611 serde_json::json!({
612 "step": "implement",
613 "step_scope": "task",
614 "elapsed_secs": 120,
615 "stdout_bytes": 4096,
616 "stderr_bytes": 256,
617 "stdout_delta_bytes": 0,
618 "stderr_delta_bytes": 4,
619 "stagnant_heartbeats": 3,
620 "pid": 12345,
621 "pid_alive": true,
622 "output_state": "low_output"
623 }),
624 )
625 .await
626 .expect("insert step_heartbeat event");
627
628 let events = query_step_events(&state.db_path, "task1").expect("query heartbeat events");
629 assert_eq!(events.len(), 1);
630 assert_eq!(events[0].step_scope, Some(ObservedStepScope::Task));
631 assert_eq!(events[0].elapsed_secs, Some(120));
632 assert_eq!(events[0].stdout_bytes, Some(4096));
633 assert_eq!(events[0].stderr_bytes, Some(256));
634 assert_eq!(events[0].stdout_delta_bytes, Some(0));
635 assert_eq!(events[0].stderr_delta_bytes, Some(4));
636 assert_eq!(events[0].stagnant_heartbeats, Some(3));
637 assert_eq!(events[0].pid, Some(12345));
638 assert_eq!(events[0].pid_alive, Some(true));
639 assert_eq!(events[0].output_state.as_deref(), Some("low_output"));
640 }
641}