1#![cfg_attr(not(test), allow(dead_code))]
8
9use std::collections::VecDeque;
10use std::io::{Read, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Mutex};
13
14use anyhow::{Context, Result};
15use regex::Regex;
16use serde::Serialize;
17use tracing::debug;
18
19use crate::prompt::strip_ansi;
20
/// Capacity handed to [`EventBuffer::new`] by `EventBuffer::default_size`.
///
/// Only compiled for test builds, like the constructor that uses it.
#[cfg(test)]
const DEFAULT_BUFFER_SIZE: usize = 50;
24
25#[derive(Debug, Clone, Serialize, PartialEq)]
27#[serde(tag = "type", rename_all = "snake_case")]
28pub enum PipeEvent {
29 TaskStarted { task_id: String, title: String },
31 FileCreated { path: String },
33 FileModified { path: String },
35 CommandRan {
37 command: String,
38 success: Option<bool>,
39 },
40 TestRan { passed: bool, detail: String },
42 PromptDetected { prompt: String },
44 TaskCompleted { task_id: String },
46 CommitMade { hash: String, message: String },
48 #[allow(dead_code)] OutputLine { line: String },
51}
52
53pub struct EventPatterns {
55 patterns: Vec<(Regex, EventClassifier)>,
56}
57
58type EventClassifier = fn(®ex::Captures) -> PipeEvent;
59
60impl EventPatterns {
61 pub fn default_patterns() -> Self {
66 Self {
67 patterns: vec![
68 (
70 Regex::new(r"(?i)(?:picked|claimed|starting|working on)\s+(?:and moved\s+)?task\s+#?(\d+)(?::\s+(.+))?").unwrap(),
71 |caps| PipeEvent::TaskStarted {
72 task_id: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
73 title: caps.get(2).map(|m| m.as_str().to_string()).unwrap_or_default(),
74 },
75 ),
76 (
78 Regex::new(r"(?i)(?:moved task\s+#?(\d+).*(?:done|complete)|task\s+#?(\d+)\s+(?:done|complete))").unwrap(),
79 |caps| PipeEvent::TaskCompleted {
80 task_id: caps.get(1)
81 .or_else(|| caps.get(2))
82 .map(|m| m.as_str().to_string())
83 .unwrap_or_default(),
84 },
85 ),
86 (
88 Regex::new(r"(?:\[[\w/-]+\s+([0-9a-f]{7,40})\]\s+(.+)|commit\s+([0-9a-f]{7,40}))").unwrap(),
89 |caps| PipeEvent::CommitMade {
90 hash: caps.get(1)
91 .or_else(|| caps.get(3))
92 .map(|m| m.as_str().to_string())
93 .unwrap_or_default(),
94 message: caps.get(2).map(|m| m.as_str().to_string()).unwrap_or_default(),
95 },
96 ),
97 (
99 Regex::new(r"test result:\s*(ok|FAILED)").unwrap(),
100 |caps| {
101 let result = caps.get(1).map(|m| m.as_str()).unwrap_or("FAILED");
102 PipeEvent::TestRan {
103 passed: result == "ok",
104 detail: caps.get(0).map(|m| m.as_str().to_string()).unwrap_or_default(),
105 }
106 },
107 ),
108 (
110 Regex::new(r"(?i)(?:created?\s+(?:file\s+)?|wrote\s+|writing\s+to\s+)([\w/.+\-]+\.\w+)").unwrap(),
111 |caps| PipeEvent::FileCreated {
112 path: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
113 },
114 ),
115 (
117 Regex::new(r"(?i)(?:edit(?:ed|ing)?\s+|modif(?:ied|ying)\s+)([\w/.+\-]+\.\w+)").unwrap(),
118 |caps| PipeEvent::FileModified {
119 path: caps.get(1).map(|m| m.as_str().to_string()).unwrap_or_default(),
120 },
121 ),
122 (
124 Regex::new(r"(?:^\$\s+(.+)|Running:\s+(.+))").unwrap(),
125 |caps| PipeEvent::CommandRan {
126 command: caps.get(1)
127 .or_else(|| caps.get(2))
128 .map(|m| m.as_str().to_string())
129 .unwrap_or_default(),
130 success: None,
131 },
132 ),
133 (
135 Regex::new(r"(?i)(?:allow\s+tool|continue\?|\[y/n\]|do you want to proceed)").unwrap(),
136 |caps| PipeEvent::PromptDetected {
137 prompt: caps.get(0).map(|m| m.as_str().to_string()).unwrap_or_default(),
138 },
139 ),
140 ],
141 }
142 }
143
144 pub fn classify(&self, line: &str) -> Option<PipeEvent> {
147 for (regex, classify) in &self.patterns {
148 if let Some(caps) = regex.captures(line) {
149 return Some(classify(&caps));
150 }
151 }
152 None
153 }
154}
155
156#[derive(Debug, Clone)]
161pub struct EventBuffer {
162 inner: Arc<Mutex<EventBufferInner>>,
163}
164
165#[derive(Debug)]
166struct EventBufferInner {
167 events: VecDeque<PipeEvent>,
168 max_size: usize,
169}
170
171impl EventBuffer {
172 pub fn new(max_size: usize) -> Self {
174 Self {
175 inner: Arc::new(Mutex::new(EventBufferInner {
176 events: VecDeque::with_capacity(max_size),
177 max_size,
178 })),
179 }
180 }
181
182 #[cfg(test)]
184 pub fn default_size() -> Self {
185 Self::new(DEFAULT_BUFFER_SIZE)
186 }
187
188 pub fn push(&self, event: PipeEvent) {
190 let mut inner = self.inner.lock().unwrap();
191 if inner.events.len() >= inner.max_size {
192 inner.events.pop_front();
193 }
194 inner.events.push_back(event);
195 }
196
197 pub fn snapshot(&self) -> Vec<PipeEvent> {
199 let inner = self.inner.lock().unwrap();
200 inner.events.iter().cloned().collect()
201 }
202
203 pub fn len(&self) -> usize {
205 let inner = self.inner.lock().unwrap();
206 inner.events.len()
207 }
208
209 #[allow(dead_code)]
211 pub fn is_empty(&self) -> bool {
212 self.len() == 0
213 }
214
215 #[allow(dead_code)]
217 pub fn clear(&self) {
218 let mut inner = self.inner.lock().unwrap();
219 inner.events.clear();
220 }
221
222 pub fn format_summary(&self) -> String {
224 let events = self.snapshot();
225 if events.is_empty() {
226 return "(no events yet)".to_string();
227 }
228
229 let mut summary = String::new();
230 for event in &events {
231 let line = match event {
232 PipeEvent::TaskStarted { task_id, title } => {
233 format!("→ task #{task_id} started: {title}")
234 }
235 PipeEvent::TaskCompleted { task_id } => {
236 format!("✓ task #{task_id} completed")
237 }
238 PipeEvent::FileCreated { path } => {
239 format!("+ {path}")
240 }
241 PipeEvent::FileModified { path } => {
242 format!("~ {path}")
243 }
244 PipeEvent::CommandRan { command, success } => {
245 let status = match success {
246 Some(true) => " ✓",
247 Some(false) => " ✗",
248 None => "",
249 };
250 format!("$ {command}{status}")
251 }
252 PipeEvent::TestRan { passed, detail } => {
253 let icon = if *passed { "✓" } else { "✗" };
254 format!("{icon} test: {detail}")
255 }
256 PipeEvent::PromptDetected { prompt } => {
257 format!("? {prompt}")
258 }
259 PipeEvent::CommitMade { hash, message } => {
260 let short_hash = &hash[..7.min(hash.len())];
261 format!("⊕ commit {short_hash}: {message}")
262 }
263 PipeEvent::OutputLine { line } => {
264 if line.len() > 80 {
266 format!(" {}...", &line[..77])
267 } else {
268 format!(" {line}")
269 }
270 }
271 };
272 summary.push_str(&line);
273 summary.push('\n');
274 }
275 summary
276 }
277}
278
279pub struct PipeWatcher {
284 path: PathBuf,
285 patterns: EventPatterns,
286 buffer: EventBuffer,
287 position: u64,
288 line_buffer: String,
289}
290
291impl PipeWatcher {
292 pub fn new(path: &Path, buffer: EventBuffer) -> Self {
294 Self::new_with_position(path, buffer, 0)
295 }
296
297 pub fn new_with_position(path: &Path, buffer: EventBuffer, position: u64) -> Self {
301 Self {
302 path: path.to_path_buf(),
303 patterns: EventPatterns::default_patterns(),
304 buffer,
305 position,
306 line_buffer: String::new(),
307 }
308 }
309
310 pub fn poll(&mut self) -> Result<usize> {
315 let mut file = match std::fs::File::open(&self.path) {
316 Ok(f) => f,
317 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
318 return Ok(0); }
320 Err(e) => {
321 return Err(e)
322 .with_context(|| format!("failed to open pipe log: {}", self.path.display()));
323 }
324 };
325
326 let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
328 if self.position > file_len {
329 self.position = file_len;
330 }
331
332 file.seek(SeekFrom::Start(self.position))
334 .context("failed to seek in pipe log")?;
335
336 let mut new_bytes = Vec::new();
338 let n = file
339 .read_to_end(&mut new_bytes)
340 .context("failed to read pipe log")?;
341
342 if n == 0 {
343 return Ok(0);
344 }
345
346 self.position += n as u64;
347
348 let new_text = String::from_utf8_lossy(&new_bytes);
350 self.line_buffer.push_str(&new_text);
351
352 let mut event_count = 0;
354 while let Some(newline_pos) = self.line_buffer.find('\n') {
355 let line = self.line_buffer[..newline_pos].to_string();
356 self.line_buffer = self.line_buffer[newline_pos + 1..].to_string();
357
358 let stripped = strip_ansi(&line);
359 let trimmed = stripped.trim();
360 if trimmed.is_empty() {
361 continue;
362 }
363
364 if let Some(event) = self.patterns.classify(trimmed) {
365 debug!(event = ?event, "extracted event");
366 self.buffer.push(event);
367 event_count += 1;
368 }
369 }
370
371 Ok(event_count)
372 }
373
374 pub fn checkpoint_offset(&self) -> u64 {
379 self.position
380 .saturating_sub(self.line_buffer.len().try_into().unwrap_or(0))
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;

    // --- shared helpers ---

    /// Runs `line` through a freshly built default pattern table.
    fn classify(line: &str) -> Option<PipeEvent> {
        EventPatterns::default_patterns().classify(line)
    }

    /// Shorthand for an `OutputLine` event carrying `text`.
    fn output(text: &str) -> PipeEvent {
        PipeEvent::OutputLine {
            line: text.to_string(),
        }
    }

    /// Creates a temporary directory and a path for `name` inside it. The
    /// returned guard must stay alive for as long as the path is used.
    fn temp_log(name: &str) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(name);
        (dir, path)
    }

    /// Appends `text` verbatim to an existing file.
    fn append(path: &std::path::Path, text: &str) {
        let mut file = fs::OpenOptions::new().append(true).open(path).unwrap();
        file.write_all(text.as_bytes()).unwrap();
    }

    // --- pattern classification ---

    #[test]
    fn detect_task_started() {
        match classify("Picked and moved task #3: kanban reader").unwrap() {
            PipeEvent::TaskStarted { task_id, title } => {
                assert_eq!(task_id, "3");
                assert_eq!(title, "kanban reader");
            }
            other => panic!("expected TaskStarted, got: {other:?}"),
        }
    }

    #[test]
    fn detect_task_started_claim() {
        let event = classify("Claimed task #5").unwrap();
        assert!(matches!(event, PipeEvent::TaskStarted { .. }));
    }

    #[test]
    fn detect_task_completed() {
        match classify("Moved task #3: in-progress -> done").unwrap() {
            PipeEvent::TaskCompleted { task_id } => assert_eq!(task_id, "3"),
            other => panic!("expected TaskCompleted, got: {other:?}"),
        }
    }

    #[test]
    fn detect_commit() {
        match classify("[main abc1234] fix the auth bug").unwrap() {
            PipeEvent::CommitMade { hash, message } => {
                assert_eq!(hash, "abc1234");
                assert_eq!(message, "fix the auth bug");
            }
            other => panic!("expected CommitMade, got: {other:?}"),
        }
    }

    #[test]
    fn detect_test_passed() {
        match classify("test result: ok. 42 passed; 0 failed").unwrap() {
            PipeEvent::TestRan { passed, .. } => assert!(passed),
            other => panic!("expected TestRan, got: {other:?}"),
        }
    }

    #[test]
    fn detect_test_failed() {
        match classify("test result: FAILED. 40 passed; 2 failed").unwrap() {
            PipeEvent::TestRan { passed, .. } => assert!(!passed),
            other => panic!("expected TestRan, got: {other:?}"),
        }
    }

    #[test]
    fn detect_file_created() {
        match classify("Created file src/tmux.rs").unwrap() {
            PipeEvent::FileCreated { path } => assert_eq!(path, "src/tmux.rs"),
            other => panic!("expected FileCreated, got: {other:?}"),
        }
    }

    #[test]
    fn detect_file_modified() {
        match classify("Edited src/main.rs").unwrap() {
            PipeEvent::FileModified { path } => assert_eq!(path, "src/main.rs"),
            other => panic!("expected FileModified, got: {other:?}"),
        }
    }

    #[test]
    fn detect_command_ran() {
        match classify("$ cargo test").unwrap() {
            PipeEvent::CommandRan { command, .. } => assert_eq!(command, "cargo test"),
            other => panic!("expected CommandRan, got: {other:?}"),
        }
    }

    #[test]
    fn detect_prompt() {
        let event = classify("Allow tool Read on /home/user/file.rs?").unwrap();
        assert!(matches!(event, PipeEvent::PromptDetected { .. }));
    }

    #[test]
    fn no_match_on_normal_output() {
        assert!(classify("Writing function to parse YAML...").is_none());
    }

    // --- event buffer ---

    #[test]
    fn buffer_push_and_snapshot() {
        let buf = EventBuffer::new(3);
        for text in ["a", "b"] {
            buf.push(output(text));
        }

        assert_eq!(buf.snapshot().len(), 2);
    }

    #[test]
    fn buffer_evicts_oldest_when_full() {
        let buf = EventBuffer::new(2);
        for text in ["a", "b", "c"] {
            buf.push(output(text));
        }

        let snap = buf.snapshot();
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0], output("b"));
        assert_eq!(snap[1], output("c"));
    }

    #[test]
    fn buffer_default_size() {
        let buf = EventBuffer::default_size();
        assert_eq!(buf.len(), 0);

        // Ten more than the default capacity, so the first ten are evicted.
        (0..60).for_each(|i| buf.push(output(&format!("line {i}"))));
        assert_eq!(buf.len(), 50);

        let snap = buf.snapshot();
        assert_eq!(snap[0], output("line 10"));
    }

    #[test]
    fn buffer_clear() {
        let buf = EventBuffer::new(10);
        buf.push(output("x"));
        assert_eq!(buf.len(), 1);

        buf.clear();
        assert!(buf.is_empty());
    }

    #[test]
    fn buffer_format_summary_empty() {
        let buf = EventBuffer::new(10);
        assert_eq!(buf.format_summary(), "(no events yet)");
    }

    #[test]
    fn buffer_format_summary_has_events() {
        let buf = EventBuffer::new(10);
        let events = [
            PipeEvent::TaskStarted {
                task_id: "3".to_string(),
                title: "foo".to_string(),
            },
            PipeEvent::FileCreated {
                path: "src/x.rs".to_string(),
            },
            PipeEvent::TestRan {
                passed: true,
                detail: "ok".to_string(),
            },
            PipeEvent::CommitMade {
                hash: "abc1234".to_string(),
                message: "fix".to_string(),
            },
        ];
        for event in events {
            buf.push(event);
        }

        let summary = buf.format_summary();
        assert!(summary.contains("→ task #3 started: foo"));
        assert!(summary.contains("+ src/x.rs"));
        assert!(summary.contains("✓ test: ok"));
        assert!(summary.contains("⊕ commit abc1234: fix"));
    }

    #[test]
    fn buffer_is_thread_safe() {
        let buf = EventBuffer::new(100);
        let shared = buf.clone();

        let worker = std::thread::spawn(move || {
            (0..50).for_each(|i| shared.push(output(&format!("thread {i}"))));
        });

        (0..50).for_each(|i| buf.push(output(&format!("main {i}"))));

        worker.join().unwrap();
        assert_eq!(buf.len(), 100);
    }

    // --- watcher ---

    #[test]
    fn watcher_reads_new_content() {
        let (_dir, log_path) = temp_log("pty-output.log");
        fs::write(
            &log_path,
            "Picked and moved task #3: reader\nsome normal output\ntest result: ok. 5 passed; 0 failed\n",
        )
        .unwrap();

        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());

        let count = watcher.poll().unwrap();
        assert!(count >= 2, "expected at least 2 events, got {count}");

        let events = buffer.snapshot();
        let saw_task = events
            .iter()
            .any(|e| matches!(e, PipeEvent::TaskStarted { .. }));
        let saw_test = events
            .iter()
            .any(|e| matches!(e, PipeEvent::TestRan { passed: true, .. }));
        assert!(saw_task);
        assert!(saw_test);
    }

    #[test]
    fn watcher_tracks_position() {
        let (_dir, log_path) = temp_log("pty-output.log");
        fs::write(&log_path, "test result: ok. 5 passed\n").unwrap();

        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());

        watcher.poll().unwrap();
        let count1 = buffer.len();

        // Nothing was appended, so a second poll must be a no-op.
        let count = watcher.poll().unwrap();
        assert_eq!(count, 0, "no new content should yield 0 events");
        assert_eq!(buffer.len(), count1);

        append(&log_path, "[main abc1234] fix bug\n");

        let count = watcher.poll().unwrap();
        assert!(count >= 1, "expected at least 1 new event");
    }

    #[test]
    fn watcher_handles_missing_file() {
        let (_dir, log_path) = temp_log("nonexistent.log");

        let mut watcher = PipeWatcher::new(&log_path, EventBuffer::new(50));

        let count = watcher.poll().unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn watcher_resume_from_position_reads_only_new_content() {
        let (_dir, log_path) = temp_log("resume.log");
        fs::write(&log_path, "test result: ok. 1 passed\n").unwrap();

        let file_len = fs::metadata(&log_path).unwrap().len();
        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new_with_position(&log_path, buffer.clone(), file_len);

        append(&log_path, "[main abc1234] resume\n");

        let count = watcher.poll().unwrap();
        assert!(count >= 1);
        let saw_commit = buffer
            .snapshot()
            .iter()
            .any(|e| matches!(e, PipeEvent::CommitMade { .. }));
        assert!(saw_commit);
    }

    #[test]
    fn watcher_checkpoint_offset_rewinds_partial_line() {
        let (_dir, log_path) = temp_log("partial.log");
        // Deliberately no trailing newline.
        fs::write(&log_path, "test result: ok").unwrap();

        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());
        let _ = watcher.poll().unwrap();

        assert_eq!(
            watcher.checkpoint_offset(),
            0,
            "partial line should be re-read on resume"
        );
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn watcher_strips_ansi() {
        let (_dir, log_path) = temp_log("ansi.log");
        fs::write(&log_path, "\x1b[32mtest result: ok. 5 passed\x1b[0m\n").unwrap();

        let buffer = EventBuffer::new(50);
        let mut watcher = PipeWatcher::new(&log_path, buffer.clone());

        watcher.poll().unwrap();
        let saw_test = buffer
            .snapshot()
            .iter()
            .any(|e| matches!(e, PipeEvent::TestRan { passed: true, .. }));
        assert!(saw_test);
    }

    // --- serialization ---

    #[test]
    fn pipe_event_serializes_to_json() {
        let json = serde_json::to_string(&PipeEvent::TaskStarted {
            task_id: "3".to_string(),
            title: "test".to_string(),
        })
        .unwrap();
        assert!(json.contains("\"type\":\"task_started\""));
        assert!(json.contains("\"task_id\":\"3\""));
    }

    #[test]
    fn all_pipe_events_serialize() {
        let events = vec![
            PipeEvent::TaskStarted {
                task_id: "1".to_string(),
                title: "test".to_string(),
            },
            PipeEvent::FileCreated {
                path: "x.rs".to_string(),
            },
            PipeEvent::FileModified {
                path: "y.rs".to_string(),
            },
            PipeEvent::CommandRan {
                command: "ls".to_string(),
                success: Some(true),
            },
            PipeEvent::TestRan {
                passed: true,
                detail: "ok".to_string(),
            },
            PipeEvent::PromptDetected {
                prompt: "y/n".to_string(),
            },
            PipeEvent::TaskCompleted {
                task_id: "1".to_string(),
            },
            PipeEvent::CommitMade {
                hash: "abc".to_string(),
                message: "fix".to_string(),
            },
            output("hi"),
        ];

        for event in events {
            let json = serde_json::to_string(&event);
            assert!(json.is_ok(), "failed to serialize: {event:?}");
        }
    }

    // --- additional watcher and summary coverage ---

    #[test]
    fn pipe_watcher_extracts_events_from_file() {
        let (_dir, pipe_log) = temp_log("pipe.log");
        fs::write(
            &pipe_log,
            "some normal output\nRunning: cargo test\ntest result: ok. 5 passed\n",
        )
        .unwrap();

        let mut watcher = PipeWatcher::new(&pipe_log, EventBuffer::new(10));
        let count = watcher.poll().unwrap();
        assert!(count > 0, "expected at least one event extracted");
    }

    #[test]
    fn pipe_watcher_handles_missing_file() {
        let (_dir, pipe_log) = temp_log("nonexistent.log");

        let mut watcher = PipeWatcher::new(&pipe_log, EventBuffer::new(10));
        let count = watcher.poll().unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn pipe_watcher_incremental_reads() {
        let (_dir, pipe_log) = temp_log("pipe.log");
        fs::write(&pipe_log, "Running: first command\n").unwrap();

        let mut watcher = PipeWatcher::new(&pipe_log, EventBuffer::new(10));
        let count1 = watcher.poll().unwrap();
        assert!(count1 > 0, "should extract events from first write");

        append(&pipe_log, "Running: second command\n");

        let count2 = watcher.poll().unwrap();
        assert!(count2 > 0, "should extract events from appended content");
    }

    #[test]
    fn pipe_watcher_clamps_stale_position() {
        let (_dir, pipe_log) = temp_log("pipe.log");
        fs::write(&pipe_log, "short\n").unwrap();

        // The resume offset lies far beyond the end of the file.
        let mut watcher = PipeWatcher::new_with_position(&pipe_log, EventBuffer::new(10), 99999);
        let count = watcher.poll().unwrap();
        assert_eq!(count, 0, "should clamp and read nothing new");
    }

    #[test]
    fn pipe_watcher_partial_line_buffering() {
        let (_dir, pipe_log) = temp_log("pipe.log");
        // No newline yet: the line is incomplete.
        fs::write(&pipe_log, "Running: partial").unwrap();

        let mut watcher = PipeWatcher::new(&pipe_log, EventBuffer::new(10));
        let count1 = watcher.poll().unwrap();
        assert_eq!(
            count1, 0,
            "incomplete line should be buffered, not processed"
        );

        append(&pipe_log, " command\n");

        let count2 = watcher.poll().unwrap();
        assert!(
            count2 > 0,
            "completed line should now be processed as event"
        );
    }

    #[test]
    fn format_summary_truncates_long_output_line() {
        let buffer = EventBuffer::new(10);
        let long_line = "x".repeat(100);
        buffer.push(output(&long_line));

        let summary = buffer.format_summary();
        assert!(
            summary.contains("..."),
            "long output line should be truncated with ..."
        );
        assert!(summary.len() < long_line.len() + 20);
    }

    #[test]
    fn format_summary_includes_all_event_types() {
        let buffer = EventBuffer::new(10);
        let events = [
            PipeEvent::CommandRan {
                command: "ls".to_string(),
                success: Some(true),
            },
            PipeEvent::TestRan {
                passed: true,
                detail: "all good".to_string(),
            },
            PipeEvent::CommitMade {
                hash: "abc1234def".to_string(),
                message: "fix bug".to_string(),
            },
            output("hello"),
        ];
        for event in events {
            buffer.push(event);
        }

        let summary = buffer.format_summary();
        assert!(summary.contains("$ ls"));
        assert!(summary.contains("✓ test:"));
        assert!(summary.contains("⊕ commit abc1234:"));
        assert!(summary.contains("hello"));
    }

    #[test]
    fn event_buffer_respects_capacity() {
        let buffer = EventBuffer::new(3);
        for text in ["a", "b", "c", "d"] {
            buffer.push(output(text));
        }

        let summary = buffer.format_summary();
        assert!(!summary.contains(" a"), "oldest event should be evicted");
        assert!(summary.contains(" d"), "newest event should be present");
    }
}