1#![cfg_attr(not(test), allow(dead_code))]
8
9use std::collections::VecDeque;
10use std::io::{Read, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Mutex};
13
14use anyhow::{Context, Result};
15use regex::Regex;
16use serde::Serialize;
17use tracing::debug;
18
19use crate::prompt::strip_ansi;
20
/// Capacity used by `EventBuffer::default_size`; only compiled into test builds.
#[cfg(test)]
const DEFAULT_BUFFER_SIZE: usize = 50;
24
25#[derive(Debug, Clone, Serialize, PartialEq)]
27#[serde(tag = "type", rename_all = "snake_case")]
28pub enum PipeEvent {
29 TaskStarted { task_id: String, title: String },
31 FileCreated { path: String },
33 FileModified { path: String },
35 CommandRan {
37 command: String,
38 success: Option<bool>,
39 },
40 TestRan { passed: bool, detail: String },
42 PromptDetected { prompt: String },
44 TaskCompleted { task_id: String },
46 CommitMade { hash: String, message: String },
48 OutputLine { line: String },
50}
51
/// Ordered table of regex rules used to turn an output line into a [`PipeEvent`].
pub struct EventPatterns {
    /// Rules in priority order: `classify` returns the event built by the
    /// first rule whose regex matches, so earlier entries win.
    patterns: Vec<(Regex, EventClassifier)>,
}
56
57type EventClassifier = fn(®ex::Captures) -> PipeEvent;
58
59impl EventPatterns {
60 pub fn default_patterns() -> Self {
65 Self {
66 patterns: vec![
67 (
69 Regex::new(r"(?i)(?:picked|claimed|starting|working on)\s+(?:and moved\s+)?task\s+#?(\d+)(?::\s+(.+))?").unwrap(),
70 |caps| PipeEvent::TaskStarted {
71 task_id: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
72 title: caps.get(2).map(|m| m.as_str().to_string()).unwrap_or_default(),
73 },
74 ),
75 (
77 Regex::new(r"(?i)(?:moved task\s+#?(\d+).*(?:done|complete)|task\s+#?(\d+)\s+(?:done|complete))").unwrap(),
78 |caps| PipeEvent::TaskCompleted {
79 task_id: caps.get(1)
80 .or_else(|| caps.get(2))
81 .map(|m| m.as_str().to_string())
82 .unwrap_or_default(),
83 },
84 ),
85 (
87 Regex::new(r"(?:\[[\w/-]+\s+([0-9a-f]{7,40})\]\s+(.+)|commit\s+([0-9a-f]{7,40}))").unwrap(),
88 |caps| PipeEvent::CommitMade {
89 hash: caps.get(1)
90 .or_else(|| caps.get(3))
91 .map(|m| m.as_str().to_string())
92 .unwrap_or_default(),
93 message: caps.get(2).map(|m| m.as_str().to_string()).unwrap_or_default(),
94 },
95 ),
96 (
98 Regex::new(r"test result:\s*(ok|FAILED)").unwrap(),
99 |caps| {
100 let result = caps.get(1).map(|m| m.as_str()).unwrap_or("FAILED");
101 PipeEvent::TestRan {
102 passed: result == "ok",
103 detail: caps.get(0).map(|m| m.as_str().to_string()).unwrap_or_default(),
104 }
105 },
106 ),
107 (
109 Regex::new(r"(?i)(?:created?\s+(?:file\s+)?|wrote\s+|writing\s+to\s+)([\w/.+\-]+\.\w+)").unwrap(),
110 |caps| PipeEvent::FileCreated {
111 path: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
112 },
113 ),
114 (
116 Regex::new(r"(?i)(?:edit(?:ed|ing)?\s+|modif(?:ied|ying)\s+)([\w/.+\-]+\.\w+)").unwrap(),
117 |caps| PipeEvent::FileModified {
118 path: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
119 },
120 ),
121 (
123 Regex::new(r"(?:^\$\s+(.+)|Running:\s+(.+))").unwrap(),
124 |caps| PipeEvent::CommandRan {
125 command: caps.get(1)
126 .or_else(|| caps.get(2))
127 .map(|m| m.as_str().to_string())
128 .unwrap_or_default(),
129 success: None,
130 },
131 ),
132 (
134 Regex::new(r"(?i)(?:allow\s+tool|continue\?|\[y/n\]|do you want to proceed)").unwrap(),
135 |caps| PipeEvent::PromptDetected {
136 prompt: caps.get(0).map(|m| m.as_str().to_string()).unwrap_or_default(),
137 },
138 ),
139 ],
140 }
141 }
142
143 pub fn classify(&self, line: &str) -> Option<PipeEvent> {
146 for (regex, classify) in &self.patterns {
147 if let Some(caps) = regex.captures(line) {
148 return Some(classify(&caps));
149 }
150 }
151 None
152 }
153}
154
/// Bounded buffer of the most recent [`PipeEvent`]s.
///
/// The state lives behind an `Arc<Mutex<..>>`, so cloning an `EventBuffer`
/// yields another handle to the same underlying events rather than a copy.
#[derive(Debug, Clone)]
pub struct EventBuffer {
    /// Shared, lock-protected state.
    inner: Arc<Mutex<EventBufferInner>>,
}
163
/// Lock-protected state behind an [`EventBuffer`].
#[derive(Debug)]
struct EventBufferInner {
    /// Retained events, oldest at the front.
    events: VecDeque<PipeEvent>,
    /// Length at which `push` starts evicting the oldest event.
    max_size: usize,
}
169
170impl EventBuffer {
171 pub fn new(max_size: usize) -> Self {
173 Self {
174 inner: Arc::new(Mutex::new(EventBufferInner {
175 events: VecDeque::with_capacity(max_size),
176 max_size,
177 })),
178 }
179 }
180
181 #[cfg(test)]
183 pub fn default_size() -> Self {
184 Self::new(DEFAULT_BUFFER_SIZE)
185 }
186
187 pub fn push(&self, event: PipeEvent) {
189 let mut inner = self.inner.lock().unwrap();
190 if inner.events.len() >= inner.max_size {
191 inner.events.pop_front();
192 }
193 inner.events.push_back(event);
194 }
195
196 pub fn snapshot(&self) -> Vec<PipeEvent> {
198 let inner = self.inner.lock().unwrap();
199 inner.events.iter().cloned().collect()
200 }
201
202 pub fn len(&self) -> usize {
204 let inner = self.inner.lock().unwrap();
205 inner.events.len()
206 }
207
208 pub fn is_empty(&self) -> bool {
210 self.len() == 0
211 }
212
213 pub fn clear(&self) {
215 let mut inner = self.inner.lock().unwrap();
216 inner.events.clear();
217 }
218
219 pub fn format_summary(&self) -> String {
221 let events = self.snapshot();
222 if events.is_empty() {
223 return "(no events yet)".to_string();
224 }
225
226 let mut summary = String::new();
227 for event in &events {
228 let line = match event {
229 PipeEvent::TaskStarted { task_id, title } => {
230 format!("→ task #{task_id} started: {title}")
231 }
232 PipeEvent::TaskCompleted { task_id } => {
233 format!("✓ task #{task_id} completed")
234 }
235 PipeEvent::FileCreated { path } => {
236 format!("+ {path}")
237 }
238 PipeEvent::FileModified { path } => {
239 format!("~ {path}")
240 }
241 PipeEvent::CommandRan { command, success } => {
242 let status = match success {
243 Some(true) => " ✓",
244 Some(false) => " ✗",
245 None => "",
246 };
247 format!("$ {command}{status}")
248 }
249 PipeEvent::TestRan { passed, detail } => {
250 let icon = if *passed { "✓" } else { "✗" };
251 format!("{icon} test: {detail}")
252 }
253 PipeEvent::PromptDetected { prompt } => {
254 format!("? {prompt}")
255 }
256 PipeEvent::CommitMade { hash, message } => {
257 let short_hash = &hash[..7.min(hash.len())];
258 format!("⊕ commit {short_hash}: {message}")
259 }
260 PipeEvent::OutputLine { line } => {
261 if line.len() > 80 {
263 format!(" {}...", &line[..77])
264 } else {
265 format!(" {line}")
266 }
267 }
268 };
269 summary.push_str(&line);
270 summary.push('\n');
271 }
272 summary
273 }
274}
275
276pub struct PipeWatcher {
281 path: PathBuf,
282 patterns: EventPatterns,
283 buffer: EventBuffer,
284 position: u64,
285 line_buffer: String,
286}
287
288impl PipeWatcher {
289 pub fn new(path: &Path, buffer: EventBuffer) -> Self {
291 Self::new_with_position(path, buffer, 0)
292 }
293
294 pub fn new_with_position(path: &Path, buffer: EventBuffer, position: u64) -> Self {
298 Self {
299 path: path.to_path_buf(),
300 patterns: EventPatterns::default_patterns(),
301 buffer,
302 position,
303 line_buffer: String::new(),
304 }
305 }
306
307 pub fn poll(&mut self) -> Result<usize> {
312 let mut file = match std::fs::File::open(&self.path) {
313 Ok(f) => f,
314 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
315 return Ok(0); }
317 Err(e) => {
318 return Err(e)
319 .with_context(|| format!("failed to open pipe log: {}", self.path.display()));
320 }
321 };
322
323 let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
325 if self.position > file_len {
326 self.position = file_len;
327 }
328
329 file.seek(SeekFrom::Start(self.position))
331 .context("failed to seek in pipe log")?;
332
333 let mut new_bytes = Vec::new();
335 let n = file
336 .read_to_end(&mut new_bytes)
337 .context("failed to read pipe log")?;
338
339 if n == 0 {
340 return Ok(0);
341 }
342
343 self.position += n as u64;
344
345 let new_text = String::from_utf8_lossy(&new_bytes);
347 self.line_buffer.push_str(&new_text);
348
349 let mut event_count = 0;
351 while let Some(newline_pos) = self.line_buffer.find('\n') {
352 let line = self.line_buffer[..newline_pos].to_string();
353 self.line_buffer = self.line_buffer[newline_pos + 1..].to_string();
354
355 let stripped = strip_ansi(&line);
356 let trimmed = stripped.trim();
357 if trimmed.is_empty() {
358 continue;
359 }
360
361 if let Some(event) = self.patterns.classify(trimmed) {
362 debug!(event = ?event, "extracted event");
363 self.buffer.push(event);
364 event_count += 1;
365 }
366 }
367
368 Ok(event_count)
369 }
370
371 pub fn checkpoint_offset(&self) -> u64 {
376 self.position
377 .saturating_sub(self.line_buffer.len().try_into().unwrap_or(0))
378 }
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;

    // ----- shared fixtures -------------------------------------------------

    /// Classifies `line` with the default rules, panicking when nothing matches.
    fn classify(line: &str) -> PipeEvent {
        EventPatterns::default_patterns().classify(line).unwrap()
    }

    /// Shorthand for an `OutputLine` event carrying `line`.
    fn output(line: &str) -> PipeEvent {
        PipeEvent::OutputLine {
            line: line.to_string(),
        }
    }

    /// Creates a temp dir holding file `name` with `contents`.
    ///
    /// The returned guard must stay alive for as long as the path is used.
    fn temp_log(name: &str, contents: &str) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(name);
        fs::write(&path, contents).unwrap();
        (dir, path)
    }

    /// Appends `text` verbatim to the file at `path`.
    fn append(path: &std::path::Path, text: &str) {
        let mut file = fs::OpenOptions::new().append(true).open(path).unwrap();
        file.write_all(text.as_bytes()).unwrap();
    }

    // ----- pattern classification -----------------------------------------

    #[test]
    fn detect_task_started() {
        assert_eq!(
            classify("Picked and moved task #3: kanban reader"),
            PipeEvent::TaskStarted {
                task_id: "3".to_string(),
                title: "kanban reader".to_string(),
            }
        );
    }

    #[test]
    fn detect_task_started_claim() {
        assert!(matches!(
            classify("Claimed task #5"),
            PipeEvent::TaskStarted { .. }
        ));
    }

    #[test]
    fn detect_task_completed() {
        assert_eq!(
            classify("Moved task #3: in-progress -> done"),
            PipeEvent::TaskCompleted {
                task_id: "3".to_string(),
            }
        );
    }

    #[test]
    fn detect_commit() {
        assert_eq!(
            classify("[main abc1234] fix the auth bug"),
            PipeEvent::CommitMade {
                hash: "abc1234".to_string(),
                message: "fix the auth bug".to_string(),
            }
        );
    }

    #[test]
    fn detect_test_passed() {
        assert!(matches!(
            classify("test result: ok. 42 passed; 0 failed"),
            PipeEvent::TestRan { passed: true, .. }
        ));
    }

    #[test]
    fn detect_test_failed() {
        assert!(matches!(
            classify("test result: FAILED. 40 passed; 2 failed"),
            PipeEvent::TestRan { passed: false, .. }
        ));
    }

    #[test]
    fn detect_file_created() {
        assert_eq!(
            classify("Created file src/tmux.rs"),
            PipeEvent::FileCreated {
                path: "src/tmux.rs".to_string(),
            }
        );
    }

    #[test]
    fn detect_file_modified() {
        assert_eq!(
            classify("Edited src/main.rs"),
            PipeEvent::FileModified {
                path: "src/main.rs".to_string(),
            }
        );
    }

    #[test]
    fn detect_command_ran() {
        let event = classify("$ cargo test");
        assert!(
            matches!(&event, PipeEvent::CommandRan { command, .. } if command == "cargo test"),
            "expected CommandRan for `cargo test`, got: {event:?}"
        );
    }

    #[test]
    fn detect_prompt() {
        assert!(matches!(
            classify("Allow tool Read on /home/user/file.rs?"),
            PipeEvent::PromptDetected { .. }
        ));
    }

    #[test]
    fn no_match_on_normal_output() {
        let outcome =
            EventPatterns::default_patterns().classify("Writing function to parse YAML...");
        assert!(outcome.is_none());
    }

    // ----- event buffer -----------------------------------------------------

    #[test]
    fn buffer_push_and_snapshot() {
        let buf = EventBuffer::new(3);
        for text in ["a", "b"] {
            buf.push(output(text));
        }
        assert_eq!(buf.snapshot().len(), 2);
    }

    #[test]
    fn buffer_evicts_oldest_when_full() {
        let buf = EventBuffer::new(2);
        for text in ["a", "b", "c"] {
            buf.push(output(text));
        }
        assert_eq!(buf.snapshot(), vec![output("b"), output("c")]);
    }

    #[test]
    fn buffer_default_size() {
        let buf = EventBuffer::default_size();
        assert_eq!(buf.len(), 0);

        (0..60).for_each(|i| buf.push(output(&format!("line {i}"))));
        assert_eq!(buf.len(), 50);

        // The ten oldest entries were evicted.
        assert_eq!(buf.snapshot()[0], output("line 10"));
    }

    #[test]
    fn buffer_clear() {
        let buf = EventBuffer::new(10);
        buf.push(output("x"));
        assert_eq!(buf.len(), 1);

        buf.clear();
        assert!(buf.is_empty());
    }

    #[test]
    fn buffer_format_summary_empty() {
        assert_eq!(EventBuffer::new(10).format_summary(), "(no events yet)");
    }

    #[test]
    fn buffer_format_summary_has_events() {
        let buf = EventBuffer::new(10);
        let events = [
            PipeEvent::TaskStarted {
                task_id: "3".to_string(),
                title: "foo".to_string(),
            },
            PipeEvent::FileCreated {
                path: "src/x.rs".to_string(),
            },
            PipeEvent::TestRan {
                passed: true,
                detail: "ok".to_string(),
            },
            PipeEvent::CommitMade {
                hash: "abc1234".to_string(),
                message: "fix".to_string(),
            },
        ];
        for event in events {
            buf.push(event);
        }

        let summary = buf.format_summary();
        assert!(summary.contains("→ task #3 started: foo"));
        assert!(summary.contains("+ src/x.rs"));
        assert!(summary.contains("✓ test: ok"));
        assert!(summary.contains("⊕ commit abc1234: fix"));
    }

    #[test]
    fn buffer_is_thread_safe() {
        let buf = EventBuffer::new(100);
        let shared = buf.clone();

        let worker = std::thread::spawn(move || {
            (0..50).for_each(|i| shared.push(output(&format!("thread {i}"))));
        });
        (0..50).for_each(|i| buf.push(output(&format!("main {i}"))));

        worker.join().unwrap();
        assert_eq!(buf.len(), 100);
    }

    // ----- watcher ----------------------------------------------------------

    #[test]
    fn watcher_reads_new_content() {
        let (_dir, log_path) = temp_log(
            "pty-output.log",
            "Picked and moved task #3: reader\n\
             some normal output\n\
             test result: ok. 5 passed; 0 failed\n",
        );

        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());

        let count = watcher.poll().unwrap();
        assert!(count >= 2, "expected at least 2 events, got {count}");

        let events = buffer.snapshot();
        let saw_task = events
            .iter()
            .any(|e| matches!(e, PipeEvent::TaskStarted { .. }));
        let saw_passing_test = events
            .iter()
            .any(|e| matches!(e, PipeEvent::TestRan { passed: true, .. }));
        assert!(saw_task);
        assert!(saw_passing_test);
    }

    #[test]
    fn watcher_tracks_position() {
        let (_dir, log_path) = temp_log("pty-output.log", "test result: ok. 5 passed\n");

        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());

        watcher.poll().unwrap();
        let count1 = buffer.len();

        let count = watcher.poll().unwrap();
        assert_eq!(count, 0, "no new content should yield 0 events");
        assert_eq!(buffer.len(), count1);

        append(&log_path, "[main abc1234] fix bug\n");

        let count = watcher.poll().unwrap();
        assert!(count >= 1, "expected at least 1 new event");
    }

    #[test]
    fn watcher_handles_missing_file() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("nonexistent.log");

        let mut watcher = PipeWatcher::new(&log_path, EventBuffer::new(50));
        assert_eq!(watcher.poll().unwrap(), 0);
    }

    #[test]
    fn watcher_resume_from_position_reads_only_new_content() {
        let (_dir, log_path) = temp_log("resume.log", "test result: ok. 1 passed\n");

        let file_len = fs::metadata(&log_path).unwrap().len();
        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new_with_position(&log_path, buffer.clone(), file_len);

        append(&log_path, "[main abc1234] resume\n");

        let count = watcher.poll().unwrap();
        assert!(count >= 1);
        let saw_commit = buffer
            .snapshot()
            .iter()
            .any(|e| matches!(e, PipeEvent::CommitMade { .. }));
        assert!(saw_commit);
    }

    #[test]
    fn watcher_checkpoint_offset_rewinds_partial_line() {
        // No trailing newline: the line is incomplete.
        let (_dir, log_path) = temp_log("partial.log", "test result: ok");

        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
        let _ = watcher.poll().unwrap();

        assert_eq!(
            watcher.checkpoint_offset(),
            0,
            "partial line should be re-read on resume"
        );
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn watcher_strips_ansi() {
        let (_dir, log_path) = temp_log("ansi.log", "\x1b[32mtest result: ok. 5 passed\x1b[0m\n");

        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());

        watcher.poll().unwrap();
        let saw_passing_test = buffer
            .snapshot()
            .iter()
            .any(|e| matches!(e, PipeEvent::TestRan { passed: true, .. }));
        assert!(saw_passing_test);
    }

    // ----- serialization ----------------------------------------------------

    #[test]
    fn pipe_event_serializes_to_json() {
        let event = PipeEvent::TaskStarted {
            task_id: "3".to_string(),
            title: "test".to_string(),
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"type\":\"task_started\""));
        assert!(json.contains("\"task_id\":\"3\""));
    }

    #[test]
    fn all_pipe_events_serialize() {
        let events = vec![
            PipeEvent::TaskStarted {
                task_id: "1".to_string(),
                title: "test".to_string(),
            },
            PipeEvent::FileCreated {
                path: "x.rs".to_string(),
            },
            PipeEvent::FileModified {
                path: "y.rs".to_string(),
            },
            PipeEvent::CommandRan {
                command: "ls".to_string(),
                success: Some(true),
            },
            PipeEvent::TestRan {
                passed: true,
                detail: "ok".to_string(),
            },
            PipeEvent::PromptDetected {
                prompt: "y/n".to_string(),
            },
            PipeEvent::TaskCompleted {
                task_id: "1".to_string(),
            },
            PipeEvent::CommitMade {
                hash: "abc".to_string(),
                message: "fix".to_string(),
            },
            output("hi"),
        ];

        for event in events {
            let json = serde_json::to_string(&event);
            assert!(json.is_ok(), "failed to serialize: {event:?}");
        }
    }

    // ----- watcher, end to end ---------------------------------------------

    #[test]
    fn pipe_watcher_extracts_events_from_file() {
        let (_dir, pipe_log) = temp_log(
            "pipe.log",
            "some normal output\nRunning: cargo test\ntest result: ok. 5 passed\n",
        );

        let mut watcher = PipeWatcher::new(&pipe_log, EventBuffer::new(10));
        let count = watcher.poll().unwrap();
        assert!(count > 0, "expected at least one event extracted");
    }

    #[test]
    fn pipe_watcher_handles_missing_file() {
        let tmp = tempfile::tempdir().unwrap();
        let pipe_log = tmp.path().join("nonexistent.log");

        let mut watcher = PipeWatcher::new(&pipe_log, EventBuffer::new(10));
        assert_eq!(watcher.poll().unwrap(), 0);
    }

    #[test]
    fn pipe_watcher_incremental_reads() {
        let (_dir, pipe_log) = temp_log("pipe.log", "Running: first command\n");

        let mut watcher = PipeWatcher::new(&pipe_log, EventBuffer::new(10));
        let count1 = watcher.poll().unwrap();
        assert!(count1 > 0, "should extract events from first write");

        append(&pipe_log, "Running: second command\n");

        let count2 = watcher.poll().unwrap();
        assert!(count2 > 0, "should extract events from appended content");
    }

    #[test]
    fn pipe_watcher_clamps_stale_position() {
        let (_dir, pipe_log) = temp_log("pipe.log", "short\n");

        // Offset far beyond the end of the file.
        let mut watcher = PipeWatcher::new_with_position(&pipe_log, EventBuffer::new(10), 99999);
        let count = watcher.poll().unwrap();
        assert_eq!(count, 0, "should clamp and read nothing new");
    }

    #[test]
    fn pipe_watcher_partial_line_buffering() {
        // First write stops mid-line.
        let (_dir, pipe_log) = temp_log("pipe.log", "Running: partial");

        let mut watcher = PipeWatcher::new(&pipe_log, EventBuffer::new(10));
        let count1 = watcher.poll().unwrap();
        assert_eq!(
            count1, 0,
            "incomplete line should be buffered, not processed"
        );

        append(&pipe_log, " command\n");

        let count2 = watcher.poll().unwrap();
        assert!(
            count2 > 0,
            "completed line should now be processed as event"
        );
    }

    // ----- summary formatting ----------------------------------------------

    #[test]
    fn format_summary_truncates_long_output_line() {
        let buffer = EventBuffer::new(10);
        let long_line = "x".repeat(100);
        buffer.push(output(&long_line));

        let summary = buffer.format_summary();
        assert!(
            summary.contains("..."),
            "long output line should be truncated with ..."
        );
        assert!(summary.len() < long_line.len() + 20);
    }

    #[test]
    fn format_summary_includes_all_event_types() {
        let buffer = EventBuffer::new(10);
        let events = [
            PipeEvent::CommandRan {
                command: "ls".to_string(),
                success: Some(true),
            },
            PipeEvent::TestRan {
                passed: true,
                detail: "all good".to_string(),
            },
            PipeEvent::CommitMade {
                hash: "abc1234def".to_string(),
                message: "fix bug".to_string(),
            },
            output("hello"),
        ];
        for event in events {
            buffer.push(event);
        }

        let summary = buffer.format_summary();
        for needle in ["$ ls", "✓ test:", "⊕ commit abc1234:", "hello"] {
            assert!(summary.contains(needle));
        }
    }

    #[test]
    fn event_buffer_respects_capacity() {
        let buffer = EventBuffer::new(3);
        for text in ["a", "b", "c", "d"] {
            buffer.push(output(text));
        }

        let summary = buffer.format_summary();
        assert!(!summary.contains(" a"), "oldest event should be evicted");
        assert!(summary.contains(" d"), "newest event should be present");
    }
}