Skip to main content

scud/commands/swarm/
events.rs

1//! Swarm event logging and aggregation
2//!
3//! Captures structured events from agent execution for retrospective analysis.
4//! Events are written to JSONL files and can be aggregated into a timeline.
5
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Write};
8use std::path::{Path, PathBuf};
9
10use anyhow::Result;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13
14/// Event kinds that can be logged
15#[derive(Debug, Clone, Serialize, Deserialize)]
16#[serde(tag = "kind", rename_all = "snake_case")]
17pub enum EventKind {
18    // Lifecycle events (from orchestrator)
19    Spawned,
20    Started,
21    Completed {
22        success: bool,
23        duration_ms: u64,
24    },
25    Failed {
26        reason: String,
27    },
28
29    // Tool events (from hooks)
30    ToolCall {
31        tool: String,
32        #[serde(skip_serializing_if = "Option::is_none")]
33        input_summary: Option<String>,
34    },
35    ToolResult {
36        tool: String,
37        success: bool,
38        #[serde(skip_serializing_if = "Option::is_none")]
39        duration_ms: Option<u64>,
40    },
41
42    // File events (from hooks)
43    FileRead {
44        path: String,
45    },
46    FileWrite {
47        path: String,
48        #[serde(skip_serializing_if = "Option::is_none")]
49        lines_changed: Option<u32>,
50    },
51
52    // Dependency events (from orchestrator)
53    DependencyMet {
54        dependency_id: String,
55    },
56    Unblocked {
57        by_task_id: String,
58    },
59
60    // Output capture
61    Output {
62        line: String,
63    },
64
65    // Custom events
66    Custom {
67        name: String,
68        #[serde(skip_serializing_if = "Option::is_none")]
69        data: Option<serde_json::Value>,
70    },
71}
72
73/// A single event in the timeline
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct AgentEvent {
76    /// When this event occurred
77    pub timestamp: DateTime<Utc>,
78    /// The swarm session this belongs to
79    pub session_id: String,
80    /// Which task/agent generated this event
81    pub task_id: String,
82    /// The event details
83    #[serde(flatten)]
84    pub event: EventKind,
85}
86
87impl AgentEvent {
88    pub fn new(session_id: &str, task_id: &str, event: EventKind) -> Self {
89        Self {
90            timestamp: Utc::now(),
91            session_id: session_id.to_string(),
92            task_id: task_id.to_string(),
93            event,
94        }
95    }
96
97    /// Create a spawned event
98    pub fn spawned(session_id: &str, task_id: &str) -> Self {
99        Self::new(session_id, task_id, EventKind::Spawned)
100    }
101
102    /// Create a completed event
103    pub fn completed(session_id: &str, task_id: &str, success: bool, duration_ms: u64) -> Self {
104        Self::new(
105            session_id,
106            task_id,
107            EventKind::Completed {
108                success,
109                duration_ms,
110            },
111        )
112    }
113
114    /// Create a tool call event
115    pub fn tool_call(session_id: &str, task_id: &str, tool: &str, input_summary: Option<&str>) -> Self {
116        Self::new(
117            session_id,
118            task_id,
119            EventKind::ToolCall {
120                tool: tool.to_string(),
121                input_summary: input_summary.map(String::from),
122            },
123        )
124    }
125
126    /// Create an unblocked event (task was unblocked by another task completing)
127    pub fn unblocked(session_id: &str, task_id: &str, by_task_id: &str) -> Self {
128        Self::new(
129            session_id,
130            task_id,
131            EventKind::Unblocked {
132                by_task_id: by_task_id.to_string(),
133            },
134        )
135    }
136}
137
138/// Writer for appending events to a JSONL file
139pub struct EventWriter {
140    session_id: String,
141    events_dir: PathBuf,
142}
143
144impl EventWriter {
145    pub fn new(project_root: &Path, session_id: &str) -> Result<Self> {
146        let events_dir = project_root.join(".scud").join("swarm").join("events");
147        fs::create_dir_all(&events_dir)?;
148
149        Ok(Self {
150            session_id: session_id.to_string(),
151            events_dir,
152        })
153    }
154
155    /// Get the path to the session event file
156    pub fn session_file(&self) -> PathBuf {
157        self.events_dir.join(format!("{}.jsonl", self.session_id))
158    }
159
160    /// Get the path to a task-specific event file
161    pub fn task_file(&self, task_id: &str) -> PathBuf {
162        // Sanitize task_id for filename (replace : with -)
163        let safe_id = task_id.replace(':', "-");
164        self.events_dir.join(format!("{}-{}.jsonl", self.session_id, safe_id))
165    }
166
167    /// Write an event to the session log
168    pub fn write(&self, event: &AgentEvent) -> Result<()> {
169        let mut file = OpenOptions::new()
170            .create(true)
171            .append(true)
172            .open(self.session_file())?;
173
174        let line = serde_json::to_string(event)?;
175        writeln!(file, "{}", line)?;
176
177        Ok(())
178    }
179
180    /// Write an event to both session and task-specific logs
181    pub fn write_with_task_log(&self, event: &AgentEvent) -> Result<()> {
182        // Write to session log
183        self.write(event)?;
184
185        // Write to task-specific log
186        let mut task_file = OpenOptions::new()
187            .create(true)
188            .append(true)
189            .open(self.task_file(&event.task_id))?;
190
191        let line = serde_json::to_string(event)?;
192        writeln!(task_file, "{}", line)?;
193
194        Ok(())
195    }
196
197    /// Log a spawn event
198    pub fn log_spawned(&self, task_id: &str) -> Result<()> {
199        self.write_with_task_log(&AgentEvent::spawned(&self.session_id, task_id))
200    }
201
202    /// Log a completion event
203    pub fn log_completed(&self, task_id: &str, success: bool, duration_ms: u64) -> Result<()> {
204        self.write_with_task_log(&AgentEvent::completed(
205            &self.session_id,
206            task_id,
207            success,
208            duration_ms,
209        ))
210    }
211
212    /// Log an unblocked event
213    pub fn log_unblocked(&self, task_id: &str, by_task_id: &str) -> Result<()> {
214        self.write_with_task_log(&AgentEvent::unblocked(&self.session_id, task_id, by_task_id))
215    }
216}
217
218/// Reader for loading events from JSONL files
219pub struct EventReader {
220    events_dir: PathBuf,
221}
222
223impl EventReader {
224    pub fn new(project_root: &Path) -> Self {
225        Self {
226            events_dir: project_root.join(".scud").join("swarm").join("events"),
227        }
228    }
229
230    /// Load all events for a session
231    pub fn load_session(&self, session_id: &str) -> Result<Vec<AgentEvent>> {
232        let file_path = self.events_dir.join(format!("{}.jsonl", session_id));
233        self.load_file(&file_path)
234    }
235
236    /// Load events from a JSONL file
237    pub fn load_file(&self, path: &Path) -> Result<Vec<AgentEvent>> {
238        if !path.exists() {
239            return Ok(Vec::new());
240        }
241
242        let file = File::open(path)?;
243        let reader = BufReader::new(file);
244        let mut events = Vec::new();
245
246        for line in reader.lines() {
247            let line = line?;
248            if line.trim().is_empty() {
249                continue;
250            }
251            match serde_json::from_str(&line) {
252                Ok(event) => events.push(event),
253                Err(e) => {
254                    eprintln!("Warning: Failed to parse event: {}", e);
255                }
256            }
257        }
258
259        Ok(events)
260    }
261
262    /// Load all events for a session (including task-specific files)
263    pub fn load_all_for_session(&self, session_id: &str) -> Result<Vec<AgentEvent>> {
264        let mut events = Vec::new();
265
266        // Load from session file
267        events.extend(self.load_session(session_id)?);
268
269        // Load from task-specific files
270        if self.events_dir.exists() {
271            let prefix = format!("{}-", session_id);
272            for entry in fs::read_dir(&self.events_dir)? {
273                let entry = entry?;
274                let path = entry.path();
275                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
276                    if name.starts_with(&prefix) && name.ends_with(".jsonl") {
277                        events.extend(self.load_file(&path)?);
278                    }
279                }
280            }
281        }
282
283        // Sort by timestamp
284        events.sort_by_key(|e| e.timestamp);
285
286        // Deduplicate (same timestamp + task_id + event content)
287        // We compare the full serialized event to ensure different tool calls
288        // or other events with different content are not incorrectly merged
289        events.dedup_by(|a, b| {
290            a.timestamp == b.timestamp
291                && a.task_id == b.task_id
292                && serde_json::to_string(&a.event).ok() == serde_json::to_string(&b.event).ok()
293        });
294
295        Ok(events)
296    }
297
298    /// List available sessions
299    pub fn list_sessions(&self) -> Result<Vec<String>> {
300        let mut sessions = Vec::new();
301
302        if !self.events_dir.exists() {
303            return Ok(sessions);
304        }
305
306        for entry in fs::read_dir(&self.events_dir)? {
307            let entry = entry?;
308            let path = entry.path();
309            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
310                // Only include main session files (not task-specific ones)
311                if name.ends_with(".jsonl") && !name.contains('-') {
312                    if let Some(session_id) = name.strip_suffix(".jsonl") {
313                        sessions.push(session_id.to_string());
314                    }
315                }
316            }
317        }
318
319        sessions.sort();
320        Ok(sessions)
321    }
322}
323
324/// Aggregated timeline for retrospective analysis
325#[derive(Debug, Clone, Serialize, Deserialize)]
326pub struct RetrospectiveTimeline {
327    pub session_id: String,
328    pub started_at: Option<DateTime<Utc>>,
329    pub completed_at: Option<DateTime<Utc>>,
330    pub tasks: Vec<TaskTimeline>,
331    pub total_events: usize,
332}
333
334/// Timeline for a single task
335#[derive(Debug, Clone, Serialize, Deserialize)]
336pub struct TaskTimeline {
337    pub task_id: String,
338    pub spawned_at: Option<DateTime<Utc>>,
339    pub completed_at: Option<DateTime<Utc>>,
340    pub success: Option<bool>,
341    pub duration_ms: Option<u64>,
342    pub tools_used: Vec<String>,
343    pub files_read: Vec<String>,
344    pub files_written: Vec<String>,
345    pub unblocked_by: Vec<String>,
346    pub events: Vec<AgentEvent>,
347}
348
349impl RetrospectiveTimeline {
350    /// Build a timeline from events
351    pub fn from_events(session_id: &str, events: Vec<AgentEvent>) -> Self {
352        use std::collections::HashMap;
353
354        let mut task_map: HashMap<String, TaskTimeline> = HashMap::new();
355
356        for event in &events {
357            let task = task_map
358                .entry(event.task_id.clone())
359                .or_insert_with(|| TaskTimeline {
360                    task_id: event.task_id.clone(),
361                    spawned_at: None,
362                    completed_at: None,
363                    success: None,
364                    duration_ms: None,
365                    tools_used: Vec::new(),
366                    files_read: Vec::new(),
367                    files_written: Vec::new(),
368                    unblocked_by: Vec::new(),
369                    events: Vec::new(),
370                });
371
372            task.events.push(event.clone());
373
374            match &event.event {
375                EventKind::Spawned => {
376                    task.spawned_at = Some(event.timestamp);
377                }
378                EventKind::Completed { success, duration_ms } => {
379                    task.completed_at = Some(event.timestamp);
380                    task.success = Some(*success);
381                    task.duration_ms = Some(*duration_ms);
382                }
383                EventKind::ToolCall { tool, .. } => {
384                    if !task.tools_used.contains(tool) {
385                        task.tools_used.push(tool.clone());
386                    }
387                }
388                EventKind::FileRead { path } => {
389                    if !task.files_read.contains(path) {
390                        task.files_read.push(path.clone());
391                    }
392                }
393                EventKind::FileWrite { path, .. } => {
394                    if !task.files_written.contains(path) {
395                        task.files_written.push(path.clone());
396                    }
397                }
398                EventKind::Unblocked { by_task_id } => {
399                    if !task.unblocked_by.contains(by_task_id) {
400                        task.unblocked_by.push(by_task_id.clone());
401                    }
402                }
403                _ => {}
404            }
405        }
406
407        let tasks: Vec<TaskTimeline> = task_map.into_values().collect();
408
409        let started_at = events.first().map(|e| e.timestamp);
410        let completed_at = events.last().map(|e| e.timestamp);
411
412        Self {
413            session_id: session_id.to_string(),
414            started_at,
415            completed_at,
416            tasks,
417            total_events: events.len(),
418        }
419    }
420
421    /// Generate a text summary
422    pub fn to_summary(&self) -> String {
423        use std::fmt::Write;
424        let mut s = String::new();
425
426        writeln!(s, "Session: {}", self.session_id).unwrap();
427        if let (Some(start), Some(end)) = (self.started_at, self.completed_at) {
428            let duration = end.signed_duration_since(start);
429            writeln!(s, "Duration: {}s", duration.num_seconds()).unwrap();
430        }
431        writeln!(s, "Total events: {}", self.total_events).unwrap();
432        writeln!(s, "Tasks: {}", self.tasks.len()).unwrap();
433        writeln!(s).unwrap();
434
435        for task in &self.tasks {
436            writeln!(s, "  [{}]", task.task_id).unwrap();
437            if let Some(success) = task.success {
438                writeln!(s, "    Status: {}", if success { "✓" } else { "✗" }).unwrap();
439            }
440            if let Some(duration) = task.duration_ms {
441                writeln!(s, "    Duration: {}ms", duration).unwrap();
442            }
443            if !task.tools_used.is_empty() {
444                writeln!(s, "    Tools: {}", task.tools_used.join(", ")).unwrap();
445            }
446            if !task.files_written.is_empty() {
447                writeln!(s, "    Files written: {}", task.files_written.len()).unwrap();
448            }
449            if !task.unblocked_by.is_empty() {
450                writeln!(s, "    Unblocked by: {}", task.unblocked_by.join(", ")).unwrap();
451            }
452        }
453
454        s
455    }
456}
457
458/// Print a retrospective for a session
459pub fn print_retro(project_root: &Path, session_id: Option<&str>) -> Result<()> {
460    use colored::Colorize;
461
462    let reader = EventReader::new(project_root);
463
464    // If no session specified, list available sessions
465    let session_id = match session_id {
466        Some(id) => id.to_string(),
467        None => {
468            let sessions = reader.list_sessions()?;
469            if sessions.is_empty() {
470                println!("{}", "No swarm sessions found.".yellow());
471                println!("Run a swarm first: scud swarm --tag <tag>");
472                return Ok(());
473            }
474
475            println!("{}", "Available sessions:".blue().bold());
476            for session in &sessions {
477                println!("  • {}", session);
478            }
479
480            // Use the most recent session
481            if let Some(latest) = sessions.last() {
482                println!();
483                println!("Showing latest session: {}", latest.cyan());
484                latest.clone()
485            } else {
486                return Ok(());
487            }
488        }
489    };
490
491    // Load events
492    let events = reader.load_all_for_session(&session_id)?;
493
494    if events.is_empty() {
495        println!("{}", "No events found for this session.".yellow());
496        return Ok(());
497    }
498
499    // Build timeline
500    let timeline = RetrospectiveTimeline::from_events(&session_id, events);
501
502    // Print header
503    println!();
504    println!("{}", "Swarm Retrospective".blue().bold());
505    println!("{}", "═".repeat(60).blue());
506    println!();
507
508    println!(
509        "  {} {}",
510        "Session:".dimmed(),
511        timeline.session_id.cyan()
512    );
513
514    if let (Some(start), Some(end)) = (timeline.started_at, timeline.completed_at) {
515        let duration = end.signed_duration_since(start);
516        println!(
517            "  {} {}s",
518            "Duration:".dimmed(),
519            duration.num_seconds().to_string().cyan()
520        );
521        println!(
522            "  {} {}",
523            "Started:".dimmed(),
524            start.format("%Y-%m-%d %H:%M:%S").to_string().dimmed()
525        );
526    }
527
528    println!(
529        "  {} {}",
530        "Events:".dimmed(),
531        timeline.total_events.to_string().cyan()
532    );
533    println!(
534        "  {} {}",
535        "Tasks:".dimmed(),
536        timeline.tasks.len().to_string().cyan()
537    );
538    println!();
539
540    // Print task details
541    println!("{}", "Task Timeline".yellow().bold());
542    println!("{}", "─".repeat(60).yellow());
543
544    for task in &timeline.tasks {
545        let status_icon = match task.success {
546            Some(true) => "✓".green(),
547            Some(false) => "✗".red(),
548            None => "?".yellow(),
549        };
550
551        println!();
552        println!("  {} [{}]", status_icon, task.task_id.cyan());
553
554        if let Some(duration) = task.duration_ms {
555            println!("    Duration: {}ms", duration.to_string().dimmed());
556        }
557
558        if !task.tools_used.is_empty() {
559            println!(
560                "    Tools: {}",
561                task.tools_used.join(", ").dimmed()
562            );
563        }
564
565        if !task.files_written.is_empty() {
566            println!(
567                "    Files written: {}",
568                task.files_written.len().to_string().dimmed()
569            );
570            for file in task.files_written.iter().take(5) {
571                println!("      • {}", file.dimmed());
572            }
573            if task.files_written.len() > 5 {
574                println!(
575                    "      ... and {} more",
576                    (task.files_written.len() - 5).to_string().dimmed()
577                );
578            }
579        }
580
581        if !task.unblocked_by.is_empty() {
582            println!(
583                "    Unblocked by: {}",
584                task.unblocked_by.join(", ").dimmed()
585            );
586        }
587    }
588
589    println!();
590    Ok(())
591}
592
593/// Export retrospective as JSON
594pub fn export_retro_json(project_root: &Path, session_id: &str) -> Result<String> {
595    let reader = EventReader::new(project_root);
596    let events = reader.load_all_for_session(session_id)?;
597    let timeline = RetrospectiveTimeline::from_events(session_id, events);
598    Ok(serde_json::to_string_pretty(&timeline)?)
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604    use tempfile::TempDir;
605
606    #[test]
607    fn test_event_serialization() {
608        let event = AgentEvent::spawned("session-1", "task:1");
609        let json = serde_json::to_string(&event).unwrap();
610        assert!(json.contains("spawned"));
611        assert!(json.contains("task:1"));
612
613        let parsed: AgentEvent = serde_json::from_str(&json).unwrap();
614        assert_eq!(parsed.task_id, "task:1");
615    }
616
617    #[test]
618    fn test_event_writer_reader() {
619        let temp_dir = TempDir::new().unwrap();
620        let project_root = temp_dir.path();
621
622        let writer = EventWriter::new(project_root, "test-session").unwrap();
623
624        // Write events
625        writer.log_spawned("task:1").unwrap();
626        writer.log_spawned("task:2").unwrap();
627        writer.log_completed("task:1", true, 1000).unwrap();
628
629        // Read events
630        let reader = EventReader::new(project_root);
631        let events = reader.load_session("test-session").unwrap();
632
633        assert_eq!(events.len(), 3);
634    }
635
636    #[test]
637    fn test_retrospective_timeline() {
638        let events = vec![
639            AgentEvent::spawned("s1", "task:1"),
640            AgentEvent::spawned("s1", "task:2"),
641            AgentEvent::tool_call("s1", "task:1", "Read", Some("src/main.rs")),
642            AgentEvent::completed("s1", "task:1", true, 5000),
643            AgentEvent::unblocked("s1", "task:3", "task:1"),
644            AgentEvent::completed("s1", "task:2", true, 3000),
645        ];
646
647        let timeline = RetrospectiveTimeline::from_events("s1", events);
648
649        assert_eq!(timeline.tasks.len(), 3); // task:1, task:2, task:3
650        assert_eq!(timeline.total_events, 6);
651
652        let task1 = timeline.tasks.iter().find(|t| t.task_id == "task:1").unwrap();
653        assert_eq!(task1.success, Some(true));
654        assert_eq!(task1.duration_ms, Some(5000));
655        assert!(task1.tools_used.contains(&"Read".to_string()));
656    }
657
658    #[test]
659    fn test_deduplication_preserves_different_tool_calls() {
660        use chrono::TimeZone;
661
662        // Create two tool call events with the same timestamp and task_id
663        // but different tool names - these should NOT be deduplicated
664        let fixed_time = Utc.with_ymd_and_hms(2025, 1, 15, 12, 0, 0).unwrap();
665
666        let event1 = AgentEvent {
667            timestamp: fixed_time,
668            session_id: "s1".to_string(),
669            task_id: "task:1".to_string(),
670            event: EventKind::ToolCall {
671                tool: "Read".to_string(),
672                input_summary: Some("file1.rs".to_string()),
673            },
674        };
675
676        let event2 = AgentEvent {
677            timestamp: fixed_time,
678            session_id: "s1".to_string(),
679            task_id: "task:1".to_string(),
680            event: EventKind::ToolCall {
681                tool: "Write".to_string(),
682                input_summary: Some("file2.rs".to_string()),
683            },
684        };
685
686        let mut events = vec![event1, event2];
687
688        // Sort and dedup using the same logic as load_all_for_session
689        events.sort_by_key(|e| e.timestamp);
690        events.dedup_by(|a, b| {
691            a.timestamp == b.timestamp
692                && a.task_id == b.task_id
693                && serde_json::to_string(&a.event).ok() == serde_json::to_string(&b.event).ok()
694        });
695
696        // Both events should remain (different tool names)
697        assert_eq!(events.len(), 2);
698    }
699
700    #[test]
701    fn test_deduplication_removes_true_duplicates() {
702        use chrono::TimeZone;
703
704        // Create two identical events - these SHOULD be deduplicated
705        let fixed_time = Utc.with_ymd_and_hms(2025, 1, 15, 12, 0, 0).unwrap();
706
707        let event1 = AgentEvent {
708            timestamp: fixed_time,
709            session_id: "s1".to_string(),
710            task_id: "task:1".to_string(),
711            event: EventKind::Spawned,
712        };
713
714        let event2 = AgentEvent {
715            timestamp: fixed_time,
716            session_id: "s1".to_string(),
717            task_id: "task:1".to_string(),
718            event: EventKind::Spawned,
719        };
720
721        let mut events = vec![event1, event2];
722
723        events.sort_by_key(|e| e.timestamp);
724        events.dedup_by(|a, b| {
725            a.timestamp == b.timestamp
726                && a.task_id == b.task_id
727                && serde_json::to_string(&a.event).ok() == serde_json::to_string(&b.event).ok()
728        });
729
730        // Only one event should remain (true duplicate)
731        assert_eq!(events.len(), 1);
732    }
733}