Skip to main content

mtlog_core/
utils.rs

1use std::{
2    ops::Deref,
3    sync::{Arc, Mutex},
4    thread::JoinHandle,
5    time::{Duration, Instant},
6};
7
8use chrono::Utc;
9use colored::Colorize;
10use crossbeam_channel::{RecvTimeoutError, Sender, unbounded};
11use log::Level;
12use uuid::Uuid;
13
14use crate::{
15    config::MTLOG_CONFIG,
16    log_writer::{LogStdout, LogWriter},
17};
18
19/// Guard that ensures the logger is properly shut down when dropped.
20/// Hold this guard for the lifetime of your logging session.
21pub struct LoggerGuard {
22    senders: Vec<Arc<LogSender>>,
23}
24
25impl LoggerGuard {
26    pub fn new(senders: Vec<Arc<LogSender>>) -> Self {
27        Self { senders }
28    }
29}
30
31impl Drop for LoggerGuard {
32    fn drop(&mut self) {
33        for sender in &self.senders {
34            sender.shutdown();
35        }
36    }
37}
38
39#[derive(Debug, Clone)]
40pub struct LogMessage {
41    pub message: String,
42    pub level: Level,
43    pub name: Option<String>,
44}
45
46pub struct LogSender {
47    sender: Sender<Arc<LogMessage>>,
48    handler: Arc<Mutex<Option<JoinHandle<bool>>>>,
49}
50
51impl Deref for LogSender {
52    type Target = Sender<Arc<LogMessage>>;
53    fn deref(&self) -> &Self::Target {
54        &self.sender
55    }
56}
57
58impl Drop for LogSender {
59    fn drop(&mut self) {
60        self.shutdown();
61    }
62}
63
64impl LogSender {
65    pub fn new(sender: Sender<Arc<LogMessage>>, handler: JoinHandle<bool>) -> Self {
66        Self {
67            sender,
68            handler: Arc::new(Mutex::new(Some(handler))),
69        }
70    }
71
72    pub fn shutdown(&self) {
73        let mut guard = self.handler.lock().unwrap();
74        if let Some(handle) = guard.take() {
75            // Send shutdown message - ignore error if channel is already closed
76            let _ = self.send(Arc::new(LogMessage {
77                message: "___SHUTDOWN___".into(),
78                level: Level::Info,
79                name: None,
80            }));
81            if !handle.join().expect("Unable to join logger thread") {
82                panic!("Logger thread shutdown failed");
83            }
84        }
85    }
86}
87
88fn format_log(message: &str, level: Level, name: Option<&str>) -> String {
89    let time = Utc::now().format("%Y-%m-%dT%H:%M:%S%.3f");
90    let level = match level {
91        Level::Error => "ERROR".red(),
92        Level::Warn => "WARN".yellow(),
93        Level::Info => "INFO".green(),
94        Level::Debug => "DEBUG".blue(),
95        Level::Trace => "TRACE".purple(),
96    };
97    if let Some(name) = name {
98        format!("[{time} {name} {level}] {message}")
99    } else {
100        format!("[{time} {level}] {message}")
101    }
102}
103
104pub fn spawn_log_thread_stdout(mut writer: LogStdout) -> LogSender {
105    let (sender, receiver) = unbounded::<Arc<LogMessage>>();
106    let handler = std::thread::spawn(move || {
107        // No batching for stdout - process messages immediately
108        while let Ok(log_message) = receiver.recv() {
109            let LogMessage {
110                message,
111                level,
112                name,
113            } = log_message.as_ref();
114
115            if message == "___SHUTDOWN___" {
116                break;
117            }
118
119            if message.starts_with("___PROGRESS___") {
120                let message = message.trim_start_matches("___PROGRESS___");
121                if let Some((uuid_str, message)) = message.split_once("___")
122                    && let Ok(uuid) = Uuid::parse_str(uuid_str)
123                {
124                    if message == "FINISHED" {
125                        writer.finished(uuid);
126                    } else {
127                        writer.progress(message, uuid);
128                    }
129                }
130            } else {
131                let message = format_log(message, *level, name.as_deref());
132                writer.regular(&message);
133            }
134        }
135        true
136    });
137    LogSender::new(sender, handler)
138}
139
140pub fn spawn_log_thread_file(mut writer: impl LogWriter + Send + 'static) -> LogSender {
141    let (sender, receiver) = unbounded::<Arc<LogMessage>>();
142    let handler = std::thread::spawn(move || {
143        let mut batch = Vec::with_capacity(32);
144        let flush_interval = Duration::from_millis(MTLOG_CONFIG.FLUSH_INTERVAL_MS);
145        let mut last_flush = Instant::now();
146        loop {
147            // Calculate timeout until next flush
148            let elapsed = last_flush.elapsed();
149            let timeout = if elapsed >= flush_interval {
150                Duration::from_millis(1) // Force immediate processing
151            } else {
152                flush_interval - elapsed
153            };
154
155            // Collect a batch of messages with timeout
156            match receiver.recv_timeout(timeout) {
157                Ok(msg) => {
158                    batch.push(msg);
159                    while let Ok(msg) = receiver.try_recv() {
160                        batch.push(msg);
161                        if batch.len() >= 32 {
162                            break;
163                        }
164                    }
165                }
166                Err(RecvTimeoutError::Timeout) => {
167                    // Timeout - no messages received, batch is empty
168                    // Only flush if the flush interval has elapsed
169                    if last_flush.elapsed() >= flush_interval {
170                        writer.flush();
171                        last_flush = Instant::now();
172                    }
173                    continue;
174                }
175                Err(RecvTimeoutError::Disconnected) => break,
176            }
177
178            // Process the batch
179            let mut should_shutdown = false;
180            for log_message in batch.drain(..) {
181                let LogMessage {
182                    message,
183                    level,
184                    name,
185                } = log_message.as_ref();
186
187                if message == "___SHUTDOWN___" {
188                    should_shutdown = true;
189                    break;
190                }
191
192                if message.starts_with("___PROGRESS___") {
193                    let message = message.trim_start_matches("___PROGRESS___");
194                    if let Some((uuid_str, message)) = message.split_once("___")
195                        && let Ok(uuid) = Uuid::parse_str(uuid_str)
196                    {
197                        if message == "FINISHED" {
198                            writer.finished(uuid);
199                        } else {
200                            writer.progress(message, uuid);
201                        }
202                    }
203                } else {
204                    let message = format_log(message, *level, name.as_deref());
205                    writer.regular(&message);
206                }
207            }
208
209            // Flush periodically or when shutting down
210            if should_shutdown || last_flush.elapsed() >= flush_interval {
211                writer.flush();
212                last_flush = Instant::now();
213            }
214
215            if should_shutdown {
216                break;
217            }
218        }
219        true
220    });
221    LogSender::new(sender, handler)
222}