1use crate::config::Config;
2use crate::highlighter::{Highlighter, WatcherStats};
3use crate::matcher::Matcher;
4use crate::notifier::Notifier;
5use crate::utils::{get_file_size, validate_files};
6use anyhow::Result;
7use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::fs::File;
10use std::io::{BufRead, BufReader, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13use tokio::sync::mpsc;
14use tokio::time::sleep;
15use tracing::{error, info};
16
17#[derive(Debug)]
18pub struct LogWatcher {
19 config: Config,
20 matcher: Matcher,
21 highlighter: Highlighter,
22 notifier: Notifier,
23 stats: WatcherStats,
24}
25
26impl LogWatcher {
27 pub fn new(config: Config) -> Self {
28 let matcher = Matcher::new(config.clone());
29 let highlighter = Highlighter::new(config.clone());
30 let notifier = Notifier::new(config.clone());
31
32 Self {
33 config,
34 matcher,
35 highlighter,
36 notifier,
37 stats: WatcherStats::default(),
38 }
39 }
40
41 pub async fn run(&mut self) -> Result<()> {
42 let valid_files = validate_files(&self.config.files)?;
44 self.stats.files_watched = valid_files.len();
45
46 self.highlighter.print_startup_info()?;
48
49 if self.config.dry_run {
50 self.run_dry_mode(&valid_files).await?;
51 } else {
52 self.run_tail_mode(&valid_files).await?;
53 }
54
55 self.highlighter.print_shutdown_summary(&self.stats)?;
57
58 Ok(())
59 }
60
61 async fn run_dry_mode(&mut self, files: &[PathBuf]) -> Result<()> {
62 info!("Running in dry-run mode");
63
64 let mut pattern_counts: HashMap<String, usize> = HashMap::new();
65
66 for file_path in files {
67 match self.process_existing_file(file_path).await {
68 Ok(matches) => {
69 for (pattern, count) in matches {
70 *pattern_counts.entry(pattern).or_insert(0) += count;
71 }
72 }
73 Err(e) => {
74 self.highlighter
75 .print_file_error(&file_path.display().to_string(), &e.to_string())?;
76 }
77 }
78 }
79
80 let summary: Vec<(String, usize)> = pattern_counts.into_iter().collect();
82 self.highlighter.print_dry_run_summary(&summary)?;
83
84 Ok(())
85 }
86
87 async fn run_tail_mode(&mut self, files: &[PathBuf]) -> Result<()> {
88 info!("Running in tail mode");
89
90 let (tx, mut rx) = mpsc::channel::<FileEvent>(100);
92
93 let mut watchers = Vec::new();
95 for file_path in files {
96 let tx_clone = tx.clone();
97 let file_path_clone = file_path.clone();
98
99 match self.start_file_watcher(file_path_clone, tx_clone).await {
100 Ok(watcher) => watchers.push(watcher),
101 Err(e) => {
102 self.highlighter
103 .print_file_error(&file_path.display().to_string(), &e.to_string())?;
104 }
105 }
106 }
107
108 while let Some(event) = rx.recv().await {
110 match event {
111 FileEvent::NewLine { file_path, line } => {
112 self.process_line(&file_path, &line).await?;
113 }
114 FileEvent::FileRotated { file_path } => {
115 self.handle_file_rotation(&file_path).await?;
116 }
117 FileEvent::FileError { file_path, error } => {
118 self.highlighter
119 .print_file_error(&file_path.display().to_string(), &error.to_string())?;
120 }
121 }
122 }
123
124 Ok(())
125 }
126
127 async fn start_file_watcher(
128 &self,
129 file_path: PathBuf,
130 tx: mpsc::Sender<FileEvent>,
131 ) -> Result<RecommendedWatcher> {
132 let file_path_clone = file_path.clone();
133 let tx_clone = tx.clone();
134
135 let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
136 match res {
137 Ok(event) => {
138 if matches!(event.kind, EventKind::Modify(_)) {
139 }
141 }
142 Err(e) => {
143 let _ = tx_clone.try_send(FileEvent::FileError {
144 file_path: file_path_clone.clone(),
145 error: e,
146 });
147 }
148 }
149 })?;
150
151 watcher.watch(&file_path, RecursiveMode::NonRecursive)?;
152
153 let file_path_clone = file_path.clone();
155 let tx_clone = tx.clone();
156 let poll_interval = self.config.poll_interval;
157 let buffer_size = self.config.buffer_size;
158
159 tokio::spawn(async move {
160 let mut last_size = get_file_size(&file_path_clone).unwrap_or(0);
161
162 loop {
163 sleep(Duration::from_millis(poll_interval)).await;
164
165 match Self::poll_file_changes(&file_path_clone, last_size, buffer_size).await {
166 Ok((new_size, new_lines)) => {
167 last_size = new_size;
168
169 for line in new_lines {
170 if let Err(e) = tx_clone
171 .send(FileEvent::NewLine {
172 file_path: file_path_clone.clone(),
173 line,
174 })
175 .await
176 {
177 error!("Failed to send line event: {}", e);
178 break;
179 }
180 }
181 }
182 Err(e) => {
183 let _ = tx_clone
184 .send(FileEvent::FileError {
185 file_path: file_path_clone.clone(),
186 error: notify::Error::generic(&e.to_string()),
187 })
188 .await;
189 break;
190 }
191 }
192 }
193 });
194
195 Ok(watcher)
196 }
197
198 async fn poll_file_changes(
199 file_path: &PathBuf,
200 last_size: u64,
201 buffer_size: usize,
202 ) -> Result<(u64, Vec<String>)> {
203 let current_size = get_file_size(file_path)?;
204
205 if current_size < last_size {
206 return Err(anyhow::anyhow!("File rotation detected"));
208 }
209
210 if current_size > last_size {
211 let file = File::open(file_path)?;
213 let mut reader = BufReader::with_capacity(buffer_size, file);
214
215 reader.seek(SeekFrom::Start(last_size))?;
217
218 let mut lines = Vec::new();
219 let mut line = String::new();
220
221 while reader.read_line(&mut line)? > 0 {
222 if !line.trim().is_empty() {
223 lines.push(line.trim().to_string());
224 }
225 line.clear();
226 }
227
228 Ok((current_size, lines))
229 } else {
230 Ok((current_size, Vec::new()))
231 }
232 }
233
234 async fn process_existing_file(
235 &mut self,
236 file_path: &PathBuf,
237 ) -> Result<HashMap<String, usize>> {
238 let mut pattern_counts: HashMap<String, usize> = HashMap::new();
239
240 let file = File::open(file_path)?;
241 let reader = BufReader::new(file);
242
243 for line_result in reader.lines() {
244 let line = line_result?;
245 self.stats.lines_processed += 1;
246 let match_result = self.matcher.match_line(&line);
247
248 if match_result.matched {
249 self.stats.matches_found += 1;
250 if let Some(pattern) = &match_result.pattern {
251 *pattern_counts.entry(pattern.clone()).or_insert(0) += 1;
252 }
253
254 self.highlighter.print_line(
255 &line,
256 Some(&file_path.file_name().unwrap().to_string_lossy()),
257 &match_result,
258 true, )?;
260 }
261 }
262
263 Ok(pattern_counts)
264 }
265
266 async fn process_line(&mut self, file_path: &Path, line: &str) -> Result<()> {
267 self.stats.lines_processed += 1;
268
269 let match_result = self.matcher.match_line(line);
270
271 if match_result.matched {
272 self.stats.matches_found += 1;
273
274 if match_result.should_notify {
276 if let Some(pattern) = &match_result.pattern {
277 self.notifier
278 .send_notification(
279 pattern,
280 line,
281 Some(&file_path.file_name().unwrap().to_string_lossy()),
282 )
283 .await?;
284 self.stats.notifications_sent += 1;
285 }
286 }
287 }
288
289 self.highlighter.print_line(
291 line,
292 Some(&file_path.file_name().unwrap().to_string_lossy()),
293 &match_result,
294 false, )?;
296
297 Ok(())
298 }
299
300 async fn handle_file_rotation(&mut self, file_path: &Path) -> Result<()> {
301 self.highlighter
302 .print_file_rotation(&file_path.display().to_string())?;
303
304 sleep(Duration::from_millis(1000)).await;
306
307 if file_path.exists() {
309 self.highlighter
310 .print_file_reopened(&file_path.display().to_string())?;
311 } else {
312 self.highlighter.print_file_error(
313 &file_path.display().to_string(),
314 "File not found after rotation",
315 )?;
316 }
317
318 Ok(())
319 }
320}
321
322#[derive(Debug)]
323enum FileEvent {
324 NewLine {
325 file_path: PathBuf,
326 line: String,
327 },
328 #[allow(dead_code)]
329 FileRotated {
330 file_path: PathBuf,
331 },
332 FileError {
333 file_path: PathBuf,
334 error: notify::Error,
335 },
336}
337
338#[cfg(test)]
339mod tests {
340 use super::*;
341 use crate::cli::Args;
342 use std::io::Write;
343 use tempfile::NamedTempFile;
344
345 fn create_test_config() -> Config {
346 let args = Args {
347 files: vec![PathBuf::from("test.log")],
348 patterns: "ERROR".to_string(),
349 regex: false,
350 case_insensitive: false,
351 color_map: None,
352 notify: false,
353 notify_patterns: None,
354 notify_throttle: 5,
355 dry_run: true,
356 quiet: false,
357 no_color: true,
358 prefix_file: None,
359 poll_interval: 100,
360 buffer_size: 8192,
361 };
362 Config::from_args(&args).unwrap()
363 }
364
365 #[tokio::test]
366 async fn test_dry_run_mode() {
367 let mut temp_file = NamedTempFile::new().unwrap();
368 writeln!(temp_file, "This is an ERROR message").unwrap();
369 writeln!(temp_file, "This is a normal message").unwrap();
370 writeln!(temp_file, "Another ERROR message").unwrap();
371 temp_file.flush().unwrap();
372
373 let mut config = create_test_config();
374 config.files = vec![temp_file.path().to_path_buf()];
375
376 let mut watcher = LogWatcher::new(config);
377 let result = watcher.run().await;
378
379 assert!(result.is_ok());
380 assert_eq!(watcher.stats.matches_found, 2);
381 }
382
383 #[test]
384 fn test_poll_file_changes() {
385 let mut temp_file = NamedTempFile::new().unwrap();
386 writeln!(temp_file, "line 1").unwrap();
387 temp_file.flush().unwrap();
388
389 let initial_size = get_file_size(temp_file.path()).unwrap();
390
391 writeln!(temp_file, "line 2").unwrap();
392 temp_file.flush().unwrap();
393
394 let rt = tokio::runtime::Runtime::new().unwrap();
395 let result = rt.block_on(LogWatcher::poll_file_changes(
396 &temp_file.path().to_path_buf(),
397 initial_size,
398 1024,
399 ));
400
401 assert!(result.is_ok());
402 let (new_size, lines) = result.unwrap();
403 assert!(new_size > initial_size);
404 assert_eq!(lines.len(), 1);
405 assert_eq!(lines[0], "line 2");
406 }
407
408 #[tokio::test]
409 async fn test_process_existing_file() {
410 let mut temp_file = NamedTempFile::new().unwrap();
411 writeln!(temp_file, "ERROR: Something went wrong").unwrap();
412 writeln!(temp_file, "INFO: Normal operation").unwrap();
413 temp_file.flush().unwrap();
414
415 let config = create_test_config();
416 let mut watcher = LogWatcher::new(config);
417
418 let result = watcher
420 .process_existing_file(&temp_file.path().to_path_buf())
421 .await;
422 assert!(result.is_ok());
423 }
424
425 #[tokio::test]
426 async fn test_process_line() {
427 let mut temp_file = NamedTempFile::new().unwrap();
428 writeln!(temp_file, "ERROR: Test error").unwrap();
429 temp_file.flush().unwrap();
430
431 let config = create_test_config();
432 let mut watcher = LogWatcher::new(config);
433
434 let result = watcher
436 .process_line(temp_file.path(), "ERROR: Test error")
437 .await;
438 assert!(result.is_ok());
439 }
440
441 #[tokio::test]
442 async fn test_handle_file_rotation() {
443 let mut temp_file = NamedTempFile::new().unwrap();
444 writeln!(temp_file, "ERROR: Test error").unwrap();
445 temp_file.flush().unwrap();
446
447 let config = create_test_config();
448 let mut watcher = LogWatcher::new(config);
449
450 let result = watcher.handle_file_rotation(temp_file.path()).await;
452 assert!(result.is_ok());
453 }
454
455 #[tokio::test]
456 async fn test_run_with_startup_info() {
457 let mut temp_file = NamedTempFile::new().unwrap();
458 writeln!(temp_file, "ERROR: Test error").unwrap();
459 temp_file.flush().unwrap();
460
461 let mut config = create_test_config();
462 config.files = vec![temp_file.path().to_path_buf()];
463 config.dry_run = true;
464
465 let mut watcher = LogWatcher::new(config);
466 let result = watcher.run().await;
467 assert!(result.is_ok());
468 }
469
470 #[tokio::test]
471 async fn test_run_tail_mode_execution() {
472 let mut temp_file = NamedTempFile::new().unwrap();
473 writeln!(temp_file, "ERROR: Test error").unwrap();
474 temp_file.flush().unwrap();
475
476 let mut config = create_test_config();
477 config.files = vec![temp_file.path().to_path_buf()];
478 config.dry_run = false; let mut watcher = LogWatcher::new(config);
481
482 let result =
484 tokio::time::timeout(std::time::Duration::from_millis(100), watcher.run()).await;
485
486 assert!(result.is_err());
488 }
489
490 #[test]
491 fn test_run_tail_mode() {
492 let mut temp_file = NamedTempFile::new().unwrap();
493 writeln!(temp_file, "ERROR: Test error").unwrap();
494 temp_file.flush().unwrap();
495
496 let config = create_test_config();
497 let mut watcher = LogWatcher::new(config);
498
499 let rt = tokio::runtime::Runtime::new().unwrap();
501 let files = vec![temp_file.path().to_path_buf()];
502
503 let result = rt.block_on(async {
505 tokio::time::timeout(
506 std::time::Duration::from_millis(100),
507 watcher.run_tail_mode(&files),
508 )
509 .await
510 });
511
512 assert!(result.is_err());
514 }
515
516 #[tokio::test]
517 async fn test_dry_run_with_file_error() {
518 let mut config = create_test_config();
520 config.files = vec![PathBuf::from("/non/existent/file.log")];
521 config.dry_run = true;
522
523 let mut watcher = LogWatcher::new(config);
524 let result = watcher.run().await;
525
526 assert!(result.is_err());
528 assert!(result
529 .unwrap_err()
530 .to_string()
531 .contains("No valid files to watch"));
532 }
533
534 #[tokio::test]
535 async fn test_dry_run_summary_with_multiple_patterns() {
536 let mut temp_file = NamedTempFile::new().unwrap();
537 writeln!(temp_file, "ERROR: Something went wrong").unwrap();
538 writeln!(temp_file, "WARN: This is a warning").unwrap();
539 writeln!(temp_file, "INFO: Normal operation").unwrap();
540 writeln!(temp_file, "ERROR: Another error").unwrap();
541 temp_file.flush().unwrap();
542
543 let mut config = create_test_config();
544 config.files = vec![temp_file.path().to_path_buf()];
545 config.patterns = vec!["ERROR".to_string(), "WARN".to_string()];
546 config.dry_run = true;
547
548 let mut watcher = LogWatcher::new(config);
549 let result = watcher.run().await;
550 assert!(result.is_ok());
551 assert_eq!(watcher.stats.matches_found, 3); }
553
554 #[tokio::test]
555 async fn test_poll_file_changes_with_rotation() {
556 let mut temp_file = NamedTempFile::new().unwrap();
557 writeln!(temp_file, "line 1").unwrap();
558 temp_file.flush().unwrap();
559
560 let initial_size = get_file_size(temp_file.path()).unwrap();
561
562 temp_file.as_file_mut().set_len(0).unwrap();
564 temp_file.flush().unwrap();
565
566 let result =
567 LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
568 .await;
569
570 assert!(result.is_err());
572 assert!(result
573 .unwrap_err()
574 .to_string()
575 .contains("File rotation detected"));
576 }
577
578 #[tokio::test]
579 async fn test_poll_file_changes_no_new_content() {
580 let mut temp_file = NamedTempFile::new().unwrap();
581 writeln!(temp_file, "line 1").unwrap();
582 temp_file.flush().unwrap();
583
584 let initial_size = get_file_size(temp_file.path()).unwrap();
585
586 let result =
587 LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
588 .await;
589
590 assert!(result.is_ok());
591 let (new_size, lines) = result.unwrap();
592 assert_eq!(new_size, initial_size);
593 assert_eq!(lines.len(), 0);
594 }
595
596 #[tokio::test]
597 async fn test_poll_file_changes_with_seeking() {
598 let mut temp_file = NamedTempFile::new().unwrap();
599 writeln!(temp_file, "line 1").unwrap();
600 writeln!(temp_file, "line 2").unwrap();
601 temp_file.flush().unwrap();
602
603 let initial_size = get_file_size(temp_file.path()).unwrap();
604
605 writeln!(temp_file, "line 3").unwrap();
607 writeln!(temp_file, "line 4").unwrap();
608 temp_file.flush().unwrap();
609
610 let result =
611 LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
612 .await;
613
614 assert!(result.is_ok());
615 let (new_size, lines) = result.unwrap();
616 assert!(new_size > initial_size);
617 assert_eq!(lines.len(), 2);
618 assert_eq!(lines[0], "line 3");
619 assert_eq!(lines[1], "line 4");
620 }
621
622 #[tokio::test]
623 async fn test_process_line_with_notification() {
624 let mut temp_file = NamedTempFile::new().unwrap();
625 writeln!(temp_file, "ERROR: Test error").unwrap();
626 temp_file.flush().unwrap();
627
628 let mut config = create_test_config();
629 config.notify_enabled = true;
630 config.notify_patterns = vec!["ERROR".to_string()];
631
632 let mut watcher = LogWatcher::new(config);
633
634 let result = watcher
636 .process_line(temp_file.path(), "ERROR: Critical error occurred")
637 .await;
638
639 if let Err(e) = &result {
641 eprintln!("Notification test failed with error: {}", e);
642 let error_msg = e.to_string();
643 if error_msg.contains("can only be set once") || error_msg.contains("org.freedesktop.DBus.Error.ServiceUnknown") || error_msg.contains("not provided by any .service files")
647 {
649 assert_eq!(watcher.stats.notifications_sent, 0);
652 return;
653 }
654 }
655
656 assert!(result.is_ok());
657 assert_eq!(watcher.stats.notifications_sent, 1);
658 }
659
660 #[tokio::test]
661 async fn test_process_line_without_notification() {
662 let mut temp_file = NamedTempFile::new().unwrap();
663 writeln!(temp_file, "INFO: Normal operation").unwrap();
664 temp_file.flush().unwrap();
665
666 let mut config = create_test_config();
667 config.notify_enabled = true;
668 config.notify_patterns = vec!["ERROR".to_string()];
669
670 let mut watcher = LogWatcher::new(config);
671
672 let result = watcher
674 .process_line(temp_file.path(), "INFO: Normal operation")
675 .await;
676 assert!(result.is_ok());
677 assert_eq!(watcher.stats.notifications_sent, 0);
678 }
679
680 #[tokio::test]
681 async fn test_handle_file_rotation_file_not_found() {
682 let config = create_test_config();
683 let mut watcher = LogWatcher::new(config);
684
685 let result = watcher
687 .handle_file_rotation(&PathBuf::from("/non/existent/file.log"))
688 .await;
689 assert!(result.is_ok());
690 }
691
692 #[tokio::test]
693 async fn test_start_file_watcher() {
694 let mut temp_file = NamedTempFile::new().unwrap();
695 writeln!(temp_file, "ERROR: Test error").unwrap();
696 temp_file.flush().unwrap();
697
698 let config = create_test_config();
699 let watcher = LogWatcher::new(config);
700
701 let (tx, _rx) = mpsc::channel::<FileEvent>(100);
702
703 let result = watcher
705 .start_file_watcher(temp_file.path().to_path_buf(), tx)
706 .await;
707
708 assert!(result.is_ok());
709 }
710
711 #[tokio::test]
712 async fn test_file_event_processing() {
713 let mut temp_file = NamedTempFile::new().unwrap();
714 writeln!(temp_file, "ERROR: Test error").unwrap();
715 temp_file.flush().unwrap();
716
717 let mut config = create_test_config();
718 config.dry_run = false;
719
720 let mut watcher = LogWatcher::new(config);
721
722 let result = watcher
724 .process_line(temp_file.path(), "ERROR: New error occurred")
725 .await;
726 assert!(result.is_ok());
727 assert_eq!(watcher.stats.lines_processed, 1);
728 assert_eq!(watcher.stats.matches_found, 1);
729 }
730
731 #[tokio::test]
732 async fn test_process_existing_file_with_empty_file() {
733 let temp_file = NamedTempFile::new().unwrap();
734 let config = create_test_config();
737 let mut watcher = LogWatcher::new(config);
738
739 let result = watcher
741 .process_existing_file(&temp_file.path().to_path_buf())
742 .await;
743 assert!(result.is_ok());
744 assert_eq!(watcher.stats.lines_processed, 0);
745 assert_eq!(watcher.stats.matches_found, 0);
746 }
747
748 #[tokio::test]
749 async fn test_process_existing_file_with_non_matching_content() {
750 let mut temp_file = NamedTempFile::new().unwrap();
751 writeln!(temp_file, "This is a normal message").unwrap();
752 writeln!(temp_file, "Another normal message").unwrap();
753 temp_file.flush().unwrap();
754
755 let config = create_test_config();
756 let mut watcher = LogWatcher::new(config);
757
758 let result = watcher
760 .process_existing_file(&temp_file.path().to_path_buf())
761 .await;
762 assert!(result.is_ok());
763 assert_eq!(watcher.stats.lines_processed, 2);
764 assert_eq!(watcher.stats.matches_found, 0);
765 }
766
767 #[tokio::test]
768 async fn test_run_tail_mode_with_watcher_error() {
769 let mut temp_file = NamedTempFile::new().unwrap();
770 writeln!(temp_file, "ERROR: Test error").unwrap();
771 temp_file.flush().unwrap();
772
773 let mut config = create_test_config();
774 config.files = vec![temp_file.path().to_path_buf()];
775 config.poll_interval = 10; let mut watcher = LogWatcher::new(config);
778 let files = vec![temp_file.path().to_path_buf()];
779
780 let result = tokio::time::timeout(
782 std::time::Duration::from_millis(50),
783 watcher.run_tail_mode(&files),
784 )
785 .await;
786
787 assert!(result.is_err());
789 }
790
791 #[tokio::test]
792 async fn test_file_event_processing_new_line() {
793 let mut temp_file = NamedTempFile::new().unwrap();
794 writeln!(temp_file, "ERROR: Test error").unwrap();
795 temp_file.flush().unwrap();
796
797 let config = create_test_config();
798 let mut watcher = LogWatcher::new(config);
799
800 let result = watcher
802 .process_line(temp_file.path(), "ERROR: New error occurred")
803 .await;
804 assert!(result.is_ok());
805 assert_eq!(watcher.stats.matches_found, 1);
806 }
807
808 #[tokio::test]
809 async fn test_file_event_processing_file_rotation() {
810 let mut temp_file = NamedTempFile::new().unwrap();
811 writeln!(temp_file, "ERROR: Test error").unwrap();
812 temp_file.flush().unwrap();
813
814 let config = create_test_config();
815 let mut watcher = LogWatcher::new(config);
816
817 let result = watcher.handle_file_rotation(temp_file.path()).await;
819 assert!(result.is_ok());
820 }
821
822 #[tokio::test]
823 async fn test_file_event_processing_file_error() {
824 let mut temp_file = NamedTempFile::new().unwrap();
825 writeln!(temp_file, "ERROR: Test error").unwrap();
826 temp_file.flush().unwrap();
827
828 let config = create_test_config();
829 let mut watcher = LogWatcher::new(config);
830
831 let error_msg = "Permission denied";
833 let result = watcher
834 .highlighter
835 .print_file_error(&temp_file.path().display().to_string(), error_msg);
836 assert!(result.is_ok());
837 }
838
839 #[tokio::test]
840 async fn test_start_file_watcher_error_handling() {
841 let mut temp_file = NamedTempFile::new().unwrap();
842 writeln!(temp_file, "ERROR: Test error").unwrap();
843 temp_file.flush().unwrap();
844
845 let config = create_test_config();
846 let watcher = LogWatcher::new(config);
847
848 let (tx, _rx) = tokio::sync::mpsc::channel(100);
850 let result = watcher
851 .start_file_watcher(temp_file.path().to_path_buf(), tx)
852 .await;
853 assert!(result.is_ok());
854 }
855
856 #[tokio::test]
857 async fn test_poll_file_changes_error_handling() {
858 let mut temp_file = NamedTempFile::new().unwrap();
859 writeln!(temp_file, "line 1").unwrap();
860 temp_file.flush().unwrap();
861
862 let initial_size = get_file_size(temp_file.path()).unwrap();
863
864 let result =
866 LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
867 .await;
868
869 assert!(result.is_ok());
870 let (new_size, lines) = result.unwrap();
871 assert_eq!(new_size, initial_size);
872 assert_eq!(lines.len(), 0);
873 }
874
875 #[tokio::test]
876 async fn test_poll_file_changes_with_file_error() {
877 let result =
879 LogWatcher::poll_file_changes(&PathBuf::from("/non/existent/file.log"), 0, 1024).await;
880
881 assert!(result.is_err());
882 }
883
884 #[tokio::test]
885 async fn test_file_event_channel_error() {
886 let mut temp_file = NamedTempFile::new().unwrap();
887 writeln!(temp_file, "ERROR: Test error").unwrap();
888 temp_file.flush().unwrap();
889
890 let config = create_test_config();
891 let _watcher = LogWatcher::new(config);
892
893 let (tx, rx) = tokio::sync::mpsc::channel(1);
895 drop(rx); let result = tx
899 .send(FileEvent::NewLine {
900 file_path: temp_file.path().to_path_buf(),
901 line: "ERROR: Test".to_string(),
902 })
903 .await;
904
905 assert!(result.is_err());
906 }
907
908 #[tokio::test]
909 async fn test_run_dry_mode_with_file_error() {
910 let mut config = create_test_config();
911 config.files = vec![PathBuf::from("/non/existent/file.log")];
912 config.dry_run = true;
913
914 let mut watcher = LogWatcher::new(config);
915 let files = vec![PathBuf::from("/non/existent/file.log")];
916
917 let result = watcher.run_dry_mode(&files).await;
919 assert!(result.is_ok());
920 }
921
922 #[tokio::test]
923 async fn test_run_tail_mode_with_file_error() {
924 let mut config = create_test_config();
925 config.files = vec![PathBuf::from("/non/existent/file.log")];
926
927 let mut watcher = LogWatcher::new(config);
928 let files = vec![PathBuf::from("/non/existent/file.log")];
929
930 let result = tokio::time::timeout(
932 std::time::Duration::from_millis(100),
933 watcher.run_tail_mode(&files),
934 )
935 .await;
936
937 assert!(result.is_err());
939 }
940
941 #[tokio::test]
942 async fn test_channel_send_error_handling() {
943 let mut temp_file = NamedTempFile::new().unwrap();
944 writeln!(temp_file, "ERROR: Test error").unwrap();
945 temp_file.flush().unwrap();
946
947 let config = create_test_config();
948 let _watcher = LogWatcher::new(config);
949
950 let (tx, rx) = tokio::sync::mpsc::channel(1);
952
953 let file_path = temp_file.path().to_path_buf();
955 let tx_clone = tx.clone();
956
957 tokio::spawn(async move {
958 let _ = tx_clone
960 .send(FileEvent::NewLine {
961 file_path: file_path.clone(),
962 line: "ERROR: Test".to_string(),
963 })
964 .await;
965 });
966
967 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
969
970 drop(rx);
972
973 let result = tx
975 .send(FileEvent::NewLine {
976 file_path: temp_file.path().to_path_buf(),
977 line: "ERROR: Test".to_string(),
978 })
979 .await;
980
981 assert!(result.is_err());
982 }
983
984 #[tokio::test]
985 async fn test_file_error_event_sending() {
986 let mut temp_file = NamedTempFile::new().unwrap();
987 writeln!(temp_file, "ERROR: Test error").unwrap();
988 temp_file.flush().unwrap();
989
990 let config = create_test_config();
991 let _watcher = LogWatcher::new(config);
992
993 let (tx, rx) = tokio::sync::mpsc::channel::<FileEvent>(1);
995 drop(rx);
996
997 let result = tx
999 .send(FileEvent::FileError {
1000 file_path: temp_file.path().to_path_buf(),
1001 error: notify::Error::generic("Test error"),
1002 })
1003 .await;
1004
1005 assert!(result.is_err());
1006 }
1007
1008 #[tokio::test]
1009 async fn test_poll_file_changes_error_sending() {
1010 let mut temp_file = NamedTempFile::new().unwrap();
1011 writeln!(temp_file, "ERROR: Test error").unwrap();
1012 temp_file.flush().unwrap();
1013
1014 let config = create_test_config();
1015 let _watcher = LogWatcher::new(config);
1016
1017 let (_tx, rx) = tokio::sync::mpsc::channel::<FileEvent>(1);
1019 drop(rx);
1020
1021 let result =
1023 LogWatcher::poll_file_changes(&PathBuf::from("/non/existent/file.log"), 0, 1024).await;
1024
1025 assert!(result.is_err());
1026 }
1027
1028 #[tokio::test]
1029 async fn test_startup_info_coverage() {
1030 let mut temp_file = NamedTempFile::new().unwrap();
1031 writeln!(temp_file, "ERROR: Test error").unwrap();
1032 temp_file.flush().unwrap();
1033
1034 let mut config = create_test_config();
1035 config.files = vec![temp_file.path().to_path_buf()];
1036
1037 let mut watcher = LogWatcher::new(config);
1038
1039 let result = watcher.highlighter.print_startup_info();
1041 assert!(result.is_ok());
1042 }
1043
1044 #[tokio::test]
1045 async fn test_dry_run_summary_coverage() {
1046 let mut temp_file = NamedTempFile::new().unwrap();
1047 writeln!(temp_file, "ERROR: Test error").unwrap();
1048 temp_file.flush().unwrap();
1049
1050 let mut config = create_test_config();
1051 config.files = vec![temp_file.path().to_path_buf()];
1052 config.dry_run = true;
1053
1054 let mut watcher = LogWatcher::new(config);
1055 let files = vec![temp_file.path().to_path_buf()];
1056
1057 let result = watcher.run_dry_mode(&files).await;
1059 assert!(result.is_ok());
1060 }
1061
1062 #[tokio::test]
1063 async fn test_pattern_counts_entry_coverage() {
1064 let mut temp_file = NamedTempFile::new().unwrap();
1065 writeln!(temp_file, "ERROR: Test error").unwrap();
1066 writeln!(temp_file, "ERROR: Another error").unwrap();
1067 temp_file.flush().unwrap();
1068
1069 let mut config = create_test_config();
1070 config.files = vec![temp_file.path().to_path_buf()];
1071 config.dry_run = true;
1072
1073 let mut watcher = LogWatcher::new(config);
1074 let files = vec![temp_file.path().to_path_buf()];
1075
1076 let result = watcher.run_dry_mode(&files).await;
1078 assert!(result.is_ok());
1079 }
1080
1081 #[tokio::test]
1082 async fn test_files_watched_assignment() {
1083 let mut temp_file = NamedTempFile::new().unwrap();
1084 writeln!(temp_file, "ERROR: Test error").unwrap();
1085 temp_file.flush().unwrap();
1086
1087 let mut config = create_test_config();
1088 config.files = vec![temp_file.path().to_path_buf()];
1089 let files = config.files.clone();
1090
1091 let mut watcher = LogWatcher::new(config);
1092
1093 let valid_files = validate_files(&files).unwrap();
1095 watcher.stats.files_watched = valid_files.len();
1096
1097 assert_eq!(watcher.stats.files_watched, 1);
1098 }
1099
1100 #[tokio::test]
1101 async fn test_run_method_coverage() {
1102 let mut temp_file = NamedTempFile::new().unwrap();
1103 writeln!(temp_file, "ERROR: Test error").unwrap();
1104 temp_file.flush().unwrap();
1105
1106 let mut config = create_test_config();
1107 config.files = vec![temp_file.path().to_path_buf()];
1108 config.dry_run = true;
1109
1110 let mut watcher = LogWatcher::new(config);
1111
1112 let result = watcher.run().await;
1114 assert!(result.is_ok());
1115 }
1116
1117 #[tokio::test]
1118 async fn test_run_tail_mode_coverage() {
1119 let mut temp_file = NamedTempFile::new().unwrap();
1120 writeln!(temp_file, "ERROR: Test error").unwrap();
1121 temp_file.flush().unwrap();
1122
1123 let mut config = create_test_config();
1124 config.files = vec![temp_file.path().to_path_buf()];
1125 config.dry_run = false;
1126
1127 let mut watcher = LogWatcher::new(config);
1128 let files = vec![temp_file.path().to_path_buf()];
1129
1130 let result = tokio::time::timeout(
1132 std::time::Duration::from_millis(100),
1133 watcher.run_tail_mode(&files),
1134 )
1135 .await;
1136
1137 assert!(result.is_err());
1139 }
1140
1141 #[tokio::test]
1142 async fn test_file_event_processing_comprehensive() {
1143 let mut temp_file = NamedTempFile::new().unwrap();
1144 writeln!(temp_file, "ERROR: Test error").unwrap();
1145 temp_file.flush().unwrap();
1146
1147 let mut config = create_test_config();
1148 config.files = vec![temp_file.path().to_path_buf()];
1149
1150 let mut watcher = LogWatcher::new(config);
1151
1152 let events = vec![
1154 FileEvent::NewLine {
1155 file_path: temp_file.path().to_path_buf(),
1156 line: "ERROR: Test error".to_string(),
1157 },
1158 FileEvent::FileRotated {
1159 file_path: temp_file.path().to_path_buf(),
1160 },
1161 FileEvent::FileError {
1162 file_path: temp_file.path().to_path_buf(),
1163 error: notify::Error::generic("Test error"),
1164 },
1165 ];
1166
1167 for event in events {
1168 let result = match event {
1169 FileEvent::NewLine { file_path, line } => {
1170 watcher.process_line(&file_path, &line).await
1171 }
1172 FileEvent::FileRotated { file_path } => {
1173 watcher.handle_file_rotation(&file_path).await
1174 }
1175 FileEvent::FileError { file_path, error } => watcher
1176 .highlighter
1177 .print_file_error(&file_path.display().to_string(), &error.to_string()),
1178 };
1179 assert!(result.is_ok());
1180 }
1181 }
1182
1183 #[tokio::test]
1184 async fn test_start_file_watcher_error_path() {
1185 let mut temp_file = NamedTempFile::new().unwrap();
1186 writeln!(temp_file, "ERROR: Test error").unwrap();
1187 temp_file.flush().unwrap();
1188
1189 let config = create_test_config();
1190 let watcher = LogWatcher::new(config);
1191
1192 let (tx, _rx) = tokio::sync::mpsc::channel::<FileEvent>(1);
1194 let file_path = temp_file.path().to_path_buf();
1195
1196 let result = watcher.start_file_watcher(file_path, tx).await;
1198 assert!(result.is_ok());
1199 }
1200
1201 #[tokio::test]
1202 async fn test_poll_file_changes_seek_coverage() {
1203 let mut temp_file = NamedTempFile::new().unwrap();
1204 writeln!(temp_file, "ERROR: Test error").unwrap();
1205 writeln!(temp_file, "INFO: Normal operation").unwrap();
1206 temp_file.flush().unwrap();
1207
1208 let config = create_test_config();
1209 let _watcher = LogWatcher::new(config);
1210
1211 let result = LogWatcher::poll_file_changes(
1213 &temp_file.path().to_path_buf(),
1214 0, 1024,
1216 )
1217 .await;
1218
1219 assert!(result.is_ok());
1220 let (new_size, lines) = result.unwrap();
1221 assert!(new_size > 0);
1222 assert!(!lines.is_empty());
1223 }
1224
1225 #[tokio::test]
1226 async fn test_process_line_notification_coverage() {
1227 let mut temp_file = NamedTempFile::new().unwrap();
1228 writeln!(temp_file, "ERROR: Test error").unwrap();
1229 temp_file.flush().unwrap();
1230
1231 let mut config = create_test_config();
1232 config.notify_enabled = true;
1233 config.notify_patterns = vec!["ERROR".to_string()];
1234
1235 let mut watcher = LogWatcher::new(config);
1236
1237 let result = watcher
1239 .process_line(temp_file.path(), "ERROR: Critical error occurred")
1240 .await;
1241
1242 if let Err(e) = &result {
1244 eprintln!("Notification test failed with error: {}", e);
1245 let error_msg = e.to_string();
1246 if error_msg.contains("can only be set once") || error_msg.contains("org.freedesktop.DBus.Error.ServiceUnknown") || error_msg.contains("not provided by any .service files")
1250 {
1252 assert_eq!(watcher.stats.notifications_sent, 0);
1255 return;
1256 }
1257 }
1258
1259 assert!(result.is_ok());
1260 assert_eq!(watcher.stats.notifications_sent, 1);
1261 }
1262
1263 #[tokio::test]
1264 async fn test_channel_send_error_comprehensive() {
1265 let mut temp_file = NamedTempFile::new().unwrap();
1266 writeln!(temp_file, "ERROR: Test error").unwrap();
1267 temp_file.flush().unwrap();
1268
1269 let config = create_test_config();
1270 let _watcher = LogWatcher::new(config);
1271
1272 let (tx, rx) = tokio::sync::mpsc::channel::<FileEvent>(1);
1274 drop(rx);
1275
1276 let events = vec![
1278 FileEvent::NewLine {
1279 file_path: temp_file.path().to_path_buf(),
1280 line: "ERROR: Test".to_string(),
1281 },
1282 FileEvent::FileError {
1283 file_path: temp_file.path().to_path_buf(),
1284 error: notify::Error::generic("Test error"),
1285 },
1286 ];
1287
1288 for event in events {
1289 let result = tx.send(event).await;
1290 assert!(result.is_err());
1291 }
1292 }
1293
1294 #[tokio::test]
1295 async fn test_try_send_error_coverage() {
1296 let mut temp_file = NamedTempFile::new().unwrap();
1297 writeln!(temp_file, "ERROR: Test error").unwrap();
1298 temp_file.flush().unwrap();
1299
1300 let config = create_test_config();
1301 let _watcher = LogWatcher::new(config);
1302
1303 let (tx, rx) = tokio::sync::mpsc::channel::<FileEvent>(1);
1305 drop(rx);
1306
1307 let events = vec![FileEvent::FileError {
1309 file_path: temp_file.path().to_path_buf(),
1310 error: notify::Error::generic("Test error"),
1311 }];
1312
1313 for event in events {
1314 let result = tx.try_send(event);
1315 assert!(result.is_err());
1316 }
1317 }
1318
1319 #[tokio::test]
1320 async fn test_file_name_unwrap_coverage() {
1321 let mut temp_file = NamedTempFile::new().unwrap();
1322 writeln!(temp_file, "ERROR: Test error").unwrap();
1323 temp_file.flush().unwrap();
1324
1325 let mut config = create_test_config();
1326 config.notify_enabled = true;
1327 config.notify_patterns = vec!["ERROR".to_string()];
1328
1329 let mut watcher = LogWatcher::new(config);
1330
1331 let result = watcher
1333 .process_line(temp_file.path(), "ERROR: Critical error occurred")
1334 .await;
1335
1336 if let Err(e) = &result {
1338 eprintln!("Notification test failed with error: {}", e);
1339 let error_msg = e.to_string();
1340 if error_msg.contains("can only be set once") || error_msg.contains("org.freedesktop.DBus.Error.ServiceUnknown") || error_msg.contains("not provided by any .service files")
1344 {
1346 assert_eq!(watcher.stats.notifications_sent, 0);
1349 return;
1350 }
1351 }
1352
1353 assert!(result.is_ok());
1354 }
1355
1356 #[tokio::test]
1357 async fn test_startup_info_coverage_line_47() {
1358 let mut temp_file = NamedTempFile::new().unwrap();
1359 writeln!(temp_file, "ERROR: Test error").unwrap();
1360 temp_file.flush().unwrap();
1361
1362 let mut config = create_test_config();
1363 config.files = vec![temp_file.path().to_path_buf()];
1364 config.dry_run = true;
1365
1366 let mut watcher = LogWatcher::new(config);
1367
1368 let result = watcher.run().await;
1370 assert!(result.is_ok());
1371 }
1372
1373 #[tokio::test]
1374 async fn test_dry_run_summary_coverage_line_82() {
1375 let mut temp_file = NamedTempFile::new().unwrap();
1376 writeln!(temp_file, "ERROR: Test error").unwrap();
1377 writeln!(temp_file, "WARN: Test warning").unwrap();
1378 temp_file.flush().unwrap();
1379
1380 let mut config = create_test_config();
1381 config.files = vec![temp_file.path().to_path_buf()];
1382 config.patterns = vec!["ERROR".to_string(), "WARN".to_string()];
1383 config.dry_run = true;
1384
1385 let mut watcher = LogWatcher::new(config);
1386
1387 let result = watcher.run().await;
1389 assert!(result.is_ok());
1390 }
1391
1392 #[tokio::test]
1393 async fn test_file_event_match_coverage_lines_111_119() {
1394 let mut temp_file = NamedTempFile::new().unwrap();
1395 writeln!(temp_file, "ERROR: Test error").unwrap();
1396 temp_file.flush().unwrap();
1397
1398 let mut config = create_test_config();
1399 config.files = vec![temp_file.path().to_path_buf()];
1400
1401 let mut watcher = LogWatcher::new(config);
1402
1403 let events = vec![
1405 FileEvent::NewLine {
1406 file_path: temp_file.path().to_path_buf(),
1407 line: "ERROR: Test error".to_string(),
1408 },
1409 FileEvent::FileRotated {
1410 file_path: temp_file.path().to_path_buf(),
1411 },
1412 FileEvent::FileError {
1413 file_path: temp_file.path().to_path_buf(),
1414 error: notify::Error::generic("Test error"),
1415 },
1416 ];
1417
1418 for event in events {
1419 let result = match event {
1420 FileEvent::NewLine { file_path, line } => {
1421 watcher.process_line(&file_path, &line).await
1422 }
1423 FileEvent::FileRotated { file_path } => {
1424 watcher.handle_file_rotation(&file_path).await
1425 }
1426 FileEvent::FileError { file_path, error } => watcher
1427 .highlighter
1428 .print_file_error(&file_path.display().to_string(), &error.to_string()),
1429 };
1430 assert!(result.is_ok());
1431 }
1432 }
1433
1434 #[tokio::test]
1435 async fn test_error_handling_coverage_lines_142_189() {
1436 let mut temp_file = NamedTempFile::new().unwrap();
1437 writeln!(temp_file, "ERROR: Test error").unwrap();
1438 temp_file.flush().unwrap();
1439
1440 let config = create_test_config();
1441 let _watcher = LogWatcher::new(config);
1442
1443 let (tx, rx) = tokio::sync::mpsc::channel::<FileEvent>(1);
1445 drop(rx); let result = tx.try_send(FileEvent::FileError {
1449 file_path: temp_file.path().to_path_buf(),
1450 error: notify::Error::generic("Test error"),
1451 });
1452 assert!(result.is_err());
1453
1454 let (tx2, rx2) = tokio::sync::mpsc::channel::<FileEvent>(1);
1456 drop(rx2); let result = tx2
1459 .send(FileEvent::FileError {
1460 file_path: temp_file.path().to_path_buf(),
1461 error: notify::Error::generic("Test error"),
1462 })
1463 .await;
1464 assert!(result.is_err());
1465 }
1466
1467 #[tokio::test]
1468 async fn test_seek_operation_coverage_line_216() {
1469 let mut temp_file = NamedTempFile::new().unwrap();
1470 writeln!(temp_file, "ERROR: Test error").unwrap();
1471 writeln!(temp_file, "INFO: Normal operation").unwrap();
1472 temp_file.flush().unwrap();
1473
1474 let config = create_test_config();
1475 let _watcher = LogWatcher::new(config);
1476
1477 let result = LogWatcher::poll_file_changes(
1479 &temp_file.path().to_path_buf(),
1480 0, 1024,
1482 )
1483 .await;
1484
1485 assert!(result.is_ok());
1486 let (new_size, lines) = result.unwrap();
1487 assert!(new_size > 0);
1488 assert!(!lines.is_empty());
1489 }
1490
1491 #[tokio::test]
1492 async fn test_notification_success_coverage_line_283() {
1493 let mut temp_file = NamedTempFile::new().unwrap();
1494 writeln!(temp_file, "ERROR: Test error").unwrap();
1495 temp_file.flush().unwrap();
1496
1497 let mut config = create_test_config();
1498 config.notify_enabled = true;
1499 config.notify_patterns = vec!["ERROR".to_string()];
1500
1501 let mut watcher = LogWatcher::new(config);
1502
1503 let result = watcher
1505 .process_line(temp_file.path(), "ERROR: Critical error occurred")
1506 .await;
1507
1508 if let Err(e) = &result {
1510 eprintln!("Notification test failed with error: {}", e);
1511 let error_msg = e.to_string();
1512 if error_msg.contains("can only be set once") || error_msg.contains("org.freedesktop.DBus.Error.ServiceUnknown") || error_msg.contains("not provided by any .service files")
1516 {
1518 assert_eq!(watcher.stats.notifications_sent, 0);
1521 return;
1522 }
1523 }
1524
1525 assert!(result.is_ok());
1526 if watcher.stats.notifications_sent > 0 {
1528 assert_eq!(watcher.stats.notifications_sent, 1);
1529 }
1530 }
1531
1532 #[tokio::test]
1533 async fn test_channel_send_error_coverage_line_177() {
1534 let mut temp_file = NamedTempFile::new().unwrap();
1535 writeln!(temp_file, "ERROR: Test error").unwrap();
1536 temp_file.flush().unwrap();
1537
1538 let config = create_test_config();
1539 let _watcher = LogWatcher::new(config);
1540
1541 let (tx, rx) = tokio::sync::mpsc::channel::<FileEvent>(1);
1543 drop(rx);
1544
1545 let result = tx
1547 .send(FileEvent::NewLine {
1548 file_path: temp_file.path().to_path_buf(),
1549 line: "ERROR: Test".to_string(),
1550 })
1551 .await;
1552
1553 assert!(result.is_err());
1554 }
1555
1556 #[tokio::test]
1557 async fn test_poll_file_changes_with_seek_coverage_line_216() {
1558 let mut temp_file = NamedTempFile::new().unwrap();
1559 writeln!(temp_file, "ERROR: Test error").unwrap();
1560 writeln!(temp_file, "INFO: Normal operation").unwrap();
1561 writeln!(temp_file, "WARN: Warning message").unwrap();
1562 temp_file.flush().unwrap();
1563
1564 let config = create_test_config();
1565 let _watcher = LogWatcher::new(config);
1566
1567 let result = LogWatcher::poll_file_changes(
1569 &temp_file.path().to_path_buf(),
1570 10, 1024,
1572 )
1573 .await;
1574
1575 assert!(result.is_ok());
1576 let (new_size, _lines) = result.unwrap();
1577 assert!(new_size > 0);
1578 }
1579
1580 #[tokio::test]
1581 async fn test_comprehensive_file_event_processing() {
1582 let mut temp_file = NamedTempFile::new().unwrap();
1583 writeln!(temp_file, "ERROR: Test error").unwrap();
1584 temp_file.flush().unwrap();
1585
1586 let mut config = create_test_config();
1587 config.files = vec![temp_file.path().to_path_buf()];
1588
1589 let mut watcher = LogWatcher::new(config);
1590
1591 let file_path = temp_file.path().to_path_buf();
1593
1594 let result = watcher
1596 .process_line(&file_path, "ERROR: New error occurred")
1597 .await;
1598 assert!(result.is_ok());
1599
1600 let result = watcher.handle_file_rotation(&file_path).await;
1602 assert!(result.is_ok());
1603
1604 let result = watcher
1606 .highlighter
1607 .print_file_error(&file_path.display().to_string(), "Test error");
1608 assert!(result.is_ok());
1609 }
1610}