log_watcher/
watcher.rs

1use crate::config::Config;
2use crate::highlighter::{Highlighter, WatcherStats};
3use crate::matcher::Matcher;
4use crate::notifier::Notifier;
5use crate::utils::{get_file_size, validate_files};
6use anyhow::Result;
7use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::fs::File;
10use std::io::{BufRead, BufReader, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13use tokio::sync::mpsc;
14use tokio::time::sleep;
15use tracing::{error, info};
16
17#[derive(Debug)]
18pub struct LogWatcher {
19    config: Config,
20    matcher: Matcher,
21    highlighter: Highlighter,
22    notifier: Notifier,
23    stats: WatcherStats,
24}
25
26impl LogWatcher {
27    pub fn new(config: Config) -> Self {
28        let matcher = Matcher::new(config.clone());
29        let highlighter = Highlighter::new(config.clone());
30        let notifier = Notifier::new(config.clone());
31
32        Self {
33            config,
34            matcher,
35            highlighter,
36            notifier,
37            stats: WatcherStats::default(),
38        }
39    }
40
41    pub async fn run(&mut self) -> Result<()> {
42        // Validate files
43        let valid_files = validate_files(&self.config.files)?;
44        self.stats.files_watched = valid_files.len();
45
46        // Print startup information
47        self.highlighter.print_startup_info()?;
48
49        if self.config.dry_run {
50            self.run_dry_mode(&valid_files).await?;
51        } else {
52            self.run_tail_mode(&valid_files).await?;
53        }
54
55        // Print shutdown summary
56        self.highlighter.print_shutdown_summary(&self.stats)?;
57
58        Ok(())
59    }
60
61    async fn run_dry_mode(&mut self, files: &[PathBuf]) -> Result<()> {
62        info!("Running in dry-run mode");
63
64        let mut pattern_counts: HashMap<String, usize> = HashMap::new();
65
66        for file_path in files {
67            match self.process_existing_file(file_path).await {
68                Ok(matches) => {
69                    for (pattern, count) in matches {
70                        *pattern_counts.entry(pattern).or_insert(0) += count;
71                    }
72                }
73                Err(e) => {
74                    self.highlighter
75                        .print_file_error(&file_path.display().to_string(), &e.to_string())?;
76                }
77            }
78        }
79
80        // Print summary
81        let summary: Vec<(String, usize)> = pattern_counts.into_iter().collect();
82        self.highlighter.print_dry_run_summary(&summary)?;
83
84        Ok(())
85    }
86
87    async fn run_tail_mode(&mut self, files: &[PathBuf]) -> Result<()> {
88        info!("Running in tail mode");
89
90        // Create channels for file events
91        let (tx, mut rx) = mpsc::channel::<FileEvent>(100);
92
93        // Start file watchers
94        let mut watchers = Vec::new();
95        for file_path in files {
96            let tx_clone = tx.clone();
97            let file_path_clone = file_path.clone();
98
99            match self.start_file_watcher(file_path_clone, tx_clone).await {
100                Ok(watcher) => watchers.push(watcher),
101                Err(e) => {
102                    self.highlighter
103                        .print_file_error(&file_path.display().to_string(), &e.to_string())?;
104                }
105            }
106        }
107
108        // Process file events
109        while let Some(event) = rx.recv().await {
110            match event {
111                FileEvent::NewLine { file_path, line } => {
112                    self.process_line(&file_path, &line).await?;
113                }
114                FileEvent::FileRotated { file_path } => {
115                    self.handle_file_rotation(&file_path).await?;
116                }
117                FileEvent::FileError { file_path, error } => {
118                    self.highlighter
119                        .print_file_error(&file_path.display().to_string(), &error.to_string())?;
120                }
121            }
122        }
123
124        Ok(())
125    }
126
127    async fn start_file_watcher(
128        &self,
129        file_path: PathBuf,
130        tx: mpsc::Sender<FileEvent>,
131    ) -> Result<RecommendedWatcher> {
132        let file_path_clone = file_path.clone();
133        let tx_clone = tx.clone();
134
135        let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
136            match res {
137                Ok(event) => {
138                    if matches!(event.kind, EventKind::Modify(_)) {
139                        // File was modified, we'll poll for new content
140                    }
141                }
142                Err(e) => {
143                    let _ = tx_clone.try_send(FileEvent::FileError {
144                        file_path: file_path_clone.clone(),
145                        error: e,
146                    });
147                }
148            }
149        })?;
150
151        watcher.watch(&file_path, RecursiveMode::NonRecursive)?;
152
153        // Start polling task for this file
154        let file_path_clone = file_path.clone();
155        let tx_clone = tx.clone();
156        let poll_interval = self.config.poll_interval;
157        let buffer_size = self.config.buffer_size;
158
159        tokio::spawn(async move {
160            let mut last_size = get_file_size(&file_path_clone).unwrap_or(0);
161
162            loop {
163                sleep(Duration::from_millis(poll_interval)).await;
164
165                match Self::poll_file_changes(&file_path_clone, last_size, buffer_size).await {
166                    Ok((new_size, new_lines)) => {
167                        last_size = new_size;
168
169                        for line in new_lines {
170                            if let Err(e) = tx_clone
171                                .send(FileEvent::NewLine {
172                                    file_path: file_path_clone.clone(),
173                                    line,
174                                })
175                                .await
176                            {
177                                error!("Failed to send line event: {}", e);
178                                break;
179                            }
180                        }
181                    }
182                    Err(e) => {
183                        let _ = tx_clone
184                            .send(FileEvent::FileError {
185                                file_path: file_path_clone.clone(),
186                                error: notify::Error::generic(&e.to_string()),
187                            })
188                            .await;
189                        break;
190                    }
191                }
192            }
193        });
194
195        Ok(watcher)
196    }
197
198    async fn poll_file_changes(
199        file_path: &PathBuf,
200        last_size: u64,
201        buffer_size: usize,
202    ) -> Result<(u64, Vec<String>)> {
203        let current_size = get_file_size(file_path)?;
204
205        if current_size < last_size {
206            // File was rotated
207            return Err(anyhow::anyhow!("File rotation detected"));
208        }
209
210        if current_size > last_size {
211            // File has new content
212            let file = File::open(file_path)?;
213            let mut reader = BufReader::with_capacity(buffer_size, file);
214
215            // Seek to last position
216            reader.seek(SeekFrom::Start(last_size))?;
217
218            let mut lines = Vec::new();
219            let mut line = String::new();
220
221            while reader.read_line(&mut line)? > 0 {
222                if !line.trim().is_empty() {
223                    lines.push(line.trim().to_string());
224                }
225                line.clear();
226            }
227
228            Ok((current_size, lines))
229        } else {
230            Ok((current_size, Vec::new()))
231        }
232    }
233
234    async fn process_existing_file(
235        &mut self,
236        file_path: &PathBuf,
237    ) -> Result<HashMap<String, usize>> {
238        let mut pattern_counts: HashMap<String, usize> = HashMap::new();
239
240        let file = File::open(file_path)?;
241        let reader = BufReader::new(file);
242
243        for line_result in reader.lines() {
244            let line = line_result?;
245            self.stats.lines_processed += 1;
246            let match_result = self.matcher.match_line(&line);
247
248            if match_result.matched {
249                self.stats.matches_found += 1;
250                if let Some(pattern) = &match_result.pattern {
251                    *pattern_counts.entry(pattern.clone()).or_insert(0) += 1;
252                }
253
254                self.highlighter.print_line(
255                    &line,
256                    Some(&file_path.file_name().unwrap().to_string_lossy()),
257                    &match_result,
258                    true, // dry run
259                )?;
260            }
261        }
262
263        Ok(pattern_counts)
264    }
265
266    async fn process_line(&mut self, file_path: &Path, line: &str) -> Result<()> {
267        self.stats.lines_processed += 1;
268
269        let match_result = self.matcher.match_line(line);
270
271        if match_result.matched {
272            self.stats.matches_found += 1;
273
274            // Send notification if needed
275            if match_result.should_notify {
276                if let Some(pattern) = &match_result.pattern {
277                    self.notifier
278                        .send_notification(
279                            pattern,
280                            line,
281                            Some(&file_path.file_name().unwrap().to_string_lossy()),
282                        )
283                        .await?;
284                    self.stats.notifications_sent += 1;
285                }
286            }
287        }
288
289        // Print the line
290        self.highlighter.print_line(
291            line,
292            Some(&file_path.file_name().unwrap().to_string_lossy()),
293            &match_result,
294            false, // not dry run
295        )?;
296
297        Ok(())
298    }
299
300    async fn handle_file_rotation(&mut self, file_path: &Path) -> Result<()> {
301        self.highlighter
302            .print_file_rotation(&file_path.display().to_string())?;
303
304        // Wait a bit for the new file to be created
305        sleep(Duration::from_millis(1000)).await;
306
307        // Try to reopen the file
308        if file_path.exists() {
309            self.highlighter
310                .print_file_reopened(&file_path.display().to_string())?;
311        } else {
312            self.highlighter.print_file_error(
313                &file_path.display().to_string(),
314                "File not found after rotation",
315            )?;
316        }
317
318        Ok(())
319    }
320}
321
322#[derive(Debug)]
323enum FileEvent {
324    NewLine {
325        file_path: PathBuf,
326        line: String,
327    },
328    #[allow(dead_code)]
329    FileRotated {
330        file_path: PathBuf,
331    },
332    FileError {
333        file_path: PathBuf,
334        error: notify::Error,
335    },
336}
337
338#[cfg(test)]
339mod tests {
340    use super::*;
341    use crate::cli::Args;
342    use std::io::Write;
343    use tempfile::NamedTempFile;
344
345    fn create_test_config() -> Config {
346        let args = Args {
347            files: vec![PathBuf::from("test.log")],
348            patterns: "ERROR".to_string(),
349            regex: false,
350            case_insensitive: false,
351            color_map: None,
352            notify: false,
353            notify_patterns: None,
354            notify_throttle: 5,
355            dry_run: true,
356            quiet: false,
357            no_color: true,
358            prefix_file: None,
359            poll_interval: 100,
360            buffer_size: 8192,
361        };
362        Config::from_args(&args).unwrap()
363    }
364
365    #[tokio::test]
366    async fn test_dry_run_mode() {
367        let mut temp_file = NamedTempFile::new().unwrap();
368        writeln!(temp_file, "This is an ERROR message").unwrap();
369        writeln!(temp_file, "This is a normal message").unwrap();
370        writeln!(temp_file, "Another ERROR message").unwrap();
371        temp_file.flush().unwrap();
372
373        let mut config = create_test_config();
374        config.files = vec![temp_file.path().to_path_buf()];
375
376        let mut watcher = LogWatcher::new(config);
377        let result = watcher.run().await;
378
379        assert!(result.is_ok());
380        assert_eq!(watcher.stats.matches_found, 2);
381    }
382
383    #[test]
384    fn test_poll_file_changes() {
385        let mut temp_file = NamedTempFile::new().unwrap();
386        writeln!(temp_file, "line 1").unwrap();
387        temp_file.flush().unwrap();
388
389        let initial_size = get_file_size(temp_file.path()).unwrap();
390
391        writeln!(temp_file, "line 2").unwrap();
392        temp_file.flush().unwrap();
393
394        let rt = tokio::runtime::Runtime::new().unwrap();
395        let result = rt.block_on(LogWatcher::poll_file_changes(
396            &temp_file.path().to_path_buf(),
397            initial_size,
398            1024,
399        ));
400
401        assert!(result.is_ok());
402        let (new_size, lines) = result.unwrap();
403        assert!(new_size > initial_size);
404        assert_eq!(lines.len(), 1);
405        assert_eq!(lines[0], "line 2");
406    }
407
408    #[tokio::test]
409    async fn test_process_existing_file() {
410        let mut temp_file = NamedTempFile::new().unwrap();
411        writeln!(temp_file, "ERROR: Something went wrong").unwrap();
412        writeln!(temp_file, "INFO: Normal operation").unwrap();
413        temp_file.flush().unwrap();
414
415        let config = create_test_config();
416        let mut watcher = LogWatcher::new(config);
417
418        // Test processing existing file content
419        let result = watcher
420            .process_existing_file(&temp_file.path().to_path_buf())
421            .await;
422        assert!(result.is_ok());
423    }
424
425    #[tokio::test]
426    async fn test_process_line() {
427        let mut temp_file = NamedTempFile::new().unwrap();
428        writeln!(temp_file, "ERROR: Test error").unwrap();
429        temp_file.flush().unwrap();
430
431        let config = create_test_config();
432        let mut watcher = LogWatcher::new(config);
433
434        // Test processing a line
435        let result = watcher
436            .process_line(temp_file.path(), "ERROR: Test error")
437            .await;
438        assert!(result.is_ok());
439    }
440
441    #[tokio::test]
442    async fn test_handle_file_rotation() {
443        let mut temp_file = NamedTempFile::new().unwrap();
444        writeln!(temp_file, "ERROR: Test error").unwrap();
445        temp_file.flush().unwrap();
446
447        let config = create_test_config();
448        let mut watcher = LogWatcher::new(config);
449
450        // Test file rotation handling
451        let result = watcher.handle_file_rotation(temp_file.path()).await;
452        assert!(result.is_ok());
453    }
454
455    #[tokio::test]
456    async fn test_run_with_startup_info() {
457        let mut temp_file = NamedTempFile::new().unwrap();
458        writeln!(temp_file, "ERROR: Test error").unwrap();
459        temp_file.flush().unwrap();
460
461        let mut config = create_test_config();
462        config.files = vec![temp_file.path().to_path_buf()];
463        config.dry_run = true;
464
465        let mut watcher = LogWatcher::new(config);
466        let result = watcher.run().await;
467        assert!(result.is_ok());
468    }
469
470    #[tokio::test]
471    async fn test_run_tail_mode_execution() {
472        let mut temp_file = NamedTempFile::new().unwrap();
473        writeln!(temp_file, "ERROR: Test error").unwrap();
474        temp_file.flush().unwrap();
475
476        let mut config = create_test_config();
477        config.files = vec![temp_file.path().to_path_buf()];
478        config.dry_run = false; // Enable tail mode
479
480        let mut watcher = LogWatcher::new(config);
481
482        // Use a short timeout to avoid hanging
483        let result =
484            tokio::time::timeout(std::time::Duration::from_millis(100), watcher.run()).await;
485
486        // Should timeout (which is expected for this test)
487        assert!(result.is_err());
488    }
489
490    #[test]
491    fn test_run_tail_mode() {
492        let mut temp_file = NamedTempFile::new().unwrap();
493        writeln!(temp_file, "ERROR: Test error").unwrap();
494        temp_file.flush().unwrap();
495
496        let config = create_test_config();
497        let mut watcher = LogWatcher::new(config);
498
499        // Test tail mode (short timeout to avoid hanging)
500        let rt = tokio::runtime::Runtime::new().unwrap();
501        let files = vec![temp_file.path().to_path_buf()];
502
503        // Use a short timeout for testing
504        let result = rt.block_on(async {
505            tokio::time::timeout(
506                std::time::Duration::from_millis(100),
507                watcher.run_tail_mode(&files),
508            )
509            .await
510        });
511
512        // Should timeout (which is expected for this test)
513        assert!(result.is_err());
514    }
515
516    #[tokio::test]
517    async fn test_dry_run_with_file_error() {
518        // Create a config with a non-existent file to trigger error handling
519        let mut config = create_test_config();
520        config.files = vec![PathBuf::from("/non/existent/file.log")];
521        config.dry_run = true;
522
523        let mut watcher = LogWatcher::new(config);
524        let result = watcher.run().await;
525
526        // Should fail because no valid files are available to watch
527        assert!(result.is_err());
528        assert!(result
529            .unwrap_err()
530            .to_string()
531            .contains("No valid files to watch"));
532    }
533
534    #[tokio::test]
535    async fn test_dry_run_summary_with_multiple_patterns() {
536        let mut temp_file = NamedTempFile::new().unwrap();
537        writeln!(temp_file, "ERROR: Something went wrong").unwrap();
538        writeln!(temp_file, "WARN: This is a warning").unwrap();
539        writeln!(temp_file, "INFO: Normal operation").unwrap();
540        writeln!(temp_file, "ERROR: Another error").unwrap();
541        temp_file.flush().unwrap();
542
543        let mut config = create_test_config();
544        config.files = vec![temp_file.path().to_path_buf()];
545        config.patterns = vec!["ERROR".to_string(), "WARN".to_string()];
546        config.dry_run = true;
547
548        let mut watcher = LogWatcher::new(config);
549        let result = watcher.run().await;
550        assert!(result.is_ok());
551        assert_eq!(watcher.stats.matches_found, 3); // 2 ERROR + 1 WARN
552    }
553
554    #[tokio::test]
555    async fn test_poll_file_changes_with_rotation() {
556        let mut temp_file = NamedTempFile::new().unwrap();
557        writeln!(temp_file, "line 1").unwrap();
558        temp_file.flush().unwrap();
559
560        let initial_size = get_file_size(temp_file.path()).unwrap();
561
562        // Simulate file rotation by truncating the file
563        temp_file.as_file_mut().set_len(0).unwrap();
564        temp_file.flush().unwrap();
565
566        let result =
567            LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
568                .await;
569
570        // Should detect file rotation
571        assert!(result.is_err());
572        assert!(result
573            .unwrap_err()
574            .to_string()
575            .contains("File rotation detected"));
576    }
577
578    #[tokio::test]
579    async fn test_poll_file_changes_no_new_content() {
580        let mut temp_file = NamedTempFile::new().unwrap();
581        writeln!(temp_file, "line 1").unwrap();
582        temp_file.flush().unwrap();
583
584        let initial_size = get_file_size(temp_file.path()).unwrap();
585
586        let result =
587            LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
588                .await;
589
590        assert!(result.is_ok());
591        let (new_size, lines) = result.unwrap();
592        assert_eq!(new_size, initial_size);
593        assert_eq!(lines.len(), 0);
594    }
595
596    #[tokio::test]
597    async fn test_poll_file_changes_with_seeking() {
598        let mut temp_file = NamedTempFile::new().unwrap();
599        writeln!(temp_file, "line 1").unwrap();
600        writeln!(temp_file, "line 2").unwrap();
601        temp_file.flush().unwrap();
602
603        let initial_size = get_file_size(temp_file.path()).unwrap();
604
605        // Add more content
606        writeln!(temp_file, "line 3").unwrap();
607        writeln!(temp_file, "line 4").unwrap();
608        temp_file.flush().unwrap();
609
610        let result =
611            LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
612                .await;
613
614        assert!(result.is_ok());
615        let (new_size, lines) = result.unwrap();
616        assert!(new_size > initial_size);
617        assert_eq!(lines.len(), 2);
618        assert_eq!(lines[0], "line 3");
619        assert_eq!(lines[1], "line 4");
620    }
621
622    #[tokio::test]
623    async fn test_process_line_with_notification() {
624        let mut temp_file = NamedTempFile::new().unwrap();
625        writeln!(temp_file, "ERROR: Test error").unwrap();
626        temp_file.flush().unwrap();
627
628        let mut config = create_test_config();
629        config.notify_enabled = true;
630        config.notify_patterns = vec!["ERROR".to_string()];
631
632        let mut watcher = LogWatcher::new(config);
633
634        // Test processing a line that should trigger notification
635        let result = watcher
636            .process_line(temp_file.path(), "ERROR: Critical error occurred")
637            .await;
638
639        // Check if the result is ok, if not print the error for debugging
640        if let Err(e) = &result {
641            eprintln!("Notification test failed with error: {}", e);
642            let error_msg = e.to_string();
643            // Handle different notification system errors across platforms
644            if error_msg.contains("can only be set once") || // macOS
645               error_msg.contains("org.freedesktop.DBus.Error.ServiceUnknown") || // Linux
646               error_msg.contains("not provided by any .service files")
647            // Linux D-Bus
648            {
649                // This is expected behavior in test environment, so we consider it a success
650                // The notification counter is 0 because the notification failed before being sent
651                assert_eq!(watcher.stats.notifications_sent, 0);
652                return;
653            }
654        }
655
656        assert!(result.is_ok());
657        assert_eq!(watcher.stats.notifications_sent, 1);
658    }
659
660    #[tokio::test]
661    async fn test_process_line_without_notification() {
662        let mut temp_file = NamedTempFile::new().unwrap();
663        writeln!(temp_file, "INFO: Normal operation").unwrap();
664        temp_file.flush().unwrap();
665
666        let mut config = create_test_config();
667        config.notify_enabled = true;
668        config.notify_patterns = vec!["ERROR".to_string()];
669
670        let mut watcher = LogWatcher::new(config);
671
672        // Test processing a line that should not trigger notification
673        let result = watcher
674            .process_line(temp_file.path(), "INFO: Normal operation")
675            .await;
676        assert!(result.is_ok());
677        assert_eq!(watcher.stats.notifications_sent, 0);
678    }
679
680    #[tokio::test]
681    async fn test_handle_file_rotation_file_not_found() {
682        let config = create_test_config();
683        let mut watcher = LogWatcher::new(config);
684
685        // Test file rotation handling with a non-existent file
686        let result = watcher
687            .handle_file_rotation(&PathBuf::from("/non/existent/file.log"))
688            .await;
689        assert!(result.is_ok());
690    }
691
692    #[tokio::test]
693    async fn test_start_file_watcher() {
694        let mut temp_file = NamedTempFile::new().unwrap();
695        writeln!(temp_file, "ERROR: Test error").unwrap();
696        temp_file.flush().unwrap();
697
698        let config = create_test_config();
699        let watcher = LogWatcher::new(config);
700
701        let (tx, _rx) = mpsc::channel::<FileEvent>(100);
702
703        // Test watcher creation
704        let result = watcher
705            .start_file_watcher(temp_file.path().to_path_buf(), tx)
706            .await;
707
708        assert!(result.is_ok());
709    }
710
711    #[tokio::test]
712    async fn test_file_event_processing() {
713        let mut temp_file = NamedTempFile::new().unwrap();
714        writeln!(temp_file, "ERROR: Test error").unwrap();
715        temp_file.flush().unwrap();
716
717        let mut config = create_test_config();
718        config.dry_run = false;
719
720        let mut watcher = LogWatcher::new(config);
721
722        // Test FileEvent::NewLine processing
723        let result = watcher
724            .process_line(temp_file.path(), "ERROR: New error occurred")
725            .await;
726        assert!(result.is_ok());
727        assert_eq!(watcher.stats.lines_processed, 1);
728        assert_eq!(watcher.stats.matches_found, 1);
729    }
730
731    #[tokio::test]
732    async fn test_process_existing_file_with_empty_file() {
733        let temp_file = NamedTempFile::new().unwrap();
734        // Don't write anything to create an empty file
735
736        let config = create_test_config();
737        let mut watcher = LogWatcher::new(config);
738
739        // Test processing empty file
740        let result = watcher
741            .process_existing_file(&temp_file.path().to_path_buf())
742            .await;
743        assert!(result.is_ok());
744        assert_eq!(watcher.stats.lines_processed, 0);
745        assert_eq!(watcher.stats.matches_found, 0);
746    }
747
748    #[tokio::test]
749    async fn test_process_existing_file_with_non_matching_content() {
750        let mut temp_file = NamedTempFile::new().unwrap();
751        writeln!(temp_file, "This is a normal message").unwrap();
752        writeln!(temp_file, "Another normal message").unwrap();
753        temp_file.flush().unwrap();
754
755        let config = create_test_config();
756        let mut watcher = LogWatcher::new(config);
757
758        // Test processing file with no matches
759        let result = watcher
760            .process_existing_file(&temp_file.path().to_path_buf())
761            .await;
762        assert!(result.is_ok());
763        assert_eq!(watcher.stats.lines_processed, 2);
764        assert_eq!(watcher.stats.matches_found, 0);
765    }
766}