mtlog_core/
utils.rs

1use std::{
2    ops::Deref,
3    sync::{Arc, Mutex},
4    thread::JoinHandle,
5    time::{Duration, Instant},
6};
7
8use chrono::Utc;
9use colored::Colorize;
10use crossbeam_channel::{RecvTimeoutError, Sender, unbounded};
11use log::Level;
12use uuid::Uuid;
13
14use crate::{config::MTLOG_CONFIG, log_writer::LogWriter};
15
16/// Guard that ensures the logger is properly shut down when dropped.
17/// Hold this guard for the lifetime of your logging session.
18pub struct LoggerGuard {
19    senders: Vec<Arc<LogSender>>,
20}
21
22impl LoggerGuard {
23    /// Creates a new LoggerGuard with the given senders.
24    pub fn new(senders: Vec<Arc<LogSender>>) -> Self {
25        Self { senders }
26    }
27}
28
29impl Drop for LoggerGuard {
30    fn drop(&mut self) {
31        for sender in &self.senders {
32            sender.shutdown();
33        }
34    }
35}
36
37#[derive(Debug, Clone)]
38pub struct LogMessage {
39    pub message: String,
40    pub level: Level,
41    pub name: Option<String>,
42}
43
44pub struct LogSender {
45    sender: Sender<Arc<LogMessage>>,
46    handler: Arc<Mutex<Option<JoinHandle<bool>>>>,
47}
48
49impl Deref for LogSender {
50    type Target = Sender<Arc<LogMessage>>;
51    fn deref(&self) -> &Self::Target {
52        &self.sender
53    }
54}
55
56impl Drop for LogSender {
57    fn drop(&mut self) {
58        self.shutdown();
59    }
60}
61
62impl LogSender {
63    pub fn new(sender: Sender<Arc<LogMessage>>, handler: JoinHandle<bool>) -> Self {
64        Self {
65            sender,
66            handler: Arc::new(Mutex::new(Some(handler))),
67        }
68    }
69
70    /// Shuts down the logger thread and waits for it to finish.
71    /// This method is idempotent - calling it multiple times is safe.
72    pub fn shutdown(&self) {
73        let mut guard = self.handler.lock().unwrap();
74        if let Some(handle) = guard.take() {
75            // Send shutdown message - ignore error if channel is already closed
76            let _ = self.send(Arc::new(LogMessage {
77                message: "___SHUTDOWN___".into(),
78                level: Level::Info,
79                name: None,
80            }));
81            if !handle.join().expect("Unable to join logger thread") {
82                panic!("Logger thread shutdown failed");
83            }
84        }
85    }
86}
87
88fn format_log(message: &str, level: Level, name: Option<&str>) -> String {
89    let time = Utc::now().format("%Y-%m-%dT%H:%M:%S%.3f");
90    let level = match level {
91        Level::Error => "ERROR".red(),
92        Level::Warn => "WARN".yellow(),
93        Level::Info => "INFO".green(),
94        Level::Debug => "DEBUG".blue(),
95        Level::Trace => "TRACE".purple(),
96    };
97    if let Some(name) = name {
98        format!("[{time} {name} {level}] {message}")
99    } else {
100        format!("[{time} {level}] {message}")
101    }
102}
103
104pub fn spawn_log_thread<W: LogWriter + Send + 'static>(mut writer: W) -> LogSender {
105    let (sender, receiver) = unbounded::<Arc<LogMessage>>();
106    let handler = std::thread::spawn(move || {
107        let mut batch = Vec::with_capacity(32);
108        let flush_interval = Duration::from_millis(MTLOG_CONFIG.FLUSH_INTERVAL_MS);
109        let mut last_flush = Instant::now();
110        loop {
111            // Calculate timeout until next flush
112            let elapsed = last_flush.elapsed();
113            let timeout = if elapsed >= flush_interval {
114                Duration::from_millis(1) // Force immediate processing
115            } else {
116                flush_interval - elapsed
117            };
118
119            // Collect a batch of messages with timeout
120            match receiver.recv_timeout(timeout) {
121                Ok(msg) => {
122                    batch.push(msg);
123                    // Try to collect more messages without blocking
124                    while let Ok(msg) = receiver.try_recv() {
125                        batch.push(msg);
126                        if batch.len() >= 32 {
127                            break;
128                        }
129                    }
130                }
131                Err(RecvTimeoutError::Timeout) => {
132                    // Timeout reached - flush if needed
133                    if last_flush.elapsed() >= flush_interval && !batch.is_empty() {
134                        // Process any pending messages
135                    } else if batch.is_empty() {
136                        // No messages to process, but we should flush the writer
137                        writer.flush();
138                        last_flush = Instant::now();
139                        continue;
140                    }
141                }
142                Err(RecvTimeoutError::Disconnected) => break,
143            }
144
145            // Process the batch
146            let mut should_shutdown = false;
147            for log_message in batch.drain(..) {
148                let LogMessage {
149                    message,
150                    level,
151                    name,
152                } = log_message.as_ref();
153
154                if message == "___SHUTDOWN___" {
155                    should_shutdown = true;
156                    break;
157                }
158
159                if message.starts_with("___PROGRESS___") {
160                    let message = message.trim_start_matches("___PROGRESS___");
161                    if let Some((uuid_str, message)) = message.split_once("___")
162                        && let Ok(uuid) = Uuid::parse_str(uuid_str)
163                    {
164                        if message == "FINISHED" {
165                            writer.finished(uuid);
166                        } else {
167                            writer.progress(message, uuid);
168                        }
169                    }
170                } else {
171                    let message = format_log(message, *level, name.as_deref());
172                    writer.regular(&message);
173                }
174            }
175
176            // Flush periodically or when shutting down
177            if should_shutdown || last_flush.elapsed() >= flush_interval {
178                writer.flush();
179                last_flush = Instant::now();
180            }
181
182            if should_shutdown {
183                break;
184            }
185        }
186        true
187    });
188    LogSender::new(sender, handler)
189}