1use crate::config::Config;
2use crate::highlighter::{Highlighter, WatcherStats};
3use crate::matcher::Matcher;
4use crate::notifier::Notifier;
5use crate::utils::{get_file_size, validate_files};
6use anyhow::Result;
7use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::fs::File;
10use std::io::{BufRead, BufReader, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13use tokio::sync::mpsc;
14use tokio::time::sleep;
15use tracing::{error, info};
16
17#[derive(Debug)]
18pub struct LogWatcher {
19 config: Config,
20 matcher: Matcher,
21 highlighter: Highlighter,
22 notifier: Notifier,
23 stats: WatcherStats,
24}
25
26impl LogWatcher {
27 pub fn new(config: Config) -> Self {
28 let matcher = Matcher::new(config.clone());
29 let highlighter = Highlighter::new(config.clone());
30 let notifier = Notifier::new(config.clone());
31
32 Self {
33 config,
34 matcher,
35 highlighter,
36 notifier,
37 stats: WatcherStats::default(),
38 }
39 }
40
41 pub async fn run(&mut self) -> Result<()> {
42 let valid_files = validate_files(&self.config.files)?;
44 self.stats.files_watched = valid_files.len();
45
46 self.highlighter.print_startup_info()?;
48
49 if self.config.dry_run {
50 self.run_dry_mode(&valid_files).await?;
51 } else {
52 self.run_tail_mode(&valid_files).await?;
53 }
54
55 self.highlighter.print_shutdown_summary(&self.stats)?;
57
58 Ok(())
59 }
60
61 async fn run_dry_mode(&mut self, files: &[PathBuf]) -> Result<()> {
62 info!("Running in dry-run mode");
63
64 let mut pattern_counts: HashMap<String, usize> = HashMap::new();
65
66 for file_path in files {
67 match self.process_existing_file(file_path).await {
68 Ok(matches) => {
69 for (pattern, count) in matches {
70 *pattern_counts.entry(pattern).or_insert(0) += count;
71 }
72 }
73 Err(e) => {
74 self.highlighter
75 .print_file_error(&file_path.display().to_string(), &e.to_string())?;
76 }
77 }
78 }
79
80 let summary: Vec<(String, usize)> = pattern_counts.into_iter().collect();
82 self.highlighter.print_dry_run_summary(&summary)?;
83
84 Ok(())
85 }
86
87 async fn run_tail_mode(&mut self, files: &[PathBuf]) -> Result<()> {
88 info!("Running in tail mode");
89
90 let (tx, mut rx) = mpsc::channel::<FileEvent>(100);
92
93 let mut watchers = Vec::new();
95 for file_path in files {
96 let tx_clone = tx.clone();
97 let file_path_clone = file_path.clone();
98
99 match self.start_file_watcher(file_path_clone, tx_clone).await {
100 Ok(watcher) => watchers.push(watcher),
101 Err(e) => {
102 self.highlighter
103 .print_file_error(&file_path.display().to_string(), &e.to_string())?;
104 }
105 }
106 }
107
108 while let Some(event) = rx.recv().await {
110 match event {
111 FileEvent::NewLine { file_path, line } => {
112 self.process_line(&file_path, &line).await?;
113 }
114 FileEvent::FileRotated { file_path } => {
115 self.handle_file_rotation(&file_path).await?;
116 }
117 FileEvent::FileError { file_path, error } => {
118 self.highlighter
119 .print_file_error(&file_path.display().to_string(), &error.to_string())?;
120 }
121 }
122 }
123
124 Ok(())
125 }
126
127 async fn start_file_watcher(
128 &self,
129 file_path: PathBuf,
130 tx: mpsc::Sender<FileEvent>,
131 ) -> Result<RecommendedWatcher> {
132 let file_path_clone = file_path.clone();
133 let tx_clone = tx.clone();
134
135 let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
136 match res {
137 Ok(event) => {
138 if matches!(event.kind, EventKind::Modify(_)) {
139 }
141 }
142 Err(e) => {
143 let _ = tx_clone.try_send(FileEvent::FileError {
144 file_path: file_path_clone.clone(),
145 error: e,
146 });
147 }
148 }
149 })?;
150
151 watcher.watch(&file_path, RecursiveMode::NonRecursive)?;
152
153 let file_path_clone = file_path.clone();
155 let tx_clone = tx.clone();
156 let poll_interval = self.config.poll_interval;
157 let buffer_size = self.config.buffer_size;
158
159 tokio::spawn(async move {
160 let mut last_size = get_file_size(&file_path_clone).unwrap_or(0);
161
162 loop {
163 sleep(Duration::from_millis(poll_interval)).await;
164
165 match Self::poll_file_changes(&file_path_clone, last_size, buffer_size).await {
166 Ok((new_size, new_lines)) => {
167 last_size = new_size;
168
169 for line in new_lines {
170 if let Err(e) = tx_clone
171 .send(FileEvent::NewLine {
172 file_path: file_path_clone.clone(),
173 line,
174 })
175 .await
176 {
177 error!("Failed to send line event: {}", e);
178 break;
179 }
180 }
181 }
182 Err(e) => {
183 let _ = tx_clone
184 .send(FileEvent::FileError {
185 file_path: file_path_clone.clone(),
186 error: notify::Error::generic(&e.to_string()),
187 })
188 .await;
189 break;
190 }
191 }
192 }
193 });
194
195 Ok(watcher)
196 }
197
198 async fn poll_file_changes(
199 file_path: &PathBuf,
200 last_size: u64,
201 buffer_size: usize,
202 ) -> Result<(u64, Vec<String>)> {
203 let current_size = get_file_size(file_path)?;
204
205 if current_size < last_size {
206 return Err(anyhow::anyhow!("File rotation detected"));
208 }
209
210 if current_size > last_size {
211 let file = File::open(file_path)?;
213 let mut reader = BufReader::with_capacity(buffer_size, file);
214
215 reader.seek(SeekFrom::Start(last_size))?;
217
218 let mut lines = Vec::new();
219 let mut line = String::new();
220
221 while reader.read_line(&mut line)? > 0 {
222 if !line.trim().is_empty() {
223 lines.push(line.trim().to_string());
224 }
225 line.clear();
226 }
227
228 Ok((current_size, lines))
229 } else {
230 Ok((current_size, Vec::new()))
231 }
232 }
233
234 async fn process_existing_file(
235 &mut self,
236 file_path: &PathBuf,
237 ) -> Result<HashMap<String, usize>> {
238 let mut pattern_counts: HashMap<String, usize> = HashMap::new();
239
240 let file = File::open(file_path)?;
241 let reader = BufReader::new(file);
242
243 for line_result in reader.lines() {
244 let line = line_result?;
245 self.stats.lines_processed += 1;
246 let match_result = self.matcher.match_line(&line);
247
248 if match_result.matched {
249 self.stats.matches_found += 1;
250 if let Some(pattern) = &match_result.pattern {
251 *pattern_counts.entry(pattern.clone()).or_insert(0) += 1;
252 }
253
254 self.highlighter.print_line(
255 &line,
256 Some(&file_path.file_name().unwrap().to_string_lossy()),
257 &match_result,
258 true, )?;
260 }
261 }
262
263 Ok(pattern_counts)
264 }
265
266 async fn process_line(&mut self, file_path: &Path, line: &str) -> Result<()> {
267 self.stats.lines_processed += 1;
268
269 let match_result = self.matcher.match_line(line);
270
271 if match_result.matched {
272 self.stats.matches_found += 1;
273
274 if match_result.should_notify {
276 if let Some(pattern) = &match_result.pattern {
277 self.notifier
278 .send_notification(
279 pattern,
280 line,
281 Some(&file_path.file_name().unwrap().to_string_lossy()),
282 )
283 .await?;
284 self.stats.notifications_sent += 1;
285 }
286 }
287 }
288
289 self.highlighter.print_line(
291 line,
292 Some(&file_path.file_name().unwrap().to_string_lossy()),
293 &match_result,
294 false, )?;
296
297 Ok(())
298 }
299
300 async fn handle_file_rotation(&mut self, file_path: &Path) -> Result<()> {
301 self.highlighter
302 .print_file_rotation(&file_path.display().to_string())?;
303
304 sleep(Duration::from_millis(1000)).await;
306
307 if file_path.exists() {
309 self.highlighter
310 .print_file_reopened(&file_path.display().to_string())?;
311 } else {
312 self.highlighter.print_file_error(
313 &file_path.display().to_string(),
314 "File not found after rotation",
315 )?;
316 }
317
318 Ok(())
319 }
320}
321
322#[derive(Debug)]
323enum FileEvent {
324 NewLine {
325 file_path: PathBuf,
326 line: String,
327 },
328 #[allow(dead_code)]
329 FileRotated {
330 file_path: PathBuf,
331 },
332 FileError {
333 file_path: PathBuf,
334 error: notify::Error,
335 },
336}
337
338#[cfg(test)]
339mod tests {
340 use super::*;
341 use crate::cli::Args;
342 use std::io::Write;
343 use tempfile::NamedTempFile;
344
345 fn create_test_config() -> Config {
346 let args = Args {
347 files: vec![PathBuf::from("test.log")],
348 patterns: "ERROR".to_string(),
349 regex: false,
350 case_insensitive: false,
351 color_map: None,
352 notify: false,
353 notify_patterns: None,
354 notify_throttle: 5,
355 dry_run: true,
356 quiet: false,
357 no_color: true,
358 prefix_file: None,
359 poll_interval: 100,
360 buffer_size: 8192,
361 };
362 Config::from_args(&args).unwrap()
363 }
364
365 #[tokio::test]
366 async fn test_dry_run_mode() {
367 let mut temp_file = NamedTempFile::new().unwrap();
368 writeln!(temp_file, "This is an ERROR message").unwrap();
369 writeln!(temp_file, "This is a normal message").unwrap();
370 writeln!(temp_file, "Another ERROR message").unwrap();
371 temp_file.flush().unwrap();
372
373 let mut config = create_test_config();
374 config.files = vec![temp_file.path().to_path_buf()];
375
376 let mut watcher = LogWatcher::new(config);
377 let result = watcher.run().await;
378
379 assert!(result.is_ok());
380 assert_eq!(watcher.stats.matches_found, 2);
381 }
382
383 #[test]
384 fn test_poll_file_changes() {
385 let mut temp_file = NamedTempFile::new().unwrap();
386 writeln!(temp_file, "line 1").unwrap();
387 temp_file.flush().unwrap();
388
389 let initial_size = get_file_size(temp_file.path()).unwrap();
390
391 writeln!(temp_file, "line 2").unwrap();
392 temp_file.flush().unwrap();
393
394 let rt = tokio::runtime::Runtime::new().unwrap();
395 let result = rt.block_on(LogWatcher::poll_file_changes(
396 &temp_file.path().to_path_buf(),
397 initial_size,
398 1024,
399 ));
400
401 assert!(result.is_ok());
402 let (new_size, lines) = result.unwrap();
403 assert!(new_size > initial_size);
404 assert_eq!(lines.len(), 1);
405 assert_eq!(lines[0], "line 2");
406 }
407
408 #[tokio::test]
409 async fn test_process_existing_file() {
410 let mut temp_file = NamedTempFile::new().unwrap();
411 writeln!(temp_file, "ERROR: Something went wrong").unwrap();
412 writeln!(temp_file, "INFO: Normal operation").unwrap();
413 temp_file.flush().unwrap();
414
415 let config = create_test_config();
416 let mut watcher = LogWatcher::new(config);
417
418 let result = watcher
420 .process_existing_file(&temp_file.path().to_path_buf())
421 .await;
422 assert!(result.is_ok());
423 }
424
425 #[tokio::test]
426 async fn test_process_line() {
427 let mut temp_file = NamedTempFile::new().unwrap();
428 writeln!(temp_file, "ERROR: Test error").unwrap();
429 temp_file.flush().unwrap();
430
431 let config = create_test_config();
432 let mut watcher = LogWatcher::new(config);
433
434 let result = watcher
436 .process_line(temp_file.path(), "ERROR: Test error")
437 .await;
438 assert!(result.is_ok());
439 }
440
441 #[tokio::test]
442 async fn test_handle_file_rotation() {
443 let mut temp_file = NamedTempFile::new().unwrap();
444 writeln!(temp_file, "ERROR: Test error").unwrap();
445 temp_file.flush().unwrap();
446
447 let config = create_test_config();
448 let mut watcher = LogWatcher::new(config);
449
450 let result = watcher.handle_file_rotation(temp_file.path()).await;
452 assert!(result.is_ok());
453 }
454
455 #[tokio::test]
456 async fn test_run_with_startup_info() {
457 let mut temp_file = NamedTempFile::new().unwrap();
458 writeln!(temp_file, "ERROR: Test error").unwrap();
459 temp_file.flush().unwrap();
460
461 let mut config = create_test_config();
462 config.files = vec![temp_file.path().to_path_buf()];
463 config.dry_run = true;
464
465 let mut watcher = LogWatcher::new(config);
466 let result = watcher.run().await;
467 assert!(result.is_ok());
468 }
469
470 #[tokio::test]
471 async fn test_run_tail_mode_execution() {
472 let mut temp_file = NamedTempFile::new().unwrap();
473 writeln!(temp_file, "ERROR: Test error").unwrap();
474 temp_file.flush().unwrap();
475
476 let mut config = create_test_config();
477 config.files = vec![temp_file.path().to_path_buf()];
478 config.dry_run = false; let mut watcher = LogWatcher::new(config);
481
482 let result =
484 tokio::time::timeout(std::time::Duration::from_millis(100), watcher.run()).await;
485
486 assert!(result.is_err());
488 }
489
490 #[test]
491 fn test_run_tail_mode() {
492 let mut temp_file = NamedTempFile::new().unwrap();
493 writeln!(temp_file, "ERROR: Test error").unwrap();
494 temp_file.flush().unwrap();
495
496 let config = create_test_config();
497 let mut watcher = LogWatcher::new(config);
498
499 let rt = tokio::runtime::Runtime::new().unwrap();
501 let files = vec![temp_file.path().to_path_buf()];
502
503 let result = rt.block_on(async {
505 tokio::time::timeout(
506 std::time::Duration::from_millis(100),
507 watcher.run_tail_mode(&files),
508 )
509 .await
510 });
511
512 assert!(result.is_err());
514 }
515
516 #[tokio::test]
517 async fn test_dry_run_with_file_error() {
518 let mut config = create_test_config();
520 config.files = vec![PathBuf::from("/non/existent/file.log")];
521 config.dry_run = true;
522
523 let mut watcher = LogWatcher::new(config);
524 let result = watcher.run().await;
525
526 assert!(result.is_err());
528 assert!(result
529 .unwrap_err()
530 .to_string()
531 .contains("No valid files to watch"));
532 }
533
534 #[tokio::test]
535 async fn test_dry_run_summary_with_multiple_patterns() {
536 let mut temp_file = NamedTempFile::new().unwrap();
537 writeln!(temp_file, "ERROR: Something went wrong").unwrap();
538 writeln!(temp_file, "WARN: This is a warning").unwrap();
539 writeln!(temp_file, "INFO: Normal operation").unwrap();
540 writeln!(temp_file, "ERROR: Another error").unwrap();
541 temp_file.flush().unwrap();
542
543 let mut config = create_test_config();
544 config.files = vec![temp_file.path().to_path_buf()];
545 config.patterns = vec!["ERROR".to_string(), "WARN".to_string()];
546 config.dry_run = true;
547
548 let mut watcher = LogWatcher::new(config);
549 let result = watcher.run().await;
550 assert!(result.is_ok());
551 assert_eq!(watcher.stats.matches_found, 3); }
553
554 #[tokio::test]
555 async fn test_poll_file_changes_with_rotation() {
556 let mut temp_file = NamedTempFile::new().unwrap();
557 writeln!(temp_file, "line 1").unwrap();
558 temp_file.flush().unwrap();
559
560 let initial_size = get_file_size(temp_file.path()).unwrap();
561
562 temp_file.as_file_mut().set_len(0).unwrap();
564 temp_file.flush().unwrap();
565
566 let result =
567 LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
568 .await;
569
570 assert!(result.is_err());
572 assert!(result
573 .unwrap_err()
574 .to_string()
575 .contains("File rotation detected"));
576 }
577
578 #[tokio::test]
579 async fn test_poll_file_changes_no_new_content() {
580 let mut temp_file = NamedTempFile::new().unwrap();
581 writeln!(temp_file, "line 1").unwrap();
582 temp_file.flush().unwrap();
583
584 let initial_size = get_file_size(temp_file.path()).unwrap();
585
586 let result =
587 LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
588 .await;
589
590 assert!(result.is_ok());
591 let (new_size, lines) = result.unwrap();
592 assert_eq!(new_size, initial_size);
593 assert_eq!(lines.len(), 0);
594 }
595
596 #[tokio::test]
597 async fn test_poll_file_changes_with_seeking() {
598 let mut temp_file = NamedTempFile::new().unwrap();
599 writeln!(temp_file, "line 1").unwrap();
600 writeln!(temp_file, "line 2").unwrap();
601 temp_file.flush().unwrap();
602
603 let initial_size = get_file_size(temp_file.path()).unwrap();
604
605 writeln!(temp_file, "line 3").unwrap();
607 writeln!(temp_file, "line 4").unwrap();
608 temp_file.flush().unwrap();
609
610 let result =
611 LogWatcher::poll_file_changes(&temp_file.path().to_path_buf(), initial_size, 1024)
612 .await;
613
614 assert!(result.is_ok());
615 let (new_size, lines) = result.unwrap();
616 assert!(new_size > initial_size);
617 assert_eq!(lines.len(), 2);
618 assert_eq!(lines[0], "line 3");
619 assert_eq!(lines[1], "line 4");
620 }
621
622 #[tokio::test]
623 async fn test_process_line_with_notification() {
624 let mut temp_file = NamedTempFile::new().unwrap();
625 writeln!(temp_file, "ERROR: Test error").unwrap();
626 temp_file.flush().unwrap();
627
628 let mut config = create_test_config();
629 config.notify_enabled = true;
630 config.notify_patterns = vec!["ERROR".to_string()];
631
632 let mut watcher = LogWatcher::new(config);
633
634 let result = watcher
636 .process_line(temp_file.path(), "ERROR: Critical error occurred")
637 .await;
638
639 if let Err(e) = &result {
641 eprintln!("Notification test failed with error: {}", e);
642 let error_msg = e.to_string();
643 if error_msg.contains("can only be set once") || error_msg.contains("org.freedesktop.DBus.Error.ServiceUnknown") || error_msg.contains("not provided by any .service files") {
648 assert_eq!(watcher.stats.notifications_sent, 0);
651 return;
652 }
653 }
654
655 assert!(result.is_ok());
656 assert_eq!(watcher.stats.notifications_sent, 1);
657 }
658
659 #[tokio::test]
660 async fn test_process_line_without_notification() {
661 let mut temp_file = NamedTempFile::new().unwrap();
662 writeln!(temp_file, "INFO: Normal operation").unwrap();
663 temp_file.flush().unwrap();
664
665 let mut config = create_test_config();
666 config.notify_enabled = true;
667 config.notify_patterns = vec!["ERROR".to_string()];
668
669 let mut watcher = LogWatcher::new(config);
670
671 let result = watcher
673 .process_line(temp_file.path(), "INFO: Normal operation")
674 .await;
675 assert!(result.is_ok());
676 assert_eq!(watcher.stats.notifications_sent, 0);
677 }
678
679 #[tokio::test]
680 async fn test_handle_file_rotation_file_not_found() {
681 let config = create_test_config();
682 let mut watcher = LogWatcher::new(config);
683
684 let result = watcher
686 .handle_file_rotation(&PathBuf::from("/non/existent/file.log"))
687 .await;
688 assert!(result.is_ok());
689 }
690
691 #[tokio::test]
692 async fn test_start_file_watcher() {
693 let mut temp_file = NamedTempFile::new().unwrap();
694 writeln!(temp_file, "ERROR: Test error").unwrap();
695 temp_file.flush().unwrap();
696
697 let config = create_test_config();
698 let watcher = LogWatcher::new(config);
699
700 let (tx, _rx) = mpsc::channel::<FileEvent>(100);
701
702 let result = watcher
704 .start_file_watcher(temp_file.path().to_path_buf(), tx)
705 .await;
706
707 assert!(result.is_ok());
708 }
709
710 #[tokio::test]
711 async fn test_file_event_processing() {
712 let mut temp_file = NamedTempFile::new().unwrap();
713 writeln!(temp_file, "ERROR: Test error").unwrap();
714 temp_file.flush().unwrap();
715
716 let mut config = create_test_config();
717 config.dry_run = false;
718
719 let mut watcher = LogWatcher::new(config);
720
721 let result = watcher
723 .process_line(temp_file.path(), "ERROR: New error occurred")
724 .await;
725 assert!(result.is_ok());
726 assert_eq!(watcher.stats.lines_processed, 1);
727 assert_eq!(watcher.stats.matches_found, 1);
728 }
729
730 #[tokio::test]
731 async fn test_process_existing_file_with_empty_file() {
732 let temp_file = NamedTempFile::new().unwrap();
733 let config = create_test_config();
736 let mut watcher = LogWatcher::new(config);
737
738 let result = watcher
740 .process_existing_file(&temp_file.path().to_path_buf())
741 .await;
742 assert!(result.is_ok());
743 assert_eq!(watcher.stats.lines_processed, 0);
744 assert_eq!(watcher.stats.matches_found, 0);
745 }
746
747 #[tokio::test]
748 async fn test_process_existing_file_with_non_matching_content() {
749 let mut temp_file = NamedTempFile::new().unwrap();
750 writeln!(temp_file, "This is a normal message").unwrap();
751 writeln!(temp_file, "Another normal message").unwrap();
752 temp_file.flush().unwrap();
753
754 let config = create_test_config();
755 let mut watcher = LogWatcher::new(config);
756
757 let result = watcher
759 .process_existing_file(&temp_file.path().to_path_buf())
760 .await;
761 assert!(result.is_ok());
762 assert_eq!(watcher.stats.lines_processed, 2);
763 assert_eq!(watcher.stats.matches_found, 0);
764 }
765}