Skip to main content

batty_cli/
events.rs

1//! Event extraction from piped tmux output.
2//!
3//! Reads the pipe-pane log file, strips ANSI escapes, pattern-matches against
4//! known agent output patterns, and produces structured events. A rolling event
5//! buffer provides a compact summary of the executor's recent activity for the
6//! supervisor's context window.
7#![cfg_attr(not(test), allow(dead_code))]
8
9use std::collections::VecDeque;
10use std::io::{Read, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Mutex};
13
14use anyhow::{Context, Result};
15use regex::Regex;
16use serde::Serialize;
17use tracing::debug;
18
19use crate::prompt::strip_ansi;
20
21/// Default rolling buffer size.
22#[cfg(test)]
23const DEFAULT_BUFFER_SIZE: usize = 50;
24
25/// Structured events extracted from executor output.
26#[derive(Debug, Clone, Serialize, PartialEq)]
27#[serde(tag = "type", rename_all = "snake_case")]
28pub enum PipeEvent {
29    /// Executor picked a new task.
30    TaskStarted { task_id: String, title: String },
31    /// Executor created a file.
32    FileCreated { path: String },
33    /// Executor modified a file.
34    FileModified { path: String },
35    /// Executor ran a command.
36    CommandRan {
37        command: String,
38        success: Option<bool>,
39    },
40    /// Test execution detected.
41    TestRan { passed: bool, detail: String },
42    /// Executor is asking a question (prompt detected).
43    PromptDetected { prompt: String },
44    /// Executor marked a task done.
45    TaskCompleted { task_id: String },
46    /// Executor made a git commit.
47    CommitMade { hash: String, message: String },
48    /// Raw output line (for lines that don't match any pattern).
49    OutputLine { line: String },
50}
51
52/// Compiled regex patterns for extracting events from executor output.
53pub struct EventPatterns {
54    patterns: Vec<(Regex, EventClassifier)>,
55}
56
57type EventClassifier = fn(&regex::Captures) -> PipeEvent;
58
59impl EventPatterns {
60    /// Build default event extraction patterns.
61    ///
62    /// These patterns target common agent output after ANSI stripping.
63    /// They work across Claude Code, Codex, and Aider output.
64    pub fn default_patterns() -> Self {
65        Self {
66            patterns: vec![
67                // Task started: "Picked and moved task #N" or kanban-md output
68                (
69                    Regex::new(r"(?i)(?:picked|claimed|starting|working on)\s+(?:and moved\s+)?task\s+#?(\d+)(?::\s+(.+))?").unwrap(),
70                    |caps| PipeEvent::TaskStarted {
71                        task_id: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
72                        title: caps.get(2).map(|m| m.as_str().to_string()).unwrap_or_default(),
73                    },
74                ),
75                // Task completed: "Moved task #N" to done, or "task #N done"
76                (
77                    Regex::new(r"(?i)(?:moved task\s+#?(\d+).*(?:done|complete)|task\s+#?(\d+)\s+(?:done|complete))").unwrap(),
78                    |caps| PipeEvent::TaskCompleted {
79                        task_id: caps.get(1)
80                            .or_else(|| caps.get(2))
81                            .map(|m| m.as_str().to_string())
82                            .unwrap_or_default(),
83                    },
84                ),
85                // Git commit: "[main abc1234] message" or "commit abc1234"
86                (
87                    Regex::new(r"(?:\[[\w/-]+\s+([0-9a-f]{7,40})\]\s+(.+)|commit\s+([0-9a-f]{7,40}))").unwrap(),
88                    |caps| PipeEvent::CommitMade {
89                        hash: caps.get(1)
90                            .or_else(|| caps.get(3))
91                            .map(|m| m.as_str().to_string())
92                            .unwrap_or_default(),
93                        message: caps.get(2).map(|m| m.as_str().to_string()).unwrap_or_default(),
94                    },
95                ),
96                // Test result: "test result: ok. N passed" or "test result: FAILED"
97                (
98                    Regex::new(r"test result:\s*(ok|FAILED)").unwrap(),
99                    |caps| {
100                        let result = caps.get(1).map(|m| m.as_str()).unwrap_or("FAILED");
101                        PipeEvent::TestRan {
102                            passed: result == "ok",
103                            detail: caps.get(0).map(|m| m.as_str().to_string()).unwrap_or_default(),
104                        }
105                    },
106                ),
107                // File created: "Created file X" or "Write tool" or "File created"
108                (
109                    Regex::new(r"(?i)(?:created?\s+(?:file\s+)?|wrote\s+|writing\s+to\s+)([\w/.+\-]+\.\w+)").unwrap(),
110                    |caps| PipeEvent::FileCreated {
111                        path: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
112                    },
113                ),
114                // File modified: "Edited X" or "Modified X" or "Edit tool"
115                (
116                    Regex::new(r"(?i)(?:edit(?:ed|ing)?\s+|modif(?:ied|ying)\s+)([\w/.+\-]+\.\w+)").unwrap(),
117                    |caps| PipeEvent::FileModified {
118                        path: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
119                    },
120                ),
121                // Command ran: "$ command" or "Running: command" or exit code pattern
122                (
123                    Regex::new(r"(?:^\$\s+(.+)|Running:\s+(.+))").unwrap(),
124                    |caps| PipeEvent::CommandRan {
125                        command: caps.get(1)
126                            .or_else(|| caps.get(2))
127                            .map(|m| m.as_str().to_string())
128                            .unwrap_or_default(),
129                        success: None,
130                    },
131                ),
132                // Prompt patterns: "Allow tool", "[y/n]", "Continue?"
133                (
134                    Regex::new(r"(?i)(?:allow\s+tool|continue\?|\[y/n\]|do you want to proceed)").unwrap(),
135                    |caps| PipeEvent::PromptDetected {
136                        prompt: caps.get(0).map(|m| m.as_str().to_string()).unwrap_or_default(),
137                    },
138                ),
139            ],
140        }
141    }
142
143    /// Try to classify a line of ANSI-stripped output as a structured event.
144    /// Returns None if no pattern matches.
145    pub fn classify(&self, line: &str) -> Option<PipeEvent> {
146        for (regex, classify) in &self.patterns {
147            if let Some(caps) = regex.captures(line) {
148                return Some(classify(&caps));
149            }
150        }
151        None
152    }
153}
154
155/// Rolling buffer of recent events.
156///
157/// Thread-safe via Arc<Mutex<_>> for sharing between the watcher thread
158/// and the supervisor's context composition.
159#[derive(Debug, Clone)]
160pub struct EventBuffer {
161    inner: Arc<Mutex<EventBufferInner>>,
162}
163
164#[derive(Debug)]
165struct EventBufferInner {
166    events: VecDeque<PipeEvent>,
167    max_size: usize,
168}
169
170impl EventBuffer {
171    /// Create a new event buffer with the given capacity.
172    pub fn new(max_size: usize) -> Self {
173        Self {
174            inner: Arc::new(Mutex::new(EventBufferInner {
175                events: VecDeque::with_capacity(max_size),
176                max_size,
177            })),
178        }
179    }
180
181    /// Create a buffer with the default size (50 events).
182    #[cfg(test)]
183    pub fn default_size() -> Self {
184        Self::new(DEFAULT_BUFFER_SIZE)
185    }
186
187    /// Push an event into the buffer, evicting the oldest if full.
188    pub fn push(&self, event: PipeEvent) {
189        let mut inner = self.inner.lock().unwrap();
190        if inner.events.len() >= inner.max_size {
191            inner.events.pop_front();
192        }
193        inner.events.push_back(event);
194    }
195
196    /// Get a snapshot of all events in the buffer.
197    pub fn snapshot(&self) -> Vec<PipeEvent> {
198        let inner = self.inner.lock().unwrap();
199        inner.events.iter().cloned().collect()
200    }
201
202    /// Get the number of events in the buffer.
203    pub fn len(&self) -> usize {
204        let inner = self.inner.lock().unwrap();
205        inner.events.len()
206    }
207
208    /// Check if the buffer is empty.
209    pub fn is_empty(&self) -> bool {
210        self.len() == 0
211    }
212
213    /// Clear all events.
214    pub fn clear(&self) {
215        let mut inner = self.inner.lock().unwrap();
216        inner.events.clear();
217    }
218
219    /// Format the buffer as a compact summary for the supervisor's context.
220    pub fn format_summary(&self) -> String {
221        let events = self.snapshot();
222        if events.is_empty() {
223            return "(no events yet)".to_string();
224        }
225
226        let mut summary = String::new();
227        for event in &events {
228            let line = match event {
229                PipeEvent::TaskStarted { task_id, title } => {
230                    format!("→ task #{task_id} started: {title}")
231                }
232                PipeEvent::TaskCompleted { task_id } => {
233                    format!("✓ task #{task_id} completed")
234                }
235                PipeEvent::FileCreated { path } => {
236                    format!("+ {path}")
237                }
238                PipeEvent::FileModified { path } => {
239                    format!("~ {path}")
240                }
241                PipeEvent::CommandRan { command, success } => {
242                    let status = match success {
243                        Some(true) => " ✓",
244                        Some(false) => " ✗",
245                        None => "",
246                    };
247                    format!("$ {command}{status}")
248                }
249                PipeEvent::TestRan { passed, detail } => {
250                    let icon = if *passed { "✓" } else { "✗" };
251                    format!("{icon} test: {detail}")
252                }
253                PipeEvent::PromptDetected { prompt } => {
254                    format!("? {prompt}")
255                }
256                PipeEvent::CommitMade { hash, message } => {
257                    let short_hash = &hash[..7.min(hash.len())];
258                    format!("⊕ commit {short_hash}: {message}")
259                }
260                PipeEvent::OutputLine { line } => {
261                    // Truncate long output lines
262                    if line.len() > 80 {
263                        format!("  {}...", &line[..77])
264                    } else {
265                        format!("  {line}")
266                    }
267                }
268            };
269            summary.push_str(&line);
270            summary.push('\n');
271        }
272        summary
273    }
274}
275
276/// Watches a pipe-pane log file and extracts events from new content.
277///
278/// Uses polling (seek to last position, read new bytes) which is simple
279/// and portable. The polling interval is configurable.
280pub struct PipeWatcher {
281    path: PathBuf,
282    patterns: EventPatterns,
283    buffer: EventBuffer,
284    position: u64,
285    line_buffer: String,
286}
287
288impl PipeWatcher {
289    /// Create a new pipe watcher for the given log file.
290    pub fn new(path: &Path, buffer: EventBuffer) -> Self {
291        Self::new_with_position(path, buffer, 0)
292    }
293
294    /// Create a new pipe watcher starting from a specific byte offset.
295    ///
296    /// Offsets beyond EOF are clamped during polling.
297    pub fn new_with_position(path: &Path, buffer: EventBuffer, position: u64) -> Self {
298        Self {
299            path: path.to_path_buf(),
300            patterns: EventPatterns::default_patterns(),
301            buffer,
302            position,
303            line_buffer: String::new(),
304        }
305    }
306
307    /// Poll for new content in the log file and extract events.
308    ///
309    /// Returns the number of new events extracted. Call this periodically
310    /// from the supervisor loop.
311    pub fn poll(&mut self) -> Result<usize> {
312        let mut file = match std::fs::File::open(&self.path) {
313            Ok(f) => f,
314            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
315                return Ok(0); // file doesn't exist yet
316            }
317            Err(e) => {
318                return Err(e)
319                    .with_context(|| format!("failed to open pipe log: {}", self.path.display()));
320            }
321        };
322
323        // Clamp stale checkpoints (for example after truncation/rotation)
324        let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
325        if self.position > file_len {
326            self.position = file_len;
327        }
328
329        // Seek to where we left off
330        file.seek(SeekFrom::Start(self.position))
331            .context("failed to seek in pipe log")?;
332
333        // Read new content
334        let mut new_bytes = Vec::new();
335        let n = file
336            .read_to_end(&mut new_bytes)
337            .context("failed to read pipe log")?;
338
339        if n == 0 {
340            return Ok(0);
341        }
342
343        self.position += n as u64;
344
345        // Convert to string (lossy for binary data in PTY output)
346        let new_text = String::from_utf8_lossy(&new_bytes);
347        self.line_buffer.push_str(&new_text);
348
349        // Process complete lines
350        let mut event_count = 0;
351        while let Some(newline_pos) = self.line_buffer.find('\n') {
352            let line = self.line_buffer[..newline_pos].to_string();
353            self.line_buffer = self.line_buffer[newline_pos + 1..].to_string();
354
355            let stripped = strip_ansi(&line);
356            let trimmed = stripped.trim();
357            if trimmed.is_empty() {
358                continue;
359            }
360
361            if let Some(event) = self.patterns.classify(trimmed) {
362                debug!(event = ?event, "extracted event");
363                self.buffer.push(event);
364                event_count += 1;
365            }
366        }
367
368        Ok(event_count)
369    }
370
371    /// Resume-safe checkpoint offset.
372    ///
373    /// This rewinds by the currently buffered partial line bytes so a resumed
374    /// watcher can re-read any incomplete line safely.
375    pub fn checkpoint_offset(&self) -> u64 {
376        self.position
377            .saturating_sub(self.line_buffer.len().try_into().unwrap_or(0))
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384    use std::fs;
385    use std::io::Write;
386
387    // ── EventPatterns ──
388
389    #[test]
390    fn detect_task_started() {
391        let patterns = EventPatterns::default_patterns();
392        let event = patterns
393            .classify("Picked and moved task #3: kanban reader")
394            .unwrap();
395        match event {
396            PipeEvent::TaskStarted { task_id, title } => {
397                assert_eq!(task_id, "3");
398                assert_eq!(title, "kanban reader");
399            }
400            other => panic!("expected TaskStarted, got: {other:?}"),
401        }
402    }
403
404    #[test]
405    fn detect_task_started_claim() {
406        let patterns = EventPatterns::default_patterns();
407        let event = patterns.classify("Claimed task #5").unwrap();
408        assert!(matches!(event, PipeEvent::TaskStarted { .. }));
409    }
410
411    #[test]
412    fn detect_task_completed() {
413        let patterns = EventPatterns::default_patterns();
414        let event = patterns
415            .classify("Moved task #3: in-progress -> done")
416            .unwrap();
417        match event {
418            PipeEvent::TaskCompleted { task_id } => assert_eq!(task_id, "3"),
419            other => panic!("expected TaskCompleted, got: {other:?}"),
420        }
421    }
422
423    #[test]
424    fn detect_commit() {
425        let patterns = EventPatterns::default_patterns();
426        let event = patterns
427            .classify("[main abc1234] fix the auth bug")
428            .unwrap();
429        match event {
430            PipeEvent::CommitMade { hash, message } => {
431                assert_eq!(hash, "abc1234");
432                assert_eq!(message, "fix the auth bug");
433            }
434            other => panic!("expected CommitMade, got: {other:?}"),
435        }
436    }
437
438    #[test]
439    fn detect_test_passed() {
440        let patterns = EventPatterns::default_patterns();
441        let event = patterns
442            .classify("test result: ok. 42 passed; 0 failed")
443            .unwrap();
444        match event {
445            PipeEvent::TestRan { passed, .. } => assert!(passed),
446            other => panic!("expected TestRan, got: {other:?}"),
447        }
448    }
449
450    #[test]
451    fn detect_test_failed() {
452        let patterns = EventPatterns::default_patterns();
453        let event = patterns
454            .classify("test result: FAILED. 40 passed; 2 failed")
455            .unwrap();
456        match event {
457            PipeEvent::TestRan { passed, .. } => assert!(!passed),
458            other => panic!("expected TestRan, got: {other:?}"),
459        }
460    }
461
462    #[test]
463    fn detect_file_created() {
464        let patterns = EventPatterns::default_patterns();
465        let event = patterns.classify("Created file src/tmux.rs").unwrap();
466        match event {
467            PipeEvent::FileCreated { path } => assert_eq!(path, "src/tmux.rs"),
468            other => panic!("expected FileCreated, got: {other:?}"),
469        }
470    }
471
472    #[test]
473    fn detect_file_modified() {
474        let patterns = EventPatterns::default_patterns();
475        let event = patterns.classify("Edited src/main.rs").unwrap();
476        match event {
477            PipeEvent::FileModified { path } => assert_eq!(path, "src/main.rs"),
478            other => panic!("expected FileModified, got: {other:?}"),
479        }
480    }
481
482    #[test]
483    fn detect_command_ran() {
484        let patterns = EventPatterns::default_patterns();
485        let event = patterns.classify("$ cargo test").unwrap();
486        match event {
487            PipeEvent::CommandRan { command, .. } => assert_eq!(command, "cargo test"),
488            other => panic!("expected CommandRan, got: {other:?}"),
489        }
490    }
491
492    #[test]
493    fn detect_prompt() {
494        let patterns = EventPatterns::default_patterns();
495        let event = patterns
496            .classify("Allow tool Read on /home/user/file.rs?")
497            .unwrap();
498        assert!(matches!(event, PipeEvent::PromptDetected { .. }));
499    }
500
501    #[test]
502    fn no_match_on_normal_output() {
503        let patterns = EventPatterns::default_patterns();
504        assert!(
505            patterns
506                .classify("Writing function to parse YAML...")
507                .is_none()
508        );
509    }
510
511    // ── EventBuffer ──
512
513    #[test]
514    fn buffer_push_and_snapshot() {
515        let buf = EventBuffer::new(3);
516        buf.push(PipeEvent::OutputLine {
517            line: "a".to_string(),
518        });
519        buf.push(PipeEvent::OutputLine {
520            line: "b".to_string(),
521        });
522
523        let snap = buf.snapshot();
524        assert_eq!(snap.len(), 2);
525    }
526
527    #[test]
528    fn buffer_evicts_oldest_when_full() {
529        let buf = EventBuffer::new(2);
530        buf.push(PipeEvent::OutputLine {
531            line: "a".to_string(),
532        });
533        buf.push(PipeEvent::OutputLine {
534            line: "b".to_string(),
535        });
536        buf.push(PipeEvent::OutputLine {
537            line: "c".to_string(),
538        });
539
540        let snap = buf.snapshot();
541        assert_eq!(snap.len(), 2);
542        assert_eq!(
543            snap[0],
544            PipeEvent::OutputLine {
545                line: "b".to_string()
546            }
547        );
548        assert_eq!(
549            snap[1],
550            PipeEvent::OutputLine {
551                line: "c".to_string()
552            }
553        );
554    }
555
556    #[test]
557    fn buffer_default_size() {
558        let buf = EventBuffer::default_size();
559        assert_eq!(buf.len(), 0);
560
561        // Push 60 events — should keep only the last 50
562        for i in 0..60 {
563            buf.push(PipeEvent::OutputLine {
564                line: format!("line {i}"),
565            });
566        }
567        assert_eq!(buf.len(), 50);
568
569        let snap = buf.snapshot();
570        // First event should be line 10 (0-9 evicted)
571        assert_eq!(
572            snap[0],
573            PipeEvent::OutputLine {
574                line: "line 10".to_string()
575            }
576        );
577    }
578
579    #[test]
580    fn buffer_clear() {
581        let buf = EventBuffer::new(10);
582        buf.push(PipeEvent::OutputLine {
583            line: "x".to_string(),
584        });
585        assert_eq!(buf.len(), 1);
586
587        buf.clear();
588        assert!(buf.is_empty());
589    }
590
591    #[test]
592    fn buffer_format_summary_empty() {
593        let buf = EventBuffer::new(10);
594        assert_eq!(buf.format_summary(), "(no events yet)");
595    }
596
597    #[test]
598    fn buffer_format_summary_has_events() {
599        let buf = EventBuffer::new(10);
600        buf.push(PipeEvent::TaskStarted {
601            task_id: "3".to_string(),
602            title: "foo".to_string(),
603        });
604        buf.push(PipeEvent::FileCreated {
605            path: "src/x.rs".to_string(),
606        });
607        buf.push(PipeEvent::TestRan {
608            passed: true,
609            detail: "ok".to_string(),
610        });
611        buf.push(PipeEvent::CommitMade {
612            hash: "abc1234".to_string(),
613            message: "fix".to_string(),
614        });
615
616        let summary = buf.format_summary();
617        assert!(summary.contains("→ task #3 started: foo"));
618        assert!(summary.contains("+ src/x.rs"));
619        assert!(summary.contains("✓ test: ok"));
620        assert!(summary.contains("⊕ commit abc1234: fix"));
621    }
622
623    #[test]
624    fn buffer_is_thread_safe() {
625        let buf = EventBuffer::new(100);
626        let buf2 = buf.clone();
627
628        let handle = std::thread::spawn(move || {
629            for i in 0..50 {
630                buf2.push(PipeEvent::OutputLine {
631                    line: format!("thread {i}"),
632                });
633            }
634        });
635
636        for i in 0..50 {
637            buf.push(PipeEvent::OutputLine {
638                line: format!("main {i}"),
639            });
640        }
641
642        handle.join().unwrap();
643        assert_eq!(buf.len(), 100);
644    }
645
646    // ── PipeWatcher ──
647
648    #[test]
649    fn watcher_reads_new_content() {
650        let tmp = tempfile::tempdir().unwrap();
651        let log_path = tmp.path().join("pty-output.log");
652
653        // Create the log file with some content
654        {
655            let mut f = fs::File::create(&log_path).unwrap();
656            writeln!(f, "Picked and moved task #3: reader").unwrap();
657            writeln!(f, "some normal output").unwrap();
658            writeln!(f, "test result: ok. 5 passed; 0 failed").unwrap();
659        }
660
661        let buffer = EventBuffer::new(50);
662        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
663
664        let count = watcher.poll().unwrap();
665        assert!(count >= 2, "expected at least 2 events, got {count}");
666
667        let events = buffer.snapshot();
668        assert!(
669            events
670                .iter()
671                .any(|e| matches!(e, PipeEvent::TaskStarted { .. }))
672        );
673        assert!(
674            events
675                .iter()
676                .any(|e| matches!(e, PipeEvent::TestRan { passed: true, .. }))
677        );
678    }
679
680    #[test]
681    fn watcher_tracks_position() {
682        let tmp = tempfile::tempdir().unwrap();
683        let log_path = tmp.path().join("pty-output.log");
684
685        // Write initial content
686        {
687            let mut f = fs::File::create(&log_path).unwrap();
688            writeln!(f, "test result: ok. 5 passed").unwrap();
689        }
690
691        let buffer = EventBuffer::new(50);
692        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
693
694        watcher.poll().unwrap();
695        let count1 = buffer.len();
696
697        // Poll again with no new content
698        let count = watcher.poll().unwrap();
699        assert_eq!(count, 0, "no new content should yield 0 events");
700        assert_eq!(buffer.len(), count1);
701
702        // Append new content
703        {
704            let mut f = fs::OpenOptions::new().append(true).open(&log_path).unwrap();
705            writeln!(f, "[main abc1234] fix bug").unwrap();
706        }
707
708        let count = watcher.poll().unwrap();
709        assert!(count >= 1, "expected at least 1 new event");
710    }
711
712    #[test]
713    fn watcher_handles_missing_file() {
714        let tmp = tempfile::tempdir().unwrap();
715        let log_path = tmp.path().join("nonexistent.log");
716
717        let buffer = EventBuffer::new(50);
718        let mut watcher = PipeWatcher::new(&log_path, buffer);
719
720        // Should not error — just returns 0 events
721        let count = watcher.poll().unwrap();
722        assert_eq!(count, 0);
723    }
724
725    #[test]
726    fn watcher_resume_from_position_reads_only_new_content() {
727        let tmp = tempfile::tempdir().unwrap();
728        let log_path = tmp.path().join("resume.log");
729
730        {
731            let mut f = fs::File::create(&log_path).unwrap();
732            writeln!(f, "test result: ok. 1 passed").unwrap();
733        }
734
735        let file_len = fs::metadata(&log_path).unwrap().len();
736        let buffer = EventBuffer::new(50);
737        let mut watcher = PipeWatcher::new_with_position(&log_path, buffer.clone(), file_len);
738
739        {
740            let mut f = fs::OpenOptions::new().append(true).open(&log_path).unwrap();
741            writeln!(f, "[main abc1234] resume").unwrap();
742        }
743
744        let count = watcher.poll().unwrap();
745        assert!(count >= 1);
746        let events = buffer.snapshot();
747        assert!(
748            events
749                .iter()
750                .any(|e| matches!(e, PipeEvent::CommitMade { .. }))
751        );
752    }
753
754    #[test]
755    fn watcher_checkpoint_offset_rewinds_partial_line() {
756        let tmp = tempfile::tempdir().unwrap();
757        let log_path = tmp.path().join("partial.log");
758
759        {
760            let mut f = fs::File::create(&log_path).unwrap();
761            write!(f, "test result: ok").unwrap(); // no trailing newline
762        }
763
764        let buffer = EventBuffer::new(50);
765        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
766        let _ = watcher.poll().unwrap();
767
768        assert_eq!(
769            watcher.checkpoint_offset(),
770            0,
771            "partial line should be re-read on resume"
772        );
773        assert_eq!(buffer.len(), 0);
774    }
775
776    #[test]
777    fn watcher_strips_ansi() {
778        let tmp = tempfile::tempdir().unwrap();
779        let log_path = tmp.path().join("ansi.log");
780
781        {
782            let mut f = fs::File::create(&log_path).unwrap();
783            // Write ANSI-escaped "test result: ok"
784            writeln!(f, "\x1b[32mtest result: ok. 5 passed\x1b[0m").unwrap();
785        }
786
787        let buffer = EventBuffer::new(50);
788        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
789
790        watcher.poll().unwrap();
791        let events = buffer.snapshot();
792        assert!(
793            events
794                .iter()
795                .any(|e| matches!(e, PipeEvent::TestRan { passed: true, .. }))
796        );
797    }
798
799    // ── PipeEvent serialization ──
800
801    #[test]
802    fn pipe_event_serializes_to_json() {
803        let event = PipeEvent::TaskStarted {
804            task_id: "3".to_string(),
805            title: "test".to_string(),
806        };
807        let json = serde_json::to_string(&event).unwrap();
808        assert!(json.contains("\"type\":\"task_started\""));
809        assert!(json.contains("\"task_id\":\"3\""));
810    }
811
812    #[test]
813    fn all_pipe_events_serialize() {
814        let events = vec![
815            PipeEvent::TaskStarted {
816                task_id: "1".to_string(),
817                title: "test".to_string(),
818            },
819            PipeEvent::FileCreated {
820                path: "x.rs".to_string(),
821            },
822            PipeEvent::FileModified {
823                path: "y.rs".to_string(),
824            },
825            PipeEvent::CommandRan {
826                command: "ls".to_string(),
827                success: Some(true),
828            },
829            PipeEvent::TestRan {
830                passed: true,
831                detail: "ok".to_string(),
832            },
833            PipeEvent::PromptDetected {
834                prompt: "y/n".to_string(),
835            },
836            PipeEvent::TaskCompleted {
837                task_id: "1".to_string(),
838            },
839            PipeEvent::CommitMade {
840                hash: "abc".to_string(),
841                message: "fix".to_string(),
842            },
843            PipeEvent::OutputLine {
844                line: "hi".to_string(),
845            },
846        ];
847
848        for event in events {
849            let json = serde_json::to_string(&event);
850            assert!(json.is_ok(), "failed to serialize: {event:?}");
851        }
852    }
853
854    // ── Coverage: additional events tests ──
855
856    #[test]
857    fn pipe_watcher_extracts_events_from_file() {
858        let tmp = tempfile::tempdir().unwrap();
859        let pipe_log = tmp.path().join("pipe.log");
860        std::fs::write(
861            &pipe_log,
862            "some normal output\nRunning: cargo test\ntest result: ok. 5 passed\n",
863        )
864        .unwrap();
865
866        let buffer = EventBuffer::new(10);
867        let mut watcher = PipeWatcher::new(&pipe_log, buffer);
868        let count = watcher.poll().unwrap();
869        assert!(count > 0, "expected at least one event extracted");
870    }
871
872    #[test]
873    fn pipe_watcher_handles_missing_file() {
874        let tmp = tempfile::tempdir().unwrap();
875        let pipe_log = tmp.path().join("nonexistent.log");
876
877        let buffer = EventBuffer::new(10);
878        let mut watcher = PipeWatcher::new(&pipe_log, buffer);
879        let count = watcher.poll().unwrap();
880        assert_eq!(count, 0);
881    }
882
883    #[test]
884    fn pipe_watcher_incremental_reads() {
885        let tmp = tempfile::tempdir().unwrap();
886        let pipe_log = tmp.path().join("pipe.log");
887        std::fs::write(&pipe_log, "Running: first command\n").unwrap();
888
889        let buffer = EventBuffer::new(10);
890        let mut watcher = PipeWatcher::new(&pipe_log, buffer);
891        let count1 = watcher.poll().unwrap();
892        assert!(count1 > 0, "should extract events from first write");
893
894        // Append more content
895        use std::io::Write;
896        let mut f = std::fs::OpenOptions::new()
897            .append(true)
898            .open(&pipe_log)
899            .unwrap();
900        writeln!(f, "Running: second command").unwrap();
901
902        let count2 = watcher.poll().unwrap();
903        assert!(count2 > 0, "should extract events from appended content");
904    }
905
906    #[test]
907    fn pipe_watcher_clamps_stale_position() {
908        let tmp = tempfile::tempdir().unwrap();
909        let pipe_log = tmp.path().join("pipe.log");
910        std::fs::write(&pipe_log, "short\n").unwrap();
911
912        let buffer = EventBuffer::new(10);
913        // Start with position beyond file
914        let mut watcher = PipeWatcher::new_with_position(&pipe_log, buffer, 99999);
915        let count = watcher.poll().unwrap();
916        assert_eq!(count, 0, "should clamp and read nothing new");
917    }
918
919    #[test]
920    fn pipe_watcher_partial_line_buffering() {
921        let tmp = tempfile::tempdir().unwrap();
922        let pipe_log = tmp.path().join("pipe.log");
923        // Write content WITHOUT trailing newline
924        std::fs::write(&pipe_log, "Running: partial").unwrap();
925
926        let buffer = EventBuffer::new(10);
927        let mut watcher = PipeWatcher::new(&pipe_log, buffer);
928        let count1 = watcher.poll().unwrap();
929        assert_eq!(
930            count1, 0,
931            "incomplete line should be buffered, not processed"
932        );
933
934        // Now complete the line
935        use std::io::Write;
936        let mut f = std::fs::OpenOptions::new()
937            .append(true)
938            .open(&pipe_log)
939            .unwrap();
940        writeln!(f, " command").unwrap();
941
942        let count2 = watcher.poll().unwrap();
943        assert!(
944            count2 > 0,
945            "completed line should now be processed as event"
946        );
947    }
948
949    #[test]
950    fn format_summary_truncates_long_output_line() {
951        let buffer = EventBuffer::new(10);
952        let long_line = "x".repeat(100);
953        buffer.push(PipeEvent::OutputLine {
954            line: long_line.clone(),
955        });
956        let summary = buffer.format_summary();
957        assert!(
958            summary.contains("..."),
959            "long output line should be truncated with ..."
960        );
961        assert!(summary.len() < long_line.len() + 20);
962    }
963
964    #[test]
965    fn format_summary_includes_all_event_types() {
966        let buffer = EventBuffer::new(10);
967        buffer.push(PipeEvent::CommandRan {
968            command: "ls".to_string(),
969            success: Some(true),
970        });
971        buffer.push(PipeEvent::TestRan {
972            passed: true,
973            detail: "all good".to_string(),
974        });
975        buffer.push(PipeEvent::CommitMade {
976            hash: "abc1234def".to_string(),
977            message: "fix bug".to_string(),
978        });
979        buffer.push(PipeEvent::OutputLine {
980            line: "hello".to_string(),
981        });
982
983        let summary = buffer.format_summary();
984        assert!(summary.contains("$ ls"));
985        assert!(summary.contains("✓ test:"));
986        assert!(summary.contains("⊕ commit abc1234:"));
987        assert!(summary.contains("hello"));
988    }
989
990    #[test]
991    fn event_buffer_respects_capacity() {
992        let buffer = EventBuffer::new(3);
993        buffer.push(PipeEvent::OutputLine {
994            line: "a".to_string(),
995        });
996        buffer.push(PipeEvent::OutputLine {
997            line: "b".to_string(),
998        });
999        buffer.push(PipeEvent::OutputLine {
1000            line: "c".to_string(),
1001        });
1002        buffer.push(PipeEvent::OutputLine {
1003            line: "d".to_string(),
1004        });
1005
1006        let summary = buffer.format_summary();
1007        assert!(!summary.contains("  a"), "oldest event should be evicted");
1008        assert!(summary.contains("  d"), "newest event should be present");
1009    }
1010}