Skip to main content

agent_orchestrator/
events.rs

1use crate::async_database::flatten_err;
2use crate::db::open_conn;
3use crate::state::InnerState;
4use anyhow::Result;
5use rusqlite::{Connection, params};
6use serde_json::Value;
7use std::path::Path;
8use tracing::{debug, error, info, warn};
9
/// Step scope inferred from persisted event payloads.
///
/// Besides equality, the scope derives `Hash` so it can serve as a key in
/// hash-based collections (e.g. grouping events by scope).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ObservedStepScope {
    /// Event belongs to a task-scoped step.
    Task,
    /// Event belongs to an item-scoped step.
    Item,
}
18
19/// Extracts the observed step scope from an event payload.
20pub fn observed_step_scope_from_payload(payload: &Value) -> Option<ObservedStepScope> {
21    match payload["step_scope"].as_str() {
22        Some("task") => Some(ObservedStepScope::Task),
23        Some("item") => Some(ObservedStepScope::Item),
24        _ => None,
25    }
26}
27
28/// Returns the stable label used for a step scope in logs and APIs.
29pub fn observed_step_scope_label(scope: Option<ObservedStepScope>) -> &'static str {
30    match scope {
31        Some(ObservedStepScope::Task) => "task",
32        Some(ObservedStepScope::Item) => "item",
33        None => "unspecified",
34    }
35}
36
37/// Trait for emitting real-time events to listeners (UI, logging, etc.)
38/// Separate from `insert_event` which persists to DB.
39pub trait EventSink: Send + Sync {
40    /// Emits an in-memory event to real-time listeners.
41    fn emit(&self, task_id: &str, task_item_id: Option<&str>, event_type: &str, payload: Value);
42}
43
44/// No-op implementation for CLI mode - events are persisted to DB but not pushed to any UI.
45pub struct NoopSink;
46
47impl EventSink for NoopSink {
48    fn emit(
49        &self,
50        _task_id: &str,
51        _task_item_id: Option<&str>,
52        _event_type: &str,
53        _payload: Value,
54    ) {
55    }
56}
57
/// [`EventSink`] that turns each workflow event into a structured `tracing`
/// log record.
#[derive(Default)]
pub struct TracingEventSink;

