1use std::{
2 ops::Deref,
3 sync::{Arc, Mutex},
4 thread::JoinHandle,
5 time::{Duration, Instant},
6};
7
8use chrono::Utc;
9use colored::Colorize;
10use crossbeam_channel::{RecvTimeoutError, Sender, unbounded};
11use log::Level;
12use uuid::Uuid;
13
14use crate::{config::MTLOG_CONFIG, log_writer::LogWriter};
15
16pub struct LoggerGuard {
19 senders: Vec<Arc<LogSender>>,
20}
21
22impl LoggerGuard {
23 pub fn new(senders: Vec<Arc<LogSender>>) -> Self {
25 Self { senders }
26 }
27}
28
29impl Drop for LoggerGuard {
30 fn drop(&mut self) {
31 for sender in &self.senders {
32 sender.shutdown();
33 }
34 }
35}
36
37#[derive(Debug, Clone)]
38pub struct LogMessage {
39 pub message: String,
40 pub level: Level,
41 pub name: Option<String>,
42}
43
44pub struct LogSender {
45 sender: Sender<Arc<LogMessage>>,
46 handler: Arc<Mutex<Option<JoinHandle<bool>>>>,
47}
48
49impl Deref for LogSender {
50 type Target = Sender<Arc<LogMessage>>;
51 fn deref(&self) -> &Self::Target {
52 &self.sender
53 }
54}
55
56impl Drop for LogSender {
57 fn drop(&mut self) {
58 self.shutdown();
59 }
60}
61
62impl LogSender {
63 pub fn new(sender: Sender<Arc<LogMessage>>, handler: JoinHandle<bool>) -> Self {
64 Self {
65 sender,
66 handler: Arc::new(Mutex::new(Some(handler))),
67 }
68 }
69
70 pub fn shutdown(&self) {
73 let mut guard = self.handler.lock().unwrap();
74 if let Some(handle) = guard.take() {
75 let _ = self.send(Arc::new(LogMessage {
77 message: "___SHUTDOWN___".into(),
78 level: Level::Info,
79 name: None,
80 }));
81 if !handle.join().expect("Unable to join logger thread") {
82 panic!("Logger thread shutdown failed");
83 }
84 }
85 }
86}
87
88fn format_log(message: &str, level: Level, name: Option<&str>) -> String {
89 let time = Utc::now().format("%Y-%m-%dT%H:%M:%S%.3f");
90 let level = match level {
91 Level::Error => "ERROR".red(),
92 Level::Warn => "WARN".yellow(),
93 Level::Info => "INFO".green(),
94 Level::Debug => "DEBUG".blue(),
95 Level::Trace => "TRACE".purple(),
96 };
97 if let Some(name) = name {
98 format!("[{time} {name} {level}] {message}")
99 } else {
100 format!("[{time} {level}] {message}")
101 }
102}
103
104pub fn spawn_log_thread<W: LogWriter + Send + 'static>(mut writer: W) -> LogSender {
105 let (sender, receiver) = unbounded::<Arc<LogMessage>>();
106 let handler = std::thread::spawn(move || {
107 let mut batch = Vec::with_capacity(32);
108 let flush_interval = Duration::from_millis(MTLOG_CONFIG.FLUSH_INTERVAL_MS);
109 let mut last_flush = Instant::now();
110 loop {
111 let elapsed = last_flush.elapsed();
113 let timeout = if elapsed >= flush_interval {
114 Duration::from_millis(1) } else {
116 flush_interval - elapsed
117 };
118
119 match receiver.recv_timeout(timeout) {
121 Ok(msg) => {
122 batch.push(msg);
123 while let Ok(msg) = receiver.try_recv() {
125 batch.push(msg);
126 if batch.len() >= 32 {
127 break;
128 }
129 }
130 }
131 Err(RecvTimeoutError::Timeout) => {
132 if last_flush.elapsed() >= flush_interval && !batch.is_empty() {
134 } else if batch.is_empty() {
136 writer.flush();
138 last_flush = Instant::now();
139 continue;
140 }
141 }
142 Err(RecvTimeoutError::Disconnected) => break,
143 }
144
145 let mut should_shutdown = false;
147 for log_message in batch.drain(..) {
148 let LogMessage {
149 message,
150 level,
151 name,
152 } = log_message.as_ref();
153
154 if message == "___SHUTDOWN___" {
155 should_shutdown = true;
156 break;
157 }
158
159 if message.starts_with("___PROGRESS___") {
160 let message = message.trim_start_matches("___PROGRESS___");
161 if let Some((uuid_str, message)) = message.split_once("___")
162 && let Ok(uuid) = Uuid::parse_str(uuid_str)
163 {
164 if message == "FINISHED" {
165 writer.finished(uuid);
166 } else {
167 writer.progress(message, uuid);
168 }
169 }
170 } else {
171 let message = format_log(message, *level, name.as_deref());
172 writer.regular(&message);
173 }
174 }
175
176 if should_shutdown || last_flush.elapsed() >= flush_interval {
178 writer.flush();
179 last_flush = Instant::now();
180 }
181
182 if should_shutdown {
183 break;
184 }
185 }
186 true
187 });
188 LogSender::new(sender, handler)
189}