Skip to main content

batty_cli/
events.rs

1//! Event extraction from piped tmux output.
2//!
3//! Reads the pipe-pane log file, strips ANSI escapes, pattern-matches against
4//! known agent output patterns, and produces structured events. A rolling event
5//! buffer provides a compact summary of the executor's recent activity for the
6//! supervisor's context window.
7#![cfg_attr(not(test), allow(dead_code))]
8
9use std::collections::VecDeque;
10use std::io::{Read, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Mutex};
13
14use anyhow::{Context, Result};
15use regex::Regex;
16use serde::Serialize;
17use tracing::debug;
18
19use crate::prompt::strip_ansi;
20
/// Number of events a default-sized rolling buffer retains before it starts
/// evicting the oldest entry.
#[cfg(test)]
const DEFAULT_BUFFER_SIZE: usize = 50;
24
25/// Structured events extracted from executor output.
26#[derive(Debug, Clone, Serialize, PartialEq)]
27#[serde(tag = "type", rename_all = "snake_case")]
28pub enum PipeEvent {
29    /// Executor picked a new task.
30    TaskStarted { task_id: String, title: String },
31    /// Executor created a file.
32    FileCreated { path: String },
33    /// Executor modified a file.
34    FileModified { path: String },
35    /// Executor ran a command.
36    CommandRan {
37        command: String,
38        success: Option<bool>,
39    },
40    /// Test execution detected.
41    TestRan { passed: bool, detail: String },
42    /// Executor is asking a question (prompt detected).
43    PromptDetected { prompt: String },
44    /// Executor marked a task done.
45    TaskCompleted { task_id: String },
46    /// Executor made a git commit.
47    CommitMade { hash: String, message: String },
48    /// Raw output line (for lines that don't match any pattern).
49    #[allow(dead_code)] // Retained for optional verbose event buffering and dedicated tests.
50    OutputLine { line: String },
51}
52
53/// Compiled regex patterns for extracting events from executor output.
54pub struct EventPatterns {
55    patterns: Vec<(Regex, EventClassifier)>,
56}
57
58type EventClassifier = fn(&regex::Captures) -> PipeEvent;
59
60impl EventPatterns {
61    /// Build default event extraction patterns.
62    ///
63    /// These patterns target common agent output after ANSI stripping.
64    /// They work across Claude Code, Codex, and Aider output.
65    pub fn default_patterns() -> Self {
66        Self {
67            patterns: vec![
68                // Task started: "Picked and moved task #N" or kanban-md output
69                (
70                    Regex::new(r"(?i)(?:picked|claimed|starting|working on)\s+(?:and moved\s+)?task\s+#?(\d+)(?::\s+(.+))?").unwrap(),
71                    |caps| PipeEvent::TaskStarted {
72                        task_id: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
73                        title: caps.get(2).map(|m| m.as_str().to_string()).unwrap_or_default(),
74                    },
75                ),
76                // Task completed: "Moved task #N" to done, or "task #N done"
77                (
78                    Regex::new(r"(?i)(?:moved task\s+#?(\d+).*(?:done|complete)|task\s+#?(\d+)\s+(?:done|complete))").unwrap(),
79                    |caps| PipeEvent::TaskCompleted {
80                        task_id: caps.get(1)
81                            .or_else(|| caps.get(2))
82                            .map(|m| m.as_str().to_string())
83                            .unwrap_or_default(),
84                    },
85                ),
86                // Git commit: "[main abc1234] message" or "commit abc1234"
87                (
88                    Regex::new(r"(?:\[[\w/-]+\s+([0-9a-f]{7,40})\]\s+(.+)|commit\s+([0-9a-f]{7,40}))").unwrap(),
89                    |caps| PipeEvent::CommitMade {
90                        hash: caps.get(1)
91                            .or_else(|| caps.get(3))
92                            .map(|m| m.as_str().to_string())
93                            .unwrap_or_default(),
94                        message: caps.get(2).map(|m| m.as_str().to_string()).unwrap_or_default(),
95                    },
96                ),
97                // Test result: "test result: ok. N passed" or "test result: FAILED"
98                (
99                    Regex::new(r"test result:\s*(ok|FAILED)").unwrap(),
100                    |caps| {
101                        let result = caps.get(1).map(|m| m.as_str()).unwrap_or("FAILED");
102                        PipeEvent::TestRan {
103                            passed: result == "ok",
104                            detail: caps.get(0).map(|m| m.as_str().to_string()).unwrap_or_default(),
105                        }
106                    },
107                ),
108                // File created: "Created file X" or "Write tool" or "File created"
109                (
110                    Regex::new(r"(?i)(?:created?\s+(?:file\s+)?|wrote\s+|writing\s+to\s+)([\w/.+\-]+\.\w+)").unwrap(),
111                    |caps| PipeEvent::FileCreated {
112                        path: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
113                    },
114                ),
115                // File modified: "Edited X" or "Modified X" or "Edit tool"
116                (
117                    Regex::new(r"(?i)(?:edit(?:ed|ing)?\s+|modif(?:ied|ying)\s+)([\w/.+\-]+\.\w+)").unwrap(),
118                    |caps| PipeEvent::FileModified {
119                        path: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
120                    },
121                ),
122                // Command ran: "$ command" or "Running: command" or exit code pattern
123                (
124                    Regex::new(r"(?:^\$\s+(.+)|Running:\s+(.+))").unwrap(),
125                    |caps| PipeEvent::CommandRan {
126                        command: caps.get(1)
127                            .or_else(|| caps.get(2))
128                            .map(|m| m.as_str().to_string())
129                            .unwrap_or_default(),
130                        success: None,
131                    },
132                ),
133                // Prompt patterns: "Allow tool", "[y/n]", "Continue?"
134                (
135                    Regex::new(r"(?i)(?:allow\s+tool|continue\?|\[y/n\]|do you want to proceed)").unwrap(),
136                    |caps| PipeEvent::PromptDetected {
137                        prompt: caps.get(0).map(|m| m.as_str().to_string()).unwrap_or_default(),
138                    },
139                ),
140            ],
141        }
142    }
143
144    /// Try to classify a line of ANSI-stripped output as a structured event.
145    /// Returns None if no pattern matches.
146    pub fn classify(&self, line: &str) -> Option<PipeEvent> {
147        for (regex, classify) in &self.patterns {
148            if let Some(caps) = regex.captures(line) {
149                return Some(classify(&caps));
150            }
151        }
152        None
153    }
154}
155
156/// Rolling buffer of recent events.
157///
158/// Thread-safe via Arc<Mutex<_>> for sharing between the watcher thread
159/// and the supervisor's context composition.
160#[derive(Debug, Clone)]
161pub struct EventBuffer {
162    inner: Arc<Mutex<EventBufferInner>>,
163}
164
165#[derive(Debug)]
166struct EventBufferInner {
167    events: VecDeque<PipeEvent>,
168    max_size: usize,
169}
170
171impl EventBuffer {
172    /// Create a new event buffer with the given capacity.
173    pub fn new(max_size: usize) -> Self {
174        Self {
175            inner: Arc::new(Mutex::new(EventBufferInner {
176                events: VecDeque::with_capacity(max_size),
177                max_size,
178            })),
179        }
180    }
181
182    /// Create a buffer with the default size (50 events).
183    #[cfg(test)]
184    pub fn default_size() -> Self {
185        Self::new(DEFAULT_BUFFER_SIZE)
186    }
187
188    /// Push an event into the buffer, evicting the oldest if full.
189    pub fn push(&self, event: PipeEvent) {
190        let mut inner = self.inner.lock().unwrap();
191        if inner.events.len() >= inner.max_size {
192            inner.events.pop_front();
193        }
194        inner.events.push_back(event);
195    }
196
197    /// Get a snapshot of all events in the buffer.
198    pub fn snapshot(&self) -> Vec<PipeEvent> {
199        let inner = self.inner.lock().unwrap();
200        inner.events.iter().cloned().collect()
201    }
202
203    /// Get the number of events in the buffer.
204    pub fn len(&self) -> usize {
205        let inner = self.inner.lock().unwrap();
206        inner.events.len()
207    }
208
209    /// Check if the buffer is empty.
210    #[allow(dead_code)]
211    pub fn is_empty(&self) -> bool {
212        self.len() == 0
213    }
214
215    /// Clear all events.
216    #[allow(dead_code)]
217    pub fn clear(&self) {
218        let mut inner = self.inner.lock().unwrap();
219        inner.events.clear();
220    }
221
222    /// Format the buffer as a compact summary for the supervisor's context.
223    pub fn format_summary(&self) -> String {
224        let events = self.snapshot();
225        if events.is_empty() {
226            return "(no events yet)".to_string();
227        }
228
229        let mut summary = String::new();
230        for event in &events {
231            let line = match event {
232                PipeEvent::TaskStarted { task_id, title } => {
233                    format!("→ task #{task_id} started: {title}")
234                }
235                PipeEvent::TaskCompleted { task_id } => {
236                    format!("✓ task #{task_id} completed")
237                }
238                PipeEvent::FileCreated { path } => {
239                    format!("+ {path}")
240                }
241                PipeEvent::FileModified { path } => {
242                    format!("~ {path}")
243                }
244                PipeEvent::CommandRan { command, success } => {
245                    let status = match success {
246                        Some(true) => " ✓",
247                        Some(false) => " ✗",
248                        None => "",
249                    };
250                    format!("$ {command}{status}")
251                }
252                PipeEvent::TestRan { passed, detail } => {
253                    let icon = if *passed { "✓" } else { "✗" };
254                    format!("{icon} test: {detail}")
255                }
256                PipeEvent::PromptDetected { prompt } => {
257                    format!("? {prompt}")
258                }
259                PipeEvent::CommitMade { hash, message } => {
260                    let short_hash = &hash[..7.min(hash.len())];
261                    format!("⊕ commit {short_hash}: {message}")
262                }
263                PipeEvent::OutputLine { line } => {
264                    // Truncate long output lines
265                    if line.len() > 80 {
266                        format!("  {}...", &line[..77])
267                    } else {
268                        format!("  {line}")
269                    }
270                }
271            };
272            summary.push_str(&line);
273            summary.push('\n');
274        }
275        summary
276    }
277}
278
279/// Watches a pipe-pane log file and extracts events from new content.
280///
281/// Uses polling (seek to last position, read new bytes) which is simple
282/// and portable. The polling interval is configurable.
283pub struct PipeWatcher {
284    path: PathBuf,
285    patterns: EventPatterns,
286    buffer: EventBuffer,
287    position: u64,
288    line_buffer: String,
289}
290
291impl PipeWatcher {
292    /// Create a new pipe watcher for the given log file.
293    pub fn new(path: &Path, buffer: EventBuffer) -> Self {
294        Self::new_with_position(path, buffer, 0)
295    }
296
297    /// Create a new pipe watcher starting from a specific byte offset.
298    ///
299    /// Offsets beyond EOF are clamped during polling.
300    pub fn new_with_position(path: &Path, buffer: EventBuffer, position: u64) -> Self {
301        Self {
302            path: path.to_path_buf(),
303            patterns: EventPatterns::default_patterns(),
304            buffer,
305            position,
306            line_buffer: String::new(),
307        }
308    }
309
310    /// Poll for new content in the log file and extract events.
311    ///
312    /// Returns the number of new events extracted. Call this periodically
313    /// from the supervisor loop.
314    pub fn poll(&mut self) -> Result<usize> {
315        let mut file = match std::fs::File::open(&self.path) {
316            Ok(f) => f,
317            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
318                return Ok(0); // file doesn't exist yet
319            }
320            Err(e) => {
321                return Err(e)
322                    .with_context(|| format!("failed to open pipe log: {}", self.path.display()));
323            }
324        };
325
326        // Clamp stale checkpoints (for example after truncation/rotation)
327        let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
328        if self.position > file_len {
329            self.position = file_len;
330        }
331
332        // Seek to where we left off
333        file.seek(SeekFrom::Start(self.position))
334            .context("failed to seek in pipe log")?;
335
336        // Read new content
337        let mut new_bytes = Vec::new();
338        let n = file
339            .read_to_end(&mut new_bytes)
340            .context("failed to read pipe log")?;
341
342        if n == 0 {
343            return Ok(0);
344        }
345
346        self.position += n as u64;
347
348        // Convert to string (lossy for binary data in PTY output)
349        let new_text = String::from_utf8_lossy(&new_bytes);
350        self.line_buffer.push_str(&new_text);
351
352        // Process complete lines
353        let mut event_count = 0;
354        while let Some(newline_pos) = self.line_buffer.find('\n') {
355            let line = self.line_buffer[..newline_pos].to_string();
356            self.line_buffer = self.line_buffer[newline_pos + 1..].to_string();
357
358            let stripped = strip_ansi(&line);
359            let trimmed = stripped.trim();
360            if trimmed.is_empty() {
361                continue;
362            }
363
364            if let Some(event) = self.patterns.classify(trimmed) {
365                debug!(event = ?event, "extracted event");
366                self.buffer.push(event);
367                event_count += 1;
368            }
369        }
370
371        Ok(event_count)
372    }
373
374    /// Resume-safe checkpoint offset.
375    ///
376    /// This rewinds by the currently buffered partial line bytes so a resumed
377    /// watcher can re-read any incomplete line safely.
378    pub fn checkpoint_offset(&self) -> u64 {
379        self.position
380            .saturating_sub(self.line_buffer.len().try_into().unwrap_or(0))
381    }
382}
383
384#[cfg(test)]
385mod tests {
386    use super::*;
387    use std::fs;
388    use std::io::Write;
389
390    // ── EventPatterns ──
391
392    #[test]
393    fn detect_task_started() {
394        let patterns = EventPatterns::default_patterns();
395        let event = patterns
396            .classify("Picked and moved task #3: kanban reader")
397            .unwrap();
398        match event {
399            PipeEvent::TaskStarted { task_id, title } => {
400                assert_eq!(task_id, "3");
401                assert_eq!(title, "kanban reader");
402            }
403            other => panic!("expected TaskStarted, got: {other:?}"),
404        }
405    }
406
407    #[test]
408    fn detect_task_started_claim() {
409        let patterns = EventPatterns::default_patterns();
410        let event = patterns.classify("Claimed task #5").unwrap();
411        assert!(matches!(event, PipeEvent::TaskStarted { .. }));
412    }
413
414    #[test]
415    fn detect_task_completed() {
416        let patterns = EventPatterns::default_patterns();
417        let event = patterns
418            .classify("Moved task #3: in-progress -> done")
419            .unwrap();
420        match event {
421            PipeEvent::TaskCompleted { task_id } => assert_eq!(task_id, "3"),
422            other => panic!("expected TaskCompleted, got: {other:?}"),
423        }
424    }
425
426    #[test]
427    fn detect_commit() {
428        let patterns = EventPatterns::default_patterns();
429        let event = patterns
430            .classify("[main abc1234] fix the auth bug")
431            .unwrap();
432        match event {
433            PipeEvent::CommitMade { hash, message } => {
434                assert_eq!(hash, "abc1234");
435                assert_eq!(message, "fix the auth bug");
436            }
437            other => panic!("expected CommitMade, got: {other:?}"),
438        }
439    }
440
441    #[test]
442    fn detect_test_passed() {
443        let patterns = EventPatterns::default_patterns();
444        let event = patterns
445            .classify("test result: ok. 42 passed; 0 failed")
446            .unwrap();
447        match event {
448            PipeEvent::TestRan { passed, .. } => assert!(passed),
449            other => panic!("expected TestRan, got: {other:?}"),
450        }
451    }
452
453    #[test]
454    fn detect_test_failed() {
455        let patterns = EventPatterns::default_patterns();
456        let event = patterns
457            .classify("test result: FAILED. 40 passed; 2 failed")
458            .unwrap();
459        match event {
460            PipeEvent::TestRan { passed, .. } => assert!(!passed),
461            other => panic!("expected TestRan, got: {other:?}"),
462        }
463    }
464
465    #[test]
466    fn detect_file_created() {
467        let patterns = EventPatterns::default_patterns();
468        let event = patterns.classify("Created file src/tmux.rs").unwrap();
469        match event {
470            PipeEvent::FileCreated { path } => assert_eq!(path, "src/tmux.rs"),
471            other => panic!("expected FileCreated, got: {other:?}"),
472        }
473    }
474
475    #[test]
476    fn detect_file_modified() {
477        let patterns = EventPatterns::default_patterns();
478        let event = patterns.classify("Edited src/main.rs").unwrap();
479        match event {
480            PipeEvent::FileModified { path } => assert_eq!(path, "src/main.rs"),
481            other => panic!("expected FileModified, got: {other:?}"),
482        }
483    }
484
485    #[test]
486    fn detect_command_ran() {
487        let patterns = EventPatterns::default_patterns();
488        let event = patterns.classify("$ cargo test").unwrap();
489        match event {
490            PipeEvent::CommandRan { command, .. } => assert_eq!(command, "cargo test"),
491            other => panic!("expected CommandRan, got: {other:?}"),
492        }
493    }
494
495    #[test]
496    fn detect_prompt() {
497        let patterns = EventPatterns::default_patterns();
498        let event = patterns
499            .classify("Allow tool Read on /home/user/file.rs?")
500            .unwrap();
501        assert!(matches!(event, PipeEvent::PromptDetected { .. }));
502    }
503
504    #[test]
505    fn no_match_on_normal_output() {
506        let patterns = EventPatterns::default_patterns();
507        assert!(
508            patterns
509                .classify("Writing function to parse YAML...")
510                .is_none()
511        );
512    }
513
514    // ── EventBuffer ──
515
516    #[test]
517    fn buffer_push_and_snapshot() {
518        let buf = EventBuffer::new(3);
519        buf.push(PipeEvent::OutputLine {
520            line: "a".to_string(),
521        });
522        buf.push(PipeEvent::OutputLine {
523            line: "b".to_string(),
524        });
525
526        let snap = buf.snapshot();
527        assert_eq!(snap.len(), 2);
528    }
529
530    #[test]
531    fn buffer_evicts_oldest_when_full() {
532        let buf = EventBuffer::new(2);
533        buf.push(PipeEvent::OutputLine {
534            line: "a".to_string(),
535        });
536        buf.push(PipeEvent::OutputLine {
537            line: "b".to_string(),
538        });
539        buf.push(PipeEvent::OutputLine {
540            line: "c".to_string(),
541        });
542
543        let snap = buf.snapshot();
544        assert_eq!(snap.len(), 2);
545        assert_eq!(
546            snap[0],
547            PipeEvent::OutputLine {
548                line: "b".to_string()
549            }
550        );
551        assert_eq!(
552            snap[1],
553            PipeEvent::OutputLine {
554                line: "c".to_string()
555            }
556        );
557    }
558
559    #[test]
560    fn buffer_default_size() {
561        let buf = EventBuffer::default_size();
562        assert_eq!(buf.len(), 0);
563
564        // Push 60 events — should keep only the last 50
565        for i in 0..60 {
566            buf.push(PipeEvent::OutputLine {
567                line: format!("line {i}"),
568            });
569        }
570        assert_eq!(buf.len(), 50);
571
572        let snap = buf.snapshot();
573        // First event should be line 10 (0-9 evicted)
574        assert_eq!(
575            snap[0],
576            PipeEvent::OutputLine {
577                line: "line 10".to_string()
578            }
579        );
580    }
581
582    #[test]
583    fn buffer_clear() {
584        let buf = EventBuffer::new(10);
585        buf.push(PipeEvent::OutputLine {
586            line: "x".to_string(),
587        });
588        assert_eq!(buf.len(), 1);
589
590        buf.clear();
591        assert!(buf.is_empty());
592    }
593
594    #[test]
595    fn buffer_format_summary_empty() {
596        let buf = EventBuffer::new(10);
597        assert_eq!(buf.format_summary(), "(no events yet)");
598    }
599
600    #[test]
601    fn buffer_format_summary_has_events() {
602        let buf = EventBuffer::new(10);
603        buf.push(PipeEvent::TaskStarted {
604            task_id: "3".to_string(),
605            title: "foo".to_string(),
606        });
607        buf.push(PipeEvent::FileCreated {
608            path: "src/x.rs".to_string(),
609        });
610        buf.push(PipeEvent::TestRan {
611            passed: true,
612            detail: "ok".to_string(),
613        });
614        buf.push(PipeEvent::CommitMade {
615            hash: "abc1234".to_string(),
616            message: "fix".to_string(),
617        });
618
619        let summary = buf.format_summary();
620        assert!(summary.contains("→ task #3 started: foo"));
621        assert!(summary.contains("+ src/x.rs"));
622        assert!(summary.contains("✓ test: ok"));
623        assert!(summary.contains("⊕ commit abc1234: fix"));
624    }
625
626    #[test]
627    fn buffer_is_thread_safe() {
628        let buf = EventBuffer::new(100);
629        let buf2 = buf.clone();
630
631        let handle = std::thread::spawn(move || {
632            for i in 0..50 {
633                buf2.push(PipeEvent::OutputLine {
634                    line: format!("thread {i}"),
635                });
636            }
637        });
638
639        for i in 0..50 {
640            buf.push(PipeEvent::OutputLine {
641                line: format!("main {i}"),
642            });
643        }
644
645        handle.join().unwrap();
646        assert_eq!(buf.len(), 100);
647    }
648
649    // ── PipeWatcher ──
650
651    #[test]
652    fn watcher_reads_new_content() {
653        let tmp = tempfile::tempdir().unwrap();
654        let log_path = tmp.path().join("pty-output.log");
655
656        // Create the log file with some content
657        {
658            let mut f = fs::File::create(&log_path).unwrap();
659            writeln!(f, "Picked and moved task #3: reader").unwrap();
660            writeln!(f, "some normal output").unwrap();
661            writeln!(f, "test result: ok. 5 passed; 0 failed").unwrap();
662        }
663
664        let buffer = EventBuffer::new(50);
665        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
666
667        let count = watcher.poll().unwrap();
668        assert!(count >= 2, "expected at least 2 events, got {count}");
669
670        let events = buffer.snapshot();
671        assert!(
672            events
673                .iter()
674                .any(|e| matches!(e, PipeEvent::TaskStarted { .. }))
675        );
676        assert!(
677            events
678                .iter()
679                .any(|e| matches!(e, PipeEvent::TestRan { passed: true, .. }))
680        );
681    }
682
683    #[test]
684    fn watcher_tracks_position() {
685        let tmp = tempfile::tempdir().unwrap();
686        let log_path = tmp.path().join("pty-output.log");
687
688        // Write initial content
689        {
690            let mut f = fs::File::create(&log_path).unwrap();
691            writeln!(f, "test result: ok. 5 passed").unwrap();
692        }
693
694        let buffer = EventBuffer::new(50);
695        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
696
697        watcher.poll().unwrap();
698        let count1 = buffer.len();
699
700        // Poll again with no new content
701        let count = watcher.poll().unwrap();
702        assert_eq!(count, 0, "no new content should yield 0 events");
703        assert_eq!(buffer.len(), count1);
704
705        // Append new content
706        {
707            let mut f = fs::OpenOptions::new().append(true).open(&log_path).unwrap();
708            writeln!(f, "[main abc1234] fix bug").unwrap();
709        }
710
711        let count = watcher.poll().unwrap();
712        assert!(count >= 1, "expected at least 1 new event");
713    }
714
715    #[test]
716    fn watcher_handles_missing_file() {
717        let tmp = tempfile::tempdir().unwrap();
718        let log_path = tmp.path().join("nonexistent.log");
719
720        let buffer = EventBuffer::new(50);
721        let mut watcher = PipeWatcher::new(&log_path, buffer);
722
723        // Should not error — just returns 0 events
724        let count = watcher.poll().unwrap();
725        assert_eq!(count, 0);
726    }
727
728    #[test]
729    fn watcher_resume_from_position_reads_only_new_content() {
730        let tmp = tempfile::tempdir().unwrap();
731        let log_path = tmp.path().join("resume.log");
732
733        {
734            let mut f = fs::File::create(&log_path).unwrap();
735            writeln!(f, "test result: ok. 1 passed").unwrap();
736        }
737
738        let file_len = fs::metadata(&log_path).unwrap().len();
739        let buffer = EventBuffer::new(50);
740        let mut watcher = PipeWatcher::new_with_position(&log_path, buffer.clone(), file_len);
741
742        {
743            let mut f = fs::OpenOptions::new().append(true).open(&log_path).unwrap();
744            writeln!(f, "[main abc1234] resume").unwrap();
745        }
746
747        let count = watcher.poll().unwrap();
748        assert!(count >= 1);
749        let events = buffer.snapshot();
750        assert!(
751            events
752                .iter()
753                .any(|e| matches!(e, PipeEvent::CommitMade { .. }))
754        );
755    }
756
757    #[test]
758    fn watcher_checkpoint_offset_rewinds_partial_line() {
759        let tmp = tempfile::tempdir().unwrap();
760        let log_path = tmp.path().join("partial.log");
761
762        {
763            let mut f = fs::File::create(&log_path).unwrap();
764            write!(f, "test result: ok").unwrap(); // no trailing newline
765        }
766
767        let buffer = EventBuffer::new(50);
768        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
769        let _ = watcher.poll().unwrap();
770
771        assert_eq!(
772            watcher.checkpoint_offset(),
773            0,
774            "partial line should be re-read on resume"
775        );
776        assert_eq!(buffer.len(), 0);
777    }
778
779    #[test]
780    fn watcher_strips_ansi() {
781        let tmp = tempfile::tempdir().unwrap();
782        let log_path = tmp.path().join("ansi.log");
783
784        {
785            let mut f = fs::File::create(&log_path).unwrap();
786            // Write ANSI-escaped "test result: ok"
787            writeln!(f, "\x1b[32mtest result: ok. 5 passed\x1b[0m").unwrap();
788        }
789
790        let buffer = EventBuffer::new(50);
791        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
792
793        watcher.poll().unwrap();
794        let events = buffer.snapshot();
795        assert!(
796            events
797                .iter()
798                .any(|e| matches!(e, PipeEvent::TestRan { passed: true, .. }))
799        );
800    }
801
802    // ── PipeEvent serialization ──
803
804    #[test]
805    fn pipe_event_serializes_to_json() {
806        let event = PipeEvent::TaskStarted {
807            task_id: "3".to_string(),
808            title: "test".to_string(),
809        };
810        let json = serde_json::to_string(&event).unwrap();
811        assert!(json.contains("\"type\":\"task_started\""));
812        assert!(json.contains("\"task_id\":\"3\""));
813    }
814
815    #[test]
816    fn all_pipe_events_serialize() {
817        let events = vec![
818            PipeEvent::TaskStarted {
819                task_id: "1".to_string(),
820                title: "test".to_string(),
821            },
822            PipeEvent::FileCreated {
823                path: "x.rs".to_string(),
824            },
825            PipeEvent::FileModified {
826                path: "y.rs".to_string(),
827            },
828            PipeEvent::CommandRan {
829                command: "ls".to_string(),
830                success: Some(true),
831            },
832            PipeEvent::TestRan {
833                passed: true,
834                detail: "ok".to_string(),
835            },
836            PipeEvent::PromptDetected {
837                prompt: "y/n".to_string(),
838            },
839            PipeEvent::TaskCompleted {
840                task_id: "1".to_string(),
841            },
842            PipeEvent::CommitMade {
843                hash: "abc".to_string(),
844                message: "fix".to_string(),
845            },
846            PipeEvent::OutputLine {
847                line: "hi".to_string(),
848            },
849        ];
850
851        for event in events {
852            let json = serde_json::to_string(&event);
853            assert!(json.is_ok(), "failed to serialize: {event:?}");
854        }
855    }
856
857    // ── Coverage: additional events tests ──
858
859    #[test]
860    fn pipe_watcher_extracts_events_from_file() {
861        let tmp = tempfile::tempdir().unwrap();
862        let pipe_log = tmp.path().join("pipe.log");
863        std::fs::write(
864            &pipe_log,
865            "some normal output\nRunning: cargo test\ntest result: ok. 5 passed\n",
866        )
867        .unwrap();
868
869        let buffer = EventBuffer::new(10);
870        let mut watcher = PipeWatcher::new(&pipe_log, buffer);
871        let count = watcher.poll().unwrap();
872        assert!(count > 0, "expected at least one event extracted");
873    }
874
875    #[test]
876    fn pipe_watcher_handles_missing_file() {
877        let tmp = tempfile::tempdir().unwrap();
878        let pipe_log = tmp.path().join("nonexistent.log");
879
880        let buffer = EventBuffer::new(10);
881        let mut watcher = PipeWatcher::new(&pipe_log, buffer);
882        let count = watcher.poll().unwrap();
883        assert_eq!(count, 0);
884    }
885
886    #[test]
887    fn pipe_watcher_incremental_reads() {
888        let tmp = tempfile::tempdir().unwrap();
889        let pipe_log = tmp.path().join("pipe.log");
890        std::fs::write(&pipe_log, "Running: first command\n").unwrap();
891
892        let buffer = EventBuffer::new(10);
893        let mut watcher = PipeWatcher::new(&pipe_log, buffer);
894        let count1 = watcher.poll().unwrap();
895        assert!(count1 > 0, "should extract events from first write");
896
897        // Append more content
898        use std::io::Write;
899        let mut f = std::fs::OpenOptions::new()
900            .append(true)
901            .open(&pipe_log)
902            .unwrap();
903        writeln!(f, "Running: second command").unwrap();
904
905        let count2 = watcher.poll().unwrap();
906        assert!(count2 > 0, "should extract events from appended content");
907    }
908
909    #[test]
910    fn pipe_watcher_clamps_stale_position() {
911        let tmp = tempfile::tempdir().unwrap();
912        let pipe_log = tmp.path().join("pipe.log");
913        std::fs::write(&pipe_log, "short\n").unwrap();
914
915        let buffer = EventBuffer::new(10);
916        // Start with position beyond file
917        let mut watcher = PipeWatcher::new_with_position(&pipe_log, buffer, 99999);
918        let count = watcher.poll().unwrap();
919        assert_eq!(count, 0, "should clamp and read nothing new");
920    }
921
922    #[test]
923    fn pipe_watcher_partial_line_buffering() {
924        let tmp = tempfile::tempdir().unwrap();
925        let pipe_log = tmp.path().join("pipe.log");
926        // Write content WITHOUT trailing newline
927        std::fs::write(&pipe_log, "Running: partial").unwrap();
928
929        let buffer = EventBuffer::new(10);
930        let mut watcher = PipeWatcher::new(&pipe_log, buffer);
931        let count1 = watcher.poll().unwrap();
932        assert_eq!(
933            count1, 0,
934            "incomplete line should be buffered, not processed"
935        );
936
937        // Now complete the line
938        use std::io::Write;
939        let mut f = std::fs::OpenOptions::new()
940            .append(true)
941            .open(&pipe_log)
942            .unwrap();
943        writeln!(f, " command").unwrap();
944
945        let count2 = watcher.poll().unwrap();
946        assert!(
947            count2 > 0,
948            "completed line should now be processed as event"
949        );
950    }
951
952    #[test]
953    fn format_summary_truncates_long_output_line() {
954        let buffer = EventBuffer::new(10);
955        let long_line = "x".repeat(100);
956        buffer.push(PipeEvent::OutputLine {
957            line: long_line.clone(),
958        });
959        let summary = buffer.format_summary();
960        assert!(
961            summary.contains("..."),
962            "long output line should be truncated with ..."
963        );
964        assert!(summary.len() < long_line.len() + 20);
965    }
966
967    #[test]
968    fn format_summary_includes_all_event_types() {
969        let buffer = EventBuffer::new(10);
970        buffer.push(PipeEvent::CommandRan {
971            command: "ls".to_string(),
972            success: Some(true),
973        });
974        buffer.push(PipeEvent::TestRan {
975            passed: true,
976            detail: "all good".to_string(),
977        });
978        buffer.push(PipeEvent::CommitMade {
979            hash: "abc1234def".to_string(),
980            message: "fix bug".to_string(),
981        });
982        buffer.push(PipeEvent::OutputLine {
983            line: "hello".to_string(),
984        });
985
986        let summary = buffer.format_summary();
987        assert!(summary.contains("$ ls"));
988        assert!(summary.contains("✓ test:"));
989        assert!(summary.contains("⊕ commit abc1234:"));
990        assert!(summary.contains("hello"));
991    }
992
993    #[test]
994    fn event_buffer_respects_capacity() {
995        let buffer = EventBuffer::new(3);
996        buffer.push(PipeEvent::OutputLine {
997            line: "a".to_string(),
998        });
999        buffer.push(PipeEvent::OutputLine {
1000            line: "b".to_string(),
1001        });
1002        buffer.push(PipeEvent::OutputLine {
1003            line: "c".to_string(),
1004        });
1005        buffer.push(PipeEvent::OutputLine {
1006            line: "d".to_string(),
1007        });
1008
1009        let summary = buffer.format_summary();
1010        assert!(!summary.contains("  a"), "oldest event should be evicted");
1011        assert!(summary.contains("  d"), "newest event should be present");
1012    }
1013}