Skip to main content

scud/commands/swarm/
events.rs

//! Swarm event logging and aggregation
//!
//! Captures structured events from agent execution for retrospective analysis.
//! Events are stored in SQLite for queryable access.
5
6use std::path::{Path, PathBuf};
7
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11
12use crate::db::Database;
13
14/// Event kinds that can be logged
15#[derive(Debug, Clone, Serialize, Deserialize)]
16#[serde(tag = "kind", rename_all = "snake_case")]
17pub enum EventKind {
18    // Lifecycle events (from orchestrator)
19    Spawned,
20    Started,
21    Completed {
22        success: bool,
23        duration_ms: u64,
24    },
25    Failed {
26        reason: String,
27    },
28
29    // Tool events (from hooks)
30    ToolCall {
31        tool: String,
32        #[serde(skip_serializing_if = "Option::is_none")]
33        input_summary: Option<String>,
34    },
35    ToolResult {
36        tool: String,
37        success: bool,
38        #[serde(skip_serializing_if = "Option::is_none")]
39        duration_ms: Option<u64>,
40    },
41
42    // File events (from hooks)
43    FileRead {
44        path: String,
45    },
46    FileWrite {
47        path: String,
48        #[serde(skip_serializing_if = "Option::is_none")]
49        lines_changed: Option<u32>,
50    },
51
52    // Dependency events (from orchestrator)
53    DependencyMet {
54        dependency_id: String,
55    },
56    Unblocked {
57        by_task_id: String,
58    },
59
60    // Output capture
61    Output {
62        line: String,
63    },
64
65    // Wave lifecycle events (from orchestrator)
66    WaveStarted {
67        wave_number: usize,
68        task_count: usize,
69    },
70    WaveCompleted {
71        wave_number: usize,
72        duration_ms: u64,
73    },
74
75    // Validation events (from orchestrator)
76    ValidationPassed,
77    ValidationFailed {
78        failures: Vec<String>,
79    },
80
81    // Repair events (from orchestrator)
82    RepairStarted {
83        attempt: usize,
84        task_ids: Vec<String>,
85    },
86    RepairCompleted {
87        attempt: usize,
88        success: bool,
89    },
90
91    // Custom events
92    Custom {
93        name: String,
94        #[serde(skip_serializing_if = "Option::is_none")]
95        data: Option<serde_json::Value>,
96    },
97}
98
99/// A single event in the timeline
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct AgentEvent {
102    /// When this event occurred
103    pub timestamp: DateTime<Utc>,
104    /// The swarm session this belongs to
105    pub session_id: String,
106    /// Which task/agent generated this event
107    pub task_id: String,
108    /// The event details
109    #[serde(flatten)]
110    pub event: EventKind,
111}
112
113impl AgentEvent {
114    pub fn new(session_id: &str, task_id: &str, event: EventKind) -> Self {
115        Self {
116            timestamp: Utc::now(),
117            session_id: session_id.to_string(),
118            task_id: task_id.to_string(),
119            event,
120        }
121    }
122
123    /// Create a spawned event
124    pub fn spawned(session_id: &str, task_id: &str) -> Self {
125        Self::new(session_id, task_id, EventKind::Spawned)
126    }
127
128    /// Create a completed event
129    pub fn completed(session_id: &str, task_id: &str, success: bool, duration_ms: u64) -> Self {
130        Self::new(
131            session_id,
132            task_id,
133            EventKind::Completed {
134                success,
135                duration_ms,
136            },
137        )
138    }
139
140    /// Create a tool call event
141    pub fn tool_call(session_id: &str, task_id: &str, tool: &str, input_summary: Option<&str>) -> Self {
142        Self::new(
143            session_id,
144            task_id,
145            EventKind::ToolCall {
146                tool: tool.to_string(),
147                input_summary: input_summary.map(String::from),
148            },
149        )
150    }
151
152    /// Create an unblocked event (task was unblocked by another task completing)
153    pub fn unblocked(session_id: &str, task_id: &str, by_task_id: &str) -> Self {
154        Self::new(
155            session_id,
156            task_id,
157            EventKind::Unblocked {
158                by_task_id: by_task_id.to_string(),
159            },
160        )
161    }
162}
163
164/// Writer for appending events to SQLite database
165pub struct EventWriter {
166    session_id: String,
167    db: Database,
168}
169
170impl EventWriter {
171    pub fn new(project_root: &Path, session_id: &str) -> Result<Self> {
172        // Ensure .scud directory exists
173        let scud_dir = project_root.join(".scud");
174        std::fs::create_dir_all(&scud_dir)?;
175
176        let db = Database::new(project_root);
177        db.initialize()?;
178
179        Ok(Self {
180            session_id: session_id.to_string(),
181            db,
182        })
183    }
184
185    /// Get the session ID
186    pub fn session_id(&self) -> &str {
187        &self.session_id
188    }
189
190    /// Get the path to the database file (for display purposes)
191    pub fn session_file(&self) -> PathBuf {
192        self.db.path().to_path_buf()
193    }
194
195    /// Write an event to the database
196    pub fn write(&self, event: &AgentEvent) -> Result<()> {
197        let guard = self.db.connection()?;
198        let conn = guard.as_ref().unwrap();
199        crate::db::events::insert_event(conn, event)?;
200        Ok(())
201    }
202
203    /// Write an event (SQLite stores all events in one table, no separate task log needed)
204    pub fn write_with_task_log(&self, event: &AgentEvent) -> Result<()> {
205        self.write(event)
206    }
207
208    /// Log a spawn event
209    pub fn log_spawned(&self, task_id: &str) -> Result<()> {
210        self.write(&AgentEvent::spawned(&self.session_id, task_id))
211    }
212
213    /// Log a completion event
214    pub fn log_completed(&self, task_id: &str, success: bool, duration_ms: u64) -> Result<()> {
215        self.write(&AgentEvent::completed(
216            &self.session_id,
217            task_id,
218            success,
219            duration_ms,
220        ))
221    }
222
223    /// Log an unblocked event
224    pub fn log_unblocked(&self, task_id: &str, by_task_id: &str) -> Result<()> {
225        self.write(&AgentEvent::unblocked(&self.session_id, task_id, by_task_id))
226    }
227
228    /// Log a wave started event
229    pub fn log_wave_started(&self, wave_number: usize, task_count: usize) -> Result<()> {
230        self.write(&AgentEvent::new(
231            &self.session_id,
232            &format!("wave:{}", wave_number),
233            EventKind::WaveStarted {
234                wave_number,
235                task_count,
236            },
237        ))
238    }
239
240    /// Log a wave completed event
241    pub fn log_wave_completed(&self, wave_number: usize, duration_ms: u64) -> Result<()> {
242        self.write(&AgentEvent::new(
243            &self.session_id,
244            &format!("wave:{}", wave_number),
245            EventKind::WaveCompleted {
246                wave_number,
247                duration_ms,
248            },
249        ))
250    }
251
252    /// Log a validation passed event
253    pub fn log_validation_passed(&self) -> Result<()> {
254        self.write(&AgentEvent::new(
255            &self.session_id,
256            "validation",
257            EventKind::ValidationPassed,
258        ))
259    }
260
261    /// Log a validation failed event
262    pub fn log_validation_failed(&self, failures: &[String]) -> Result<()> {
263        self.write(&AgentEvent::new(
264            &self.session_id,
265            "validation",
266            EventKind::ValidationFailed {
267                failures: failures.to_vec(),
268            },
269        ))
270    }
271
272    /// Log a repair started event
273    pub fn log_repair_started(&self, attempt: usize, task_ids: &[String]) -> Result<()> {
274        self.write(&AgentEvent::new(
275            &self.session_id,
276            "repair",
277            EventKind::RepairStarted {
278                attempt,
279                task_ids: task_ids.to_vec(),
280            },
281        ))
282    }
283
284    /// Log a repair completed event
285    pub fn log_repair_completed(&self, attempt: usize, success: bool) -> Result<()> {
286        self.write(&AgentEvent::new(
287            &self.session_id,
288            "repair",
289            EventKind::RepairCompleted { attempt, success },
290        ))
291    }
292}
293
294/// Reader for loading events from SQLite database
295pub struct EventReader {
296    db: Database,
297}
298
299impl EventReader {
300    pub fn new(project_root: &Path) -> Self {
301        Self {
302            db: Database::new(project_root),
303        }
304    }
305
306    /// Load all events for a session
307    pub fn load_session(&self, session_id: &str) -> Result<Vec<AgentEvent>> {
308        self.load_all_for_session(session_id)
309    }
310
311    /// Load all events for a session (already sorted by timestamp in SQLite)
312    pub fn load_all_for_session(&self, session_id: &str) -> Result<Vec<AgentEvent>> {
313        let guard = self.db.connection()?;
314        let conn = guard.as_ref().unwrap();
315        crate::db::events::get_events_for_session(conn, session_id)
316    }
317
318    /// List available sessions
319    pub fn list_sessions(&self) -> Result<Vec<String>> {
320        let guard = self.db.connection()?;
321        let conn = guard.as_ref().unwrap();
322        crate::db::events::list_sessions(conn)
323    }
324}
325
326/// Aggregated timeline for retrospective analysis
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct RetrospectiveTimeline {
329    pub session_id: String,
330    pub started_at: Option<DateTime<Utc>>,
331    pub completed_at: Option<DateTime<Utc>>,
332    pub tasks: Vec<TaskTimeline>,
333    pub total_events: usize,
334}
335
336/// Timeline for a single task
337#[derive(Debug, Clone, Serialize, Deserialize)]
338pub struct TaskTimeline {
339    pub task_id: String,
340    pub spawned_at: Option<DateTime<Utc>>,
341    pub completed_at: Option<DateTime<Utc>>,
342    pub success: Option<bool>,
343    pub duration_ms: Option<u64>,
344    pub tools_used: Vec<String>,
345    pub files_read: Vec<String>,
346    pub files_written: Vec<String>,
347    pub unblocked_by: Vec<String>,
348    pub events: Vec<AgentEvent>,
349}
350
351impl RetrospectiveTimeline {
352    /// Build a timeline from events
353    pub fn from_events(session_id: &str, events: Vec<AgentEvent>) -> Self {
354        use std::collections::HashMap;
355
356        let mut task_map: HashMap<String, TaskTimeline> = HashMap::new();
357
358        for event in &events {
359            let task = task_map
360                .entry(event.task_id.clone())
361                .or_insert_with(|| TaskTimeline {
362                    task_id: event.task_id.clone(),
363                    spawned_at: None,
364                    completed_at: None,
365                    success: None,
366                    duration_ms: None,
367                    tools_used: Vec::new(),
368                    files_read: Vec::new(),
369                    files_written: Vec::new(),
370                    unblocked_by: Vec::new(),
371                    events: Vec::new(),
372                });
373
374            task.events.push(event.clone());
375
376            match &event.event {
377                EventKind::Spawned => {
378                    task.spawned_at = Some(event.timestamp);
379                }
380                EventKind::Completed { success, duration_ms } => {
381                    task.completed_at = Some(event.timestamp);
382                    task.success = Some(*success);
383                    task.duration_ms = Some(*duration_ms);
384                }
385                EventKind::ToolCall { tool, .. } => {
386                    if !task.tools_used.contains(tool) {
387                        task.tools_used.push(tool.clone());
388                    }
389                }
390                EventKind::FileRead { path } => {
391                    if !task.files_read.contains(path) {
392                        task.files_read.push(path.clone());
393                    }
394                }
395                EventKind::FileWrite { path, .. } => {
396                    if !task.files_written.contains(path) {
397                        task.files_written.push(path.clone());
398                    }
399                }
400                EventKind::Unblocked { by_task_id } => {
401                    if !task.unblocked_by.contains(by_task_id) {
402                        task.unblocked_by.push(by_task_id.clone());
403                    }
404                }
405                _ => {}
406            }
407        }
408
409        let tasks: Vec<TaskTimeline> = task_map.into_values().collect();
410
411        let started_at = events.first().map(|e| e.timestamp);
412        let completed_at = events.last().map(|e| e.timestamp);
413
414        Self {
415            session_id: session_id.to_string(),
416            started_at,
417            completed_at,
418            tasks,
419            total_events: events.len(),
420        }
421    }
422
423    /// Generate a text summary
424    pub fn to_summary(&self) -> String {
425        use std::fmt::Write;
426        let mut s = String::new();
427
428        writeln!(s, "Session: {}", self.session_id).unwrap();
429        if let (Some(start), Some(end)) = (self.started_at, self.completed_at) {
430            let duration = end.signed_duration_since(start);
431            writeln!(s, "Duration: {}s", duration.num_seconds()).unwrap();
432        }
433        writeln!(s, "Total events: {}", self.total_events).unwrap();
434        writeln!(s, "Tasks: {}", self.tasks.len()).unwrap();
435        writeln!(s).unwrap();
436
437        for task in &self.tasks {
438            writeln!(s, "  [{}]", task.task_id).unwrap();
439            if let Some(success) = task.success {
440                writeln!(s, "    Status: {}", if success { "✓" } else { "✗" }).unwrap();
441            }
442            if let Some(duration) = task.duration_ms {
443                writeln!(s, "    Duration: {}ms", duration).unwrap();
444            }
445            if !task.tools_used.is_empty() {
446                writeln!(s, "    Tools: {}", task.tools_used.join(", ")).unwrap();
447            }
448            if !task.files_written.is_empty() {
449                writeln!(s, "    Files written: {}", task.files_written.len()).unwrap();
450            }
451            if !task.unblocked_by.is_empty() {
452                writeln!(s, "    Unblocked by: {}", task.unblocked_by.join(", ")).unwrap();
453            }
454        }
455
456        s
457    }
458}
459
460/// Print a retrospective for a session
461pub fn print_retro(project_root: &Path, session_id: Option<&str>) -> Result<()> {
462    use colored::Colorize;
463
464    let reader = EventReader::new(project_root);
465
466    // If no session specified, list available sessions
467    let session_id = match session_id {
468        Some(id) => id.to_string(),
469        None => {
470            let sessions = reader.list_sessions()?;
471            if sessions.is_empty() {
472                println!("{}", "No swarm sessions found.".yellow());
473                println!("Run a swarm first: scud swarm --tag <tag>");
474                return Ok(());
475            }
476
477            println!("{}", "Available sessions:".blue().bold());
478            for session in &sessions {
479                println!("  • {}", session);
480            }
481
482            // Use the most recent session
483            if let Some(latest) = sessions.last() {
484                println!();
485                println!("Showing latest session: {}", latest.cyan());
486                latest.clone()
487            } else {
488                return Ok(());
489            }
490        }
491    };
492
493    // Load events
494    let events = reader.load_all_for_session(&session_id)?;
495
496    if events.is_empty() {
497        println!("{}", "No events found for this session.".yellow());
498        return Ok(());
499    }
500
501    // Build timeline
502    let timeline = RetrospectiveTimeline::from_events(&session_id, events);
503
504    // Print header
505    println!();
506    println!("{}", "Swarm Retrospective".blue().bold());
507    println!("{}", "═".repeat(60).blue());
508    println!();
509
510    println!(
511        "  {} {}",
512        "Session:".dimmed(),
513        timeline.session_id.cyan()
514    );
515
516    if let (Some(start), Some(end)) = (timeline.started_at, timeline.completed_at) {
517        let duration = end.signed_duration_since(start);
518        println!(
519            "  {} {}s",
520            "Duration:".dimmed(),
521            duration.num_seconds().to_string().cyan()
522        );
523        println!(
524            "  {} {}",
525            "Started:".dimmed(),
526            start.format("%Y-%m-%d %H:%M:%S").to_string().dimmed()
527        );
528    }
529
530    println!(
531        "  {} {}",
532        "Events:".dimmed(),
533        timeline.total_events.to_string().cyan()
534    );
535    println!(
536        "  {} {}",
537        "Tasks:".dimmed(),
538        timeline.tasks.len().to_string().cyan()
539    );
540    println!();
541
542    // Print task details
543    println!("{}", "Task Timeline".yellow().bold());
544    println!("{}", "─".repeat(60).yellow());
545
546    for task in &timeline.tasks {
547        let status_icon = match task.success {
548            Some(true) => "✓".green(),
549            Some(false) => "✗".red(),
550            None => "?".yellow(),
551        };
552
553        println!();
554        println!("  {} [{}]", status_icon, task.task_id.cyan());
555
556        if let Some(duration) = task.duration_ms {
557            println!("    Duration: {}ms", duration.to_string().dimmed());
558        }
559
560        if !task.tools_used.is_empty() {
561            println!(
562                "    Tools: {}",
563                task.tools_used.join(", ").dimmed()
564            );
565        }
566
567        if !task.files_written.is_empty() {
568            println!(
569                "    Files written: {}",
570                task.files_written.len().to_string().dimmed()
571            );
572            for file in task.files_written.iter().take(5) {
573                println!("      • {}", file.dimmed());
574            }
575            if task.files_written.len() > 5 {
576                println!(
577                    "      ... and {} more",
578                    (task.files_written.len() - 5).to_string().dimmed()
579                );
580            }
581        }
582
583        if !task.unblocked_by.is_empty() {
584            println!(
585                "    Unblocked by: {}",
586                task.unblocked_by.join(", ").dimmed()
587            );
588        }
589    }
590
591    println!();
592    Ok(())
593}
594
595/// Export retrospective as JSON
596pub fn export_retro_json(project_root: &Path, session_id: &str) -> Result<String> {
597    let reader = EventReader::new(project_root);
598    let events = reader.load_all_for_session(session_id)?;
599    let timeline = RetrospectiveTimeline::from_events(session_id, events);
600    Ok(serde_json::to_string_pretty(&timeline)?)
601}
602
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use tempfile::TempDir;

    /// Build an event for session `s1` with an explicit timestamp.
    fn event_at(timestamp: DateTime<Utc>, task_id: &str, event: EventKind) -> AgentEvent {
        AgentEvent {
            timestamp,
            session_id: "s1".to_string(),
            task_id: task_id.to_string(),
            event,
        }
    }

    /// Sort by timestamp, then drop adjacent events that share a timestamp,
    /// task id and serialized payload.
    ///
    /// NOTE: this rule lives only in the tests; `load_all_for_session` in
    /// this module delegates to the database layer and does not apply it.
    fn sort_and_dedup(events: &mut Vec<AgentEvent>) {
        events.sort_by_key(|e| e.timestamp);
        events.dedup_by(|a, b| {
            a.timestamp == b.timestamp
                && a.task_id == b.task_id
                && serde_json::to_string(&a.event).ok() == serde_json::to_string(&b.event).ok()
        });
    }

    #[test]
    fn test_event_serialization() {
        let event = AgentEvent::spawned("session-1", "task:1");
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("spawned"));
        assert!(json.contains("task:1"));

        // Round-trip: every field must survive serialization.
        let parsed: AgentEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.task_id, "task:1");
        assert_eq!(parsed.session_id, "session-1");
        assert!(matches!(parsed.event, EventKind::Spawned));
    }

    #[test]
    fn test_event_writer_reader() {
        let temp_dir = TempDir::new().unwrap();
        let project_root = temp_dir.path();

        let writer = EventWriter::new(project_root, "test-session").unwrap();

        // Write events
        writer.log_spawned("task:1").unwrap();
        writer.log_spawned("task:2").unwrap();
        writer.log_completed("task:1", true, 1000).unwrap();

        // Read events
        let reader = EventReader::new(project_root);
        let events = reader.load_session("test-session").unwrap();

        assert_eq!(events.len(), 3);
    }

    #[test]
    fn test_retrospective_timeline() {
        let events = vec![
            AgentEvent::spawned("s1", "task:1"),
            AgentEvent::spawned("s1", "task:2"),
            AgentEvent::tool_call("s1", "task:1", "Read", Some("src/main.rs")),
            AgentEvent::completed("s1", "task:1", true, 5000),
            AgentEvent::unblocked("s1", "task:3", "task:1"),
            AgentEvent::completed("s1", "task:2", true, 3000),
        ];

        let timeline = RetrospectiveTimeline::from_events("s1", events);

        assert_eq!(timeline.tasks.len(), 3); // task:1, task:2, task:3
        assert_eq!(timeline.total_events, 6);

        // Look tasks up by id: the order of `timeline.tasks` is not asserted.
        let task1 = timeline.tasks.iter().find(|t| t.task_id == "task:1").unwrap();
        assert_eq!(task1.success, Some(true));
        assert_eq!(task1.duration_ms, Some(5000));
        assert!(task1.tools_used.contains(&"Read".to_string()));
        assert_eq!(task1.events.len(), 3);

        let task2 = timeline.tasks.iter().find(|t| t.task_id == "task:2").unwrap();
        assert_eq!(task2.success, Some(true));
        assert_eq!(task2.duration_ms, Some(3000));

        // task:3 only has an `Unblocked` event, so it has no outcome yet.
        let task3 = timeline.tasks.iter().find(|t| t.task_id == "task:3").unwrap();
        assert_eq!(task3.success, None);
        assert_eq!(task3.unblocked_by, vec!["task:1".to_string()]);
    }

    #[test]
    fn test_deduplication_preserves_different_tool_calls() {
        // Two tool call events with the same timestamp and task_id but
        // different tool names - these should NOT be deduplicated.
        let fixed_time = Utc.with_ymd_and_hms(2025, 1, 15, 12, 0, 0).unwrap();

        let mut events = vec![
            event_at(
                fixed_time,
                "task:1",
                EventKind::ToolCall {
                    tool: "Read".to_string(),
                    input_summary: Some("file1.rs".to_string()),
                },
            ),
            event_at(
                fixed_time,
                "task:1",
                EventKind::ToolCall {
                    tool: "Write".to_string(),
                    input_summary: Some("file2.rs".to_string()),
                },
            ),
        ];

        sort_and_dedup(&mut events);

        // Both events should remain (different tool names)
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn test_deduplication_removes_true_duplicates() {
        // Two identical events - these SHOULD be deduplicated.
        let fixed_time = Utc.with_ymd_and_hms(2025, 1, 15, 12, 0, 0).unwrap();

        let mut events = vec![
            event_at(fixed_time, "task:1", EventKind::Spawned),
            event_at(fixed_time, "task:1", EventKind::Spawned),
        ];

        sort_and_dedup(&mut events);

        // Only one event should remain (true duplicate)
        assert_eq!(events.len(), 1);
    }
}