impl TracingEventSink {
    /// Builds a tracing-backed sink; equivalent to `TracingEventSink::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
73
74impl EventSink for TracingEventSink {
75    fn emit(&self, task_id: &str, task_item_id: Option<&str>, event_type: &str, payload: Value) {
76        let payload_text = payload.to_string();
77        match event_type {
78            "task_failed" => error!(
79                task_id,
80                task_item_id,
81                event_type,
82                payload = %payload_text,
83                "workflow event"
84            ),
85            "step_timeout" | "auto_rollback_failed" => warn!(
86                task_id,
87                task_item_id,
88                event_type,
89                payload = %payload_text,
90                "workflow event"
91            ),
92            "step_started" | "step_finished" | "task_completed" | "task_paused" => info!(
93                task_id,
94                task_item_id,
95                event_type,
96                payload = %payload_text,
97                "workflow event"
98            ),
99            _ => debug!(
100                task_id,
101                task_item_id,
102                event_type,
103                payload = %payload_text,
104                "workflow event"
105            ),
106        }
107    }
108}
109
110/// Persists one workflow event using the shared async database writer.
111pub async fn insert_event(
112    state: &InnerState,
113    task_id: &str,
114    task_item_id: Option<&str>,
115    event_type: &str,
116    payload: Value,
117) -> Result<()> {
118    state
119        .db_writer
120        .insert_event(
121            task_id,
122            task_item_id,
123            event_type,
124            &serde_json::to_string(&payload)?,
125        )
126        .await
127}
128
129/// Parsed step event from the events table for display in watch/follow.
130#[derive(Debug)]
131pub struct StepEvent {
132    /// Event type label.
133    pub event_type: String,
134    /// Step identifier or phase name associated with the event.
135    pub step: Option<String>,
136    /// Scope inferred from promoted columns or payload JSON.
137    pub step_scope: Option<ObservedStepScope>,
138    /// Task-item identifier for item-scoped events.
139    pub task_item_id: Option<String>,
140    /// Agent identifier when an agent executed the step.
141    pub agent_id: Option<String>,
142    /// Success flag captured from the payload.
143    pub success: Option<bool>,
144    /// Step duration in milliseconds.
145    pub duration_ms: Option<u64>,
146    /// Confidence score reported by the agent.
147    pub confidence: Option<f64>,
148    /// Human-readable reason or message from the payload.
149    pub reason: Option<String>,
150    /// Elapsed seconds reported by heartbeat events.
151    pub elapsed_secs: Option<u64>,
152    /// Total stdout bytes written so far.
153    pub stdout_bytes: Option<u64>,
154    /// Total stderr bytes written so far.
155    pub stderr_bytes: Option<u64>,
156    /// Stdout growth since the previous heartbeat.
157    pub stdout_delta_bytes: Option<u64>,
158    /// Stderr growth since the previous heartbeat.
159    pub stderr_delta_bytes: Option<u64>,
160    /// Number of consecutive stagnant heartbeat samples.
161    pub stagnant_heartbeats: Option<u32>,
162    /// Child process identifier when tracked.
163    pub pid: Option<u32>,
164    /// Whether the child process was still alive at sample time.
165    pub pid_alive: Option<bool>,
166    /// Output-state classification attached to the event.
167    pub output_state: Option<String>,
168    /// Timestamp when the event row was created.
169    pub created_at: String,
170}
171
172/// Query the latest step's log file paths for real-time tailing.
173/// Returns (phase, stdout_path, stderr_path) from the most recent step_spawned event.
174pub fn query_latest_step_log_paths(
175    db_path: &Path,
176    task_id: &str,
177) -> Result<Option<(String, String, String)>> {
178    let conn = open_conn(db_path)?;
179    query_latest_step_log_paths_with_conn(&conn, task_id)
180}
181
182fn query_latest_step_log_paths_with_conn(
183    conn: &Connection,
184    task_id: &str,
185) -> Result<Option<(String, String, String)>> {
186    let result: Option<(String,)> = conn
187        .query_row(
188            "SELECT payload_json FROM events
189             WHERE task_id = ?1 AND event_type IN ('step_spawned', 'step_started')
190             ORDER BY id DESC LIMIT 1",
191            params![task_id],
192            |row| Ok((row.get::<_, String>(0)?,)),
193        )
194        .ok();
195
196    match result {
197        Some((payload_json,)) => {
198            let v: Value = serde_json::from_str(&payload_json).unwrap_or_default();
199            let phase = v["phase"]
200                .as_str()
201                .or_else(|| v["step"].as_str())
202                .unwrap_or("")
203                .to_string();
204            let stdout = v["stdout_path"].as_str().unwrap_or("").to_string();
205            let stderr = v["stderr_path"].as_str().unwrap_or("").to_string();
206            if phase.is_empty() || stdout.is_empty() {
207                Ok(None)
208            } else {
209                Ok(Some((phase, stdout, stderr)))
210            }
211        }
212        None => Ok(None),
213    }
214}
215
216/// Query all step-related events for a task, parsed into StepEvent structs.
217pub fn query_step_events(db_path: &Path, task_id: &str) -> Result<Vec<StepEvent>> {
218    let conn = open_conn(db_path)?;
219    query_step_events_with_conn(&conn, task_id)
220}
221
222fn query_step_events_with_conn(conn: &Connection, task_id: &str) -> Result<Vec<StepEvent>> {
223    let mut stmt = conn.prepare(
224        "SELECT event_type, payload_json, created_at, task_item_id, step, step_scope FROM events
225         WHERE task_id = ?1
226           AND event_type IN ('step_started', 'step_finished', 'step_skipped', 'step_heartbeat', 'step_spawned', 'step_timeout', 'cycle_started', 'sandbox_denied', 'sandbox_resource_exceeded', 'sandbox_network_blocked', 'daemon_pid_kill_blocked')
227         ORDER BY id ASC",
228    )?;
229    let rows = stmt.query_map(params![task_id], |row| {
230        let event_type: String = row.get(0)?;
231        let payload_json: String = row.get(1)?;
232        let created_at: String = row.get(2)?;
233        let task_item_id: Option<String> = row.get(3)?;
234        let col_step: Option<String> = row.get(4)?;
235        let col_step_scope: Option<String> = row.get(5)?;
236        Ok((
237            event_type,
238            payload_json,
239            created_at,
240            task_item_id,
241            col_step,
242            col_step_scope,
243        ))
244    })?;
245
246    let mut events = Vec::new();
247    for row in rows {
248        let (event_type, payload_json, created_at, task_item_id, col_step, col_step_scope) = row?;
249        let v: Value = serde_json::from_str(&payload_json).unwrap_or_default();
250
251        // Use promoted column values first, fall back to JSON parsing
252        let step = col_step.or_else(|| {
253            v["step"]
254                .as_str()
255                .or_else(|| v["phase"].as_str())
256                .map(String::from)
257        });
258        let step_scope = if let Some(ref scope_str) = col_step_scope {
259            match scope_str.as_str() {
260                "task" => Some(ObservedStepScope::Task),
261                "item" => Some(ObservedStepScope::Item),
262                _ => None,
263            }
264        } else {
265            observed_step_scope_from_payload(&v)
266        };
267
268        events.push(StepEvent {
269            event_type,
270            step,
271            step_scope,
272            task_item_id,
273            agent_id: v["agent_id"].as_str().map(String::from),
274            success: v["success"].as_bool(),
275            duration_ms: v["duration_ms"].as_u64(),
276            confidence: v["confidence"].as_f64(),
277            reason: v["reason"].as_str().map(String::from),
278            elapsed_secs: v["elapsed_secs"].as_u64(),
279            stdout_bytes: v["stdout_bytes"].as_u64(),
280            stderr_bytes: v["stderr_bytes"].as_u64(),
281            stdout_delta_bytes: v["stdout_delta_bytes"].as_u64(),
282            stderr_delta_bytes: v["stderr_delta_bytes"].as_u64(),
283            stagnant_heartbeats: v["stagnant_heartbeats"].as_u64().map(|v| v as u32),
284            pid: v["pid"].as_u64().map(|p| p as u32),
285            pid_alive: v["pid_alive"].as_bool(),
286            output_state: v["output_state"].as_str().map(String::from),
287            created_at,
288        });
289    }
290    Ok(events)
291}
292
293/// Async variant of [`query_latest_step_log_paths`] backed by the shared async reader.
294pub async fn query_latest_step_log_paths_async(
295    state: &InnerState,
296    task_id: &str,
297) -> Result<Option<(String, String, String)>> {
298    let task_id = task_id.to_owned();
299    state
300        .async_database
301        .reader()
302        .call(move |conn| {
303            query_latest_step_log_paths_with_conn(conn, &task_id)
304                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
305        })
306        .await
307        .map_err(flatten_err)
308}
309
310/// Async variant of [`query_step_events`] backed by the shared async reader.
311pub async fn query_step_events_async(state: &InnerState, task_id: &str) -> Result<Vec<StepEvent>> {
312    let task_id = task_id.to_owned();
313    state
314        .async_database
315        .reader()
316        .call(move |conn| {
317            query_step_events_with_conn(conn, &task_id)
318                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
319        })
320        .await
321        .map_err(flatten_err)
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn noop_sink_does_not_panic() {
        let sink = NoopSink;
        sink.emit(
            "task1",
            Some("item1"),
            "step_started",
            serde_json::json!({}),
        );
        sink.emit(
            "task1",
            None,
            "task_completed",
            serde_json::json!({"status": "ok"}),
        );
    }

    #[tokio::test]
    async fn insert_event_and_query_roundtrip() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        // Insert events
        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_started",
            serde_json::json!({"step": "qa", "agent_id": "qa_agent"}),
        )
        .await
        .expect("insert step_started event");

        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_finished",
            serde_json::json!({"step": "qa", "success": true, "duration_ms": 1500}),
        )
        .await
        .expect("insert step_finished event");

        insert_event(
            &state,
            "task1",
            None,
            "cycle_started",
            serde_json::json!({"cycle": 1}),
        )
        .await
        .expect("insert cycle_started event");

        // Query events back
        let events = query_step_events(&state.db_path, "task1").expect("query roundtrip events");
        assert_eq!(events.len(), 3);

        assert_eq!(events[0].event_type, "step_started");
        assert_eq!(events[0].step.as_deref(), Some("qa"));
        assert_eq!(events[0].task_item_id.as_deref(), Some("item1"));
        assert_eq!(events[0].agent_id.as_deref(), Some("qa_agent"));

        assert_eq!(events[1].event_type, "step_finished");
        assert_eq!(events[1].success, Some(true));
        assert_eq!(events[1].duration_ms, Some(1500));

        assert_eq!(events[2].event_type, "cycle_started");
    }

    #[test]
    fn query_step_events_empty_for_unknown_task() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        let events =
            query_step_events(&state.db_path, "nonexistent_task").expect("query empty events");
        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn query_step_events_excludes_non_step_events() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            None,
            "task_completed",
            serde_json::json!({"status": "ok"}),
        )
        .await
        .expect("insert task_completed event");
        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_started",
            serde_json::json!({"step": "qa"}),
        )
        .await
        .expect("insert step_started event");

        let events = query_step_events(&state.db_path, "task1").expect("query filtered events");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, "step_started");
    }

    #[tokio::test]
    async fn query_step_events_reads_phase_when_step_missing() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_started",
            serde_json::json!({"phase": "qa"}),
        )
        .await
        .expect("insert phase-only step_started event");

        let events = query_step_events(&state.db_path, "task1").expect("query phase events");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].step.as_deref(), Some("qa"));
    }

    #[test]
    fn observed_step_scope_parses_known_values() {
        assert_eq!(
            observed_step_scope_from_payload(&serde_json::json!({"step_scope": "task"})),
            Some(ObservedStepScope::Task)
        );
        assert_eq!(
            observed_step_scope_from_payload(&serde_json::json!({"step_scope": "item"})),
            Some(ObservedStepScope::Item)
        );
        assert_eq!(
            observed_step_scope_from_payload(&serde_json::json!({})),
            None
        );
    }

    #[test]
    fn observed_step_scope_label_returns_unspecified_for_none() {
        assert_eq!(observed_step_scope_label(None), "unspecified");
        assert_eq!(
            observed_step_scope_label(Some(ObservedStepScope::Task)),
            "task"
        );
        assert_eq!(
            observed_step_scope_label(Some(ObservedStepScope::Item)),
            "item"
        );
    }

    #[test]
    fn query_latest_step_log_paths_returns_none_when_empty() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        let result =
            query_latest_step_log_paths(&state.db_path, "task1").expect("query latest log paths");
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn query_latest_step_log_paths_returns_paths() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_spawned",
            serde_json::json!({
                "phase": "qa",
                "stdout_path": "/tmp/stdout.log",
                "stderr_path": "/tmp/stderr.log"
            }),
        )
        .await
        .expect("insert step_spawned event");

        let (phase, stdout, stderr) = query_latest_step_log_paths(&state.db_path, "task1")
            .expect("query latest spawned log paths")
            .expect("spawned log paths should exist");
        assert_eq!(phase, "qa");
        assert_eq!(stdout, "/tmp/stdout.log");
        assert_eq!(stderr, "/tmp/stderr.log");
    }

    #[tokio::test]
    async fn query_latest_step_log_paths_uses_most_recent_step_event() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_spawned",
            serde_json::json!({
                "phase": "plan",
                "stdout_path": "/tmp/plan.out",
                "stderr_path": "/tmp/plan.err"
            }),
        )
        .await
        .expect("insert first step_spawned event");
        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_spawned",
            serde_json::json!({
                "phase": "qa",
                "stdout_path": "/tmp/qa.out",
                "stderr_path": "/tmp/qa.err"
            }),
        )
        .await
        .expect("insert second step_spawned event");
        // Event types outside step_spawned/step_started must not shadow the latest paths.
        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_finished",
            serde_json::json!({"phase": "qa", "success": true}),
        )
        .await
        .expect("insert step_finished event");

        let (phase, stdout, stderr) = query_latest_step_log_paths(&state.db_path, "task1")
            .expect("query latest log paths")
            .expect("latest log paths should exist");
        assert_eq!(phase, "qa");
        assert_eq!(stdout, "/tmp/qa.out");
        assert_eq!(stderr, "/tmp/qa.err");
    }

    #[tokio::test]
    async fn query_latest_step_log_paths_empty_phase_returns_none() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_started",
            serde_json::json!({"stdout_path": "/tmp/out.log"}),
        )
        .await
        .expect("insert step_started log event");

        let result = query_latest_step_log_paths(&state.db_path, "task1")
            .expect("query empty phase log paths");
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn query_latest_step_log_paths_missing_stdout_returns_none() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_spawned",
            serde_json::json!({"phase": "qa", "stderr_path": "/tmp/stderr.log"}),
        )
        .await
        .expect("insert step_spawned event without stdout");

        let result = query_latest_step_log_paths(&state.db_path, "task1")
            .expect("query log paths without stdout");
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn query_step_events_parses_step_scope_and_task_item_id() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_started",
            serde_json::json!({"step": "qa", "step_scope": "item"}),
        )
        .await
        .expect("insert scoped step_started event");

        let events = query_step_events(&state.db_path, "task1").expect("query scoped events");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].step_scope, Some(ObservedStepScope::Item));
        assert_eq!(events[0].task_item_id.as_deref(), Some("item1"));
    }

    #[test]
    fn tracing_event_sink_does_not_panic_on_all_event_types() {
        let sink = TracingEventSink::new();
        // Error level
        sink.emit(
            "t1",
            None,
            "task_failed",
            serde_json::json!({"error": "boom"}),
        );
        // Warning level
        sink.emit("t1", None, "step_timeout", serde_json::json!({"secs": 60}));
        sink.emit("t1", None, "auto_rollback_failed", serde_json::json!({}));
        // Info level
        sink.emit("t1", Some("i1"), "step_started", serde_json::json!({}));
        sink.emit("t1", None, "step_finished", serde_json::json!({}));
        sink.emit("t1", None, "task_completed", serde_json::json!({}));
        sink.emit("t1", None, "task_paused", serde_json::json!({}));
        // Debug level (fallthrough)
        sink.emit("t1", None, "step_heartbeat", serde_json::json!({}));
        sink.emit("t1", None, "custom_event", serde_json::json!({}));
    }

    #[test]
    fn tracing_event_sink_default_impl() {
        // Exercise the `Default` impl itself, not the unit-struct literal.
        let sink = TracingEventSink::default();
        sink.emit("t1", None, "task_completed", serde_json::json!({}));
    }

    #[test]
    fn observed_step_scope_from_payload_unknown_value() {
        assert_eq!(
            observed_step_scope_from_payload(&serde_json::json!({"step_scope": "unknown"})),
            None
        );
        assert_eq!(
            observed_step_scope_from_payload(&serde_json::json!({"step_scope": 1})),
            None
        );
    }

    #[tokio::test]
    async fn query_latest_step_log_paths_uses_step_key_when_phase_missing() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_started",
            serde_json::json!({
                "step": "implement",
                "stdout_path": "/tmp/out.log",
                "stderr_path": "/tmp/err.log"
            }),
        )
        .await
        .expect("insert step_started event");

        let (phase, stdout, _) = query_latest_step_log_paths(&state.db_path, "task1")
            .expect("query log paths with step key")
            .expect("log paths should exist");
        assert_eq!(phase, "implement");
        assert_eq!(stdout, "/tmp/out.log");
    }

    #[tokio::test]
    async fn query_step_events_uses_promoted_column_scope() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        // `insert_event` cannot set the promoted `step_scope` column independently
        // of the payload. This test therefore only checks that a scope written
        // through the normal path survives the round-trip. It cannot distinguish
        // the column value from the payload fallback.
        insert_event(
            &state,
            "task1",
            Some("item1"),
            "step_started",
            serde_json::json!({"step": "qa", "step_scope": "task"}),
        )
        .await
        .expect("insert step_started event");

        let events = query_step_events(&state.db_path, "task1").expect("query events");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].step_scope, Some(ObservedStepScope::Task));
    }

    #[tokio::test]
    async fn step_event_parses_all_optional_fields() {
        let mut fixture = crate::test_utils::TestState::new();
        let state = fixture.build();

        insert_event(
            &state,
            "task1",
            None,
            "step_heartbeat",
            serde_json::json!({
                "step": "implement",
                "step_scope": "task",
                "elapsed_secs": 120,
                "stdout_bytes": 4096,
                "stderr_bytes": 256,
                "stdout_delta_bytes": 0,
                "stderr_delta_bytes": 4,
                "stagnant_heartbeats": 3,
                "pid": 12345,
                "pid_alive": true,
                "output_state": "low_output"
            }),
        )
        .await
        .expect("insert step_heartbeat event");

        let events = query_step_events(&state.db_path, "task1").expect("query heartbeat events");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].step_scope, Some(ObservedStepScope::Task));
        assert_eq!(events[0].elapsed_secs, Some(120));
        assert_eq!(events[0].stdout_bytes, Some(4096));
        assert_eq!(events[0].stderr_bytes, Some(256));
        assert_eq!(events[0].stdout_delta_bytes, Some(0));
        assert_eq!(events[0].stderr_delta_bytes, Some(4));
        assert_eq!(events[0].stagnant_heartbeats, Some(3));
        assert_eq!(events[0].pid, Some(12345));
        assert_eq!(events[0].pid_alive, Some(true));
        assert_eq!(events[0].output_state.as_deref(), Some("low_output"));
    }
}