Skip to main content

mc_minder/monitor/
mod.rs

1use anyhow::{Context, Result};
2use log::{info, debug, warn};
3use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
4use regex::Regex;
5use std::path::PathBuf;
6use tokio::sync::mpsc;
7use tokio::sync::mpsc::Receiver;
8use std::collections::HashSet;
9use std::sync::Arc;
10use parking_lot::Mutex;
11
12// ============================================================
13// Shared Types
14// ============================================================
15
16#[derive(Debug, Clone)]
17pub struct ChatMessage {
18    pub player: String,
19    pub content: String,
20    #[allow(dead_code)]
21    pub timestamp: chrono::DateTime<chrono::Local>,
22}
23
24#[derive(Debug, Clone)]
25#[allow(dead_code)]
26pub enum LogEvent {
27    // Chat events now come exclusively from ChatCapture implementations.
28    // LogMonitor no longer emits this variant, but it's kept for
29    // backward compatibility with server_run.rs event processing.
30    Chat(ChatMessage),
31    PlayerJoin(String),
32    PlayerLeave(String),
33    PlayerDeath(String),
34    ServerStart,
35    ServerStop,
36}
37
38#[derive(Debug, Clone, PartialEq, Copy)]
39struct FileId {
40    size: u64,
41    modified_secs: i64,
42}
43
44impl FileId {
45    fn from_metadata(metadata: &std::fs::Metadata) -> Option<Self> {
46        let size = metadata.len();
47        let modified = metadata.modified().ok()?;
48        let modified_secs = modified.duration_since(std::time::UNIX_EPOCH).ok()?.as_secs() as i64;
49        Some(Self { size, modified_secs })
50    }
51}
52
53// ============================================================
54// ChatCapture Trait - Unified interface for chat sources
55// ============================================================
56
57/// Trait for chat message capture. Each implementation provides
58/// a single source of truth for chat messages, eliminating the
59/// double-processing bug where both LogMonitor and ChatCapture
60/// detected the same messages.
61pub trait ChatCapture: Send {
62    /// Capture recent chat messages since the last call.
63    /// Implementations should use deduplication to avoid processing
64    /// the same message twice.
65    fn capture_recent_messages(&mut self) -> Vec<ChatMessage>;
66
67    /// Returns the name of this capture implementation for logging.
68    #[allow(dead_code)]
69    fn name(&self) -> &'static str;
70}
71
72// ============================================================
73// LogMonitor - Watches log file for non-chat events ONLY
74// ============================================================
75
76/// Monitors a log file for server lifecycle events (join, leave, death,
77/// start, stop). Chat messages are NOT emitted by LogMonitor — they
78/// come from ChatCapture implementations instead.
79pub struct LogMonitor {
80    log_path: PathBuf,
81    join_pattern: Regex,
82    leave_pattern: Regex,
83    death_pattern: Regex,
84}
85
86impl LogMonitor {
87    pub fn new(log_path: PathBuf) -> Result<Self> {
88        // Join pattern: handles vanilla player names
89        let join_pattern = Regex::new(r"\[(\d{1,2}:\d{2}:\d{2})\] \[[^\]]+\]: ([a-zA-Z0-9_]+) joined the game")
90            .context("Failed to compile join pattern")?;
91
92        // Leave pattern: handles vanilla player names
93        let leave_pattern = Regex::new(r"\[(\d{1,2}:\d{2}:\d{2})\] \[[^\]]+\]: ([a-zA-Z0-9_]+) left the game")
94            .context("Failed to compile leave pattern")?;
95
96        // Death pattern: handles various death messages
97        let death_pattern = Regex::new(r"\[(\d{1,2}:\d{2}:\d{2})\] \[[^\]]+\]: ([a-zA-Z0-9_]+) .*(died|was|fell|drowned|blew up|burned|froze|suffocated|starved)")
98            .context("Failed to compile death pattern")?;
99
100        Ok(Self {
101            log_path,
102            join_pattern,
103            leave_pattern,
104            death_pattern,
105        })
106    }
107
108    pub fn start_monitoring(self) -> Result<Receiver<LogEvent>> {
109        let (tx, rx) = mpsc::channel(100);
110
111        let log_path = self.log_path.clone();
112
113        info!("Started monitoring log file: {:?}", log_path);
114
115        let mut last_offset: u64 = 0;
116        let mut last_file_id: Option<FileId> = None;
117
118        if log_path.exists() {
119            if let Ok(metadata) = std::fs::metadata(&log_path) {
120                last_offset = metadata.len();
121                last_file_id = FileId::from_metadata(&metadata);
122            }
123        } else {
124            warn!(
125                "Log file not found: {:?}. Please start the Minecraft server first to generate the log file.",
126                log_path
127            );
128        }
129
130        let (notify_tx, notify_rx) = std::sync::mpsc::channel();
131
132        let mut watcher = RecommendedWatcher::new(
133            move |res: Result<Event, notify::Error>| {
134                if let Ok(event) = res {
135                    let _ = notify_tx.send(event);
136                }
137            },
138            Config::default(),
139        ).context("Failed to create file watcher")?;
140
141        let parent_dir = log_path
142            .parent()
143            .context("Log file has no parent directory")?
144            .to_path_buf();
145
146        let parent_dir_for_unwatch = parent_dir.clone();
147
148        watcher
149            .watch(&parent_dir, RecursiveMode::NonRecursive)
150            .context("Failed to watch log directory")?;
151
152        let patterns = (
153            self.join_pattern,
154            self.leave_pattern,
155            self.death_pattern,
156        );
157
158        std::thread::spawn(move || {
159            loop {
160                match notify_rx.recv() {
161                    Ok(event) => {
162                        if !event.paths.iter().any(|p| p.file_name().map(|n| n == "latest.log").unwrap_or(false)) {
163                            continue;
164                        }
165
166                        match event.kind {
167                            EventKind::Modify(_) | EventKind::Create(_) => {
168                                if let Ok(events) = Self::check_file_changes(
169                                    &log_path,
170                                    &mut last_offset,
171                                    &mut last_file_id,
172                                    &patterns,
173                                ) {
174                                    for log_event in events {
175                                        if tx.blocking_send(log_event).is_err() {
176                                            debug!("Receiver dropped, stopping monitor");
177                                            return;
178                                        }
179                                    }
180                                }
181                            }
182                            EventKind::Remove(_) => {
183                                debug!("Log file removed/rotated, resetting state");
184                                last_offset = 0;
185                                last_file_id = None;
186                            }
187                            _ => {}
188                        }
189                    }
190                    Err(_) => {
191                        debug!("Notify channel closed, stopping monitor");
192                        break;
193                    }
194                }
195            }
196            let _ = watcher.unwatch(&parent_dir_for_unwatch);
197        });
198
199        Ok(rx)
200    }
201
202    fn check_file_changes(
203        log_path: &PathBuf,
204        last_offset: &mut u64,
205        last_file_id: &mut Option<FileId>,
206        patterns: &(Regex, Regex, Regex),
207    ) -> Result<Vec<LogEvent>> {
208        if !log_path.exists() {
209            return Ok(Vec::new());
210        }
211
212        let metadata = std::fs::metadata(log_path)?;
213        let current_file_id = FileId::from_metadata(&metadata);
214
215        if let (Some(current), Some(last)) = (current_file_id, *last_file_id) {
216            if current != last {
217                debug!("File rotation detected, resetting offset");
218                *last_offset = 0;
219            }
220        }
221
222        let current_size = metadata.len();
223
224        if current_size < *last_offset {
225            debug!("File size decreased, resetting offset");
226            *last_offset = 0;
227        }
228
229        if current_size == *last_offset {
230            return Ok(Vec::new());
231        }
232
233        let new_content = Self::read_from_offset(log_path, *last_offset, current_size)?;
234
235        *last_offset = current_size;
236        *last_file_id = current_file_id;
237
238        let events = Self::parse_lines(&new_content, patterns);
239        Ok(events)
240    }
241
242    fn read_from_offset(log_path: &PathBuf, offset: u64, end: u64) -> Result<String> {
243        use std::fs::File;
244        use std::io::{Read, Seek, SeekFrom};
245
246        let mut file = File::open(log_path)?;
247        file.seek(SeekFrom::Start(offset))?;
248
249        let bytes_to_read = (end - offset) as usize;
250        let mut buffer = Vec::with_capacity(bytes_to_read);
251        file.take(bytes_to_read as u64).read_to_end(&mut buffer)?;
252
253        String::from_utf8(buffer.clone())
254            .map_err(|e| {
255                let lossy = String::from_utf8_lossy(&buffer);
256                warn!("UTF-8 decode error, using lossy conversion: {}", e);
257                anyhow::anyhow!("Failed to convert file content to UTF-8: {}", lossy)
258            })
259            .or_else(|_| Ok(String::from_utf8_lossy(&buffer).into_owned()))
260    }
261
262    /// Parse log lines for non-chat events only.
263    /// Chat events are handled by ChatCapture implementations.
264    fn parse_lines(content: &str, patterns: &(Regex, Regex, Regex)) -> Vec<LogEvent> {
265        let (join_pattern, leave_pattern, death_pattern) = patterns;
266        let mut events = Vec::new();
267
268        for line in content.lines() {
269            if let Some(caps) = join_pattern.captures(line) {
270                if let Some(player) = caps.get(2) {
271                    events.push(LogEvent::PlayerJoin(player.as_str().to_string()));
272                }
273            } else if let Some(caps) = leave_pattern.captures(line) {
274                if let Some(player) = caps.get(2) {
275                    events.push(LogEvent::PlayerLeave(player.as_str().to_string()));
276                }
277            } else if let Some(caps) = death_pattern.captures(line) {
278                if let Some(player) = caps.get(2) {
279                    events.push(LogEvent::PlayerDeath(player.as_str().to_string()));
280                }
281            }
282        }
283
284        events
285    }
286}
287
288// ============================================================
289// TmuxChatCapture - Captures chat from tmux session pane
290// ============================================================
291
292pub struct TmuxChatCapture {
293    session: String,
294    chat_pattern: Regex,
295    seen_positions: Arc<Mutex<HashSet<u64>>>,
296}
297
298impl TmuxChatCapture {
299    pub fn new(session: String) -> Result<Self> {
300        // Handle both vanilla: <Player> message
301        // and Fabric: [Not Secure] <Player> message
302        let chat_pattern = Regex::new(r"(?:\[Not Secure\] )?<([a-zA-Z0-9_]+)> (.+)")
303            .context("Failed to compile chat pattern")?;
304
305        Ok(Self {
306            session,
307            chat_pattern,
308            seen_positions: Arc::new(Mutex::new(HashSet::new())),
309        })
310    }
311
312    pub fn capture_pane_output(&self) -> Result<String> {
313        use std::process::Command;
314
315        let output = Command::new("tmux")
316            .args(["capture-pane", "-p", "-t", &self.session])
317            .output()
318            .context("Failed to execute tmux capture-pane")?;
319
320        if !output.status.success() {
321            return Err(anyhow::anyhow!(
322                "tmux capture-pane failed with exit code: {:?}",
323                output.status.code()
324            ));
325        }
326
327        Ok(String::from_utf8_lossy(&output.stdout).into_owned())
328    }
329
330    fn hash_line(line: &str) -> u64 {
331        use std::collections::hash_map::DefaultHasher;
332        use std::hash::{Hash, Hasher};
333        let mut hasher = DefaultHasher::new();
334        line.hash(&mut hasher);
335        hasher.finish()
336    }
337}
338
339impl ChatCapture for TmuxChatCapture {
340    fn capture_recent_messages(&mut self) -> Vec<ChatMessage> {
341        let output = match self.capture_pane_output() {
342            Ok(o) => o,
343            Err(e) => {
344                warn!("[TmuxChatCapture] Failed to capture tmux pane: {}", e);
345                return Vec::new();
346            }
347        };
348
349        let mut messages = Vec::new();
350        let mut seen = self.seen_positions.lock();
351
352        for line in output.lines().rev() {
353            let line_hash = Self::hash_line(line);
354
355            if seen.contains(&line_hash) {
356                continue;
357            }
358
359            seen.insert(line_hash);
360
361            if seen.len() > 10000 {
362                let to_remove: Vec<_> = seen.iter().take(1000).cloned().collect();
363                for r in to_remove {
364                    seen.remove(&r);
365                }
366            }
367
368            if let Some(caps) = self.chat_pattern.captures(line) {
369                if let (Some(player), Some(content)) = (caps.get(1), caps.get(2)) {
370                    debug!("[TmuxChatCapture] Parsed chat: player='{}', content='{}'", player.as_str(), content.as_str());
371                    messages.push(ChatMessage {
372                        player: player.as_str().to_string(),
373                        content: content.as_str().to_string(),
374                        timestamp: chrono::Local::now(),
375                    });
376                }
377            }
378        }
379
380        messages.reverse();
381        messages
382    }
383
384    fn name(&self) -> &'static str {
385        "TmuxChatCapture"
386    }
387}
388
389// ============================================================
390// FileChatCapture - Captures chat from log file
391// ============================================================
392
393pub struct FileChatCapture {
394    log_path: PathBuf,
395    chat_pattern: Regex,
396    seen_positions: Arc<Mutex<HashSet<u64>>>,
397}
398
399impl FileChatCapture {
400    pub fn new(log_path: PathBuf) -> Result<Self> {
401        // Handle both vanilla and [Not Secure] prefixed chat messages in log files
402        let chat_pattern = Regex::new(r"\[(\d{1,2}:\d{2}:\d{2})\] \[[^\]]+\]: (?:\[Not Secure\] )?<([a-zA-Z0-9_]+)> (.+)")
403            .context("Failed to compile chat pattern")?;
404
405        Ok(Self {
406            log_path,
407            chat_pattern,
408            seen_positions: Arc::new(Mutex::new(HashSet::new())),
409        })
410    }
411
412    fn hash_line(line: &str) -> u64 {
413        use std::collections::hash_map::DefaultHasher;
414        use std::hash::{Hash, Hasher};
415        let mut hasher = DefaultHasher::new();
416        line.hash(&mut hasher);
417        hasher.finish()
418    }
419}
420
421impl ChatCapture for FileChatCapture {
422    fn capture_recent_messages(&mut self) -> Vec<ChatMessage> {
423        let content = match std::fs::read_to_string(&self.log_path) {
424            Ok(c) => c,
425            Err(e) => {
426                warn!("[FileChatCapture] Failed to read log file: {}", e);
427                return Vec::new();
428            }
429        };
430
431        let mut messages = Vec::new();
432        let mut seen = self.seen_positions.lock();
433
434        for line in content.lines().rev().take(100) {
435            let line_hash = Self::hash_line(line);
436
437            if seen.contains(&line_hash) {
438                continue;
439            }
440
441            seen.insert(line_hash);
442
443            if let Some(caps) = self.chat_pattern.captures(line) {
444                if let (Some(player), Some(content)) = (caps.get(2), caps.get(3)) {
445                    debug!("[FileChatCapture] Parsed chat: player='{}', content='{}'", player.as_str(), content.as_str());
446                    messages.push(ChatMessage {
447                        player: player.as_str().to_string(),
448                        content: content.as_str().to_string(),
449                        timestamp: chrono::Local::now(),
450                    });
451                }
452            }
453        }
454
455        messages.reverse();
456        messages
457    }
458
459    fn name(&self) -> &'static str {
460        "FileChatCapture"
461    }
462}
463
464// ============================================================
465// ProcessChatCapture - Parses chat from process stdout lines
466// (Used by ForegroundProcess in TUI mode)
467// ============================================================
468
469#[allow(dead_code)]
470pub struct ProcessChatCapture {
471    chat_pattern: Regex,
472    seen_positions: Arc<Mutex<HashSet<u64>>>,
473}
474
475#[allow(dead_code)]
476impl ProcessChatCapture {
477    pub fn new() -> Result<Self> {
478        // Handle both vanilla: <Player> message
479        // and Fabric: [Not Secure] <Player> message
480        let chat_pattern = Regex::new(r"(?:\[Not Secure\] )?<([a-zA-Z0-9_]+)> (.+)")
481            .context("Failed to compile chat pattern")?;
482
483        Ok(Self {
484            chat_pattern,
485            seen_positions: Arc::new(Mutex::new(HashSet::new())),
486        })
487    }
488
489    /// Parse a single line for chat messages.
490    /// Used by ForegroundProcess to parse stdout lines.
491    pub fn parse_line(&self, line: &str) -> Option<ChatMessage> {
492        if let Some(caps) = self.chat_pattern.captures(line) {
493            if let (Some(player), Some(content)) = (caps.get(1), caps.get(2)) {
494                return Some(ChatMessage {
495                    player: player.as_str().to_string(),
496                    content: content.as_str().to_string(),
497                    timestamp: chrono::Local::now(),
498                });
499            }
500        }
501        None
502    }
503}
504
505impl ChatCapture for ProcessChatCapture {
506    fn capture_recent_messages(&mut self) -> Vec<ChatMessage> {
507        // ProcessChatCapture doesn't poll; messages are pushed to it
508        // by the ForegroundProcess via parse_line(). This method
509        // returns empty since we don't have a file/tmux to poll.
510        Vec::new()
511    }
512
513    fn name(&self) -> &'static str {
514        "ProcessChatCapture"
515    }
516}
517
518// ============================================================
519// Tests
520// ============================================================
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525
526    #[test]
527    fn test_tmux_chat_pattern_vanilla() {
528        let pattern = Regex::new(r"(?:\[Not Secure\] )?<([a-zA-Z0-9_]+)> (.+)").unwrap();
529        let caps = pattern.captures("<Steve> hello world").unwrap();
530        assert_eq!(caps.get(1).unwrap().as_str(), "Steve");
531        assert_eq!(caps.get(2).unwrap().as_str(), "hello world");
532    }
533
534    #[test]
535    fn test_tmux_chat_pattern_not_secure() {
536        let pattern = Regex::new(r"(?:\[Not Secure\] )?<([a-zA-Z0-9_]+)> (.+)").unwrap();
537        let caps = pattern.captures("[Not Secure] <Player_1> !help").unwrap();
538        assert_eq!(caps.get(1).unwrap().as_str(), "Player_1");
539        assert_eq!(caps.get(2).unwrap().as_str(), "!help");
540    }
541
542    #[test]
543    fn test_file_chat_pattern_vanilla() {
544        let pattern = Regex::new(r"\[(\d{1,2}:\d{2}:\d{2})\] \[[^\]]+\]: (?:\[Not Secure\] )?<([a-zA-Z0-9_]+)> (.+)").unwrap();
545        let line = "[12:34:56] [Server thread/INFO]: <Steve> hello";
546        let caps = pattern.captures(line).unwrap();
547        assert_eq!(caps.get(2).unwrap().as_str(), "Steve");
548        assert_eq!(caps.get(3).unwrap().as_str(), "hello");
549    }
550
551    #[test]
552    fn test_file_chat_pattern_not_secure() {
553        let pattern = Regex::new(r"\[(\d{1,2}:\d{2}:\d{2})\] \[[^\]]+\]: (?:\[Not Secure\] )?<([a-zA-Z0-9_]+)> (.+)").unwrap();
554        let line = "[12:34:56] [Server thread/INFO]: [Not Secure] <Player_1> !help";
555        let caps = pattern.captures(line).unwrap();
556        assert_eq!(caps.get(2).unwrap().as_str(), "Player_1");
557        assert_eq!(caps.get(3).unwrap().as_str(), "!help");
558    }
559
560    #[test]
561    fn test_process_chat_capture_parse_line() {
562        let capture = ProcessChatCapture::new().unwrap();
563        
564        // Vanilla
565        let msg = capture.parse_line("<Steve> hello world").unwrap();
566        assert_eq!(msg.player, "Steve");
567        assert_eq!(msg.content, "hello world");
568
569        // Not Secure prefix
570        let msg = capture.parse_line("[Not Secure] <Player_1> !help").unwrap();
571        assert_eq!(msg.player, "Player_1");
572        assert_eq!(msg.content, "!help");
573
574        // Non-chat line
575        assert!(capture.parse_line("Server started on port 25565").is_none());
576    }
577
578    #[test]
579    fn test_log_monitor_join_pattern() {
580        let pattern = Regex::new(r"\[(\d{1,2}:\d{2}:\d{2})\] \[[^\]]+\]: ([a-zA-Z0-9_]+) joined the game").unwrap();
581        let line = "[12:34:56] [Server thread/INFO]: Steve joined the game";
582        let caps = pattern.captures(line).unwrap();
583        assert_eq!(caps.get(2).unwrap().as_str(), "Steve");
584    }
585
586    #[test]
587    fn test_log_monitor_leave_pattern() {
588        let pattern = Regex::new(r"\[(\d{1,2}:\d{2}:\d{2})\] \[[^\]]+\]: ([a-zA-Z0-9_]+) left the game").unwrap();
589        let line = "[12:34:56] [Server thread/INFO]: Player_1 left the game";
590        let caps = pattern.captures(line).unwrap();
591        assert_eq!(caps.get(2).unwrap().as_str(), "Player_1");
592    }
593}