1use std::{
2 ops::Deref,
3 sync::{Arc, Mutex},
4 thread::JoinHandle,
5 time::{Duration, Instant},
6};
7
8use chrono::Utc;
9use colored::Colorize;
10use crossbeam_channel::{RecvTimeoutError, Sender, unbounded};
11use log::Level;
12use uuid::Uuid;
13
14use crate::{
15 config::MTLOG_CONFIG,
16 log_writer::{LogStdout, LogWriter},
17};
18
19pub struct LoggerGuard {
22 senders: Vec<Arc<LogSender>>,
23}
24
25impl LoggerGuard {
26 pub fn new(senders: Vec<Arc<LogSender>>) -> Self {
27 Self { senders }
28 }
29}
30
31impl Drop for LoggerGuard {
32 fn drop(&mut self) {
33 for sender in &self.senders {
34 sender.shutdown();
35 }
36 }
37}
38
39#[derive(Debug, Clone)]
40pub struct LogMessage {
41 pub message: String,
42 pub level: Level,
43 pub name: Option<String>,
44}
45
46pub struct LogSender {
47 sender: Sender<Arc<LogMessage>>,
48 handler: Arc<Mutex<Option<JoinHandle<bool>>>>,
49}
50
51impl Deref for LogSender {
52 type Target = Sender<Arc<LogMessage>>;
53 fn deref(&self) -> &Self::Target {
54 &self.sender
55 }
56}
57
58impl Drop for LogSender {
59 fn drop(&mut self) {
60 self.shutdown();
61 }
62}
63
64impl LogSender {
65 pub fn new(sender: Sender<Arc<LogMessage>>, handler: JoinHandle<bool>) -> Self {
66 Self {
67 sender,
68 handler: Arc::new(Mutex::new(Some(handler))),
69 }
70 }
71
72 pub fn shutdown(&self) {
73 let mut guard = self.handler.lock().unwrap();
74 if let Some(handle) = guard.take() {
75 let _ = self.send(Arc::new(LogMessage {
77 message: "___SHUTDOWN___".into(),
78 level: Level::Info,
79 name: None,
80 }));
81 if !handle.join().expect("Unable to join logger thread") {
82 panic!("Logger thread shutdown failed");
83 }
84 }
85 }
86}
87
88fn format_log(message: &str, level: Level, name: Option<&str>) -> String {
89 let time = Utc::now().format("%Y-%m-%dT%H:%M:%S%.3f");
90 let level = match level {
91 Level::Error => "ERROR".red(),
92 Level::Warn => "WARN".yellow(),
93 Level::Info => "INFO".green(),
94 Level::Debug => "DEBUG".blue(),
95 Level::Trace => "TRACE".purple(),
96 };
97 if let Some(name) = name {
98 format!("[{time} {name} {level}] {message}")
99 } else {
100 format!("[{time} {level}] {message}")
101 }
102}
103
104pub fn spawn_log_thread_stdout(mut writer: LogStdout) -> LogSender {
105 let (sender, receiver) = unbounded::<Arc<LogMessage>>();
106 let handler = std::thread::spawn(move || {
107 while let Ok(log_message) = receiver.recv() {
109 let LogMessage {
110 message,
111 level,
112 name,
113 } = log_message.as_ref();
114
115 if message == "___SHUTDOWN___" {
116 break;
117 }
118
119 if message.starts_with("___PROGRESS___") {
120 let message = message.trim_start_matches("___PROGRESS___");
121 if let Some((uuid_str, message)) = message.split_once("___")
122 && let Ok(uuid) = Uuid::parse_str(uuid_str)
123 {
124 if message == "FINISHED" {
125 writer.finished(uuid);
126 } else {
127 writer.progress(message, uuid);
128 }
129 }
130 } else {
131 let message = format_log(message, *level, name.as_deref());
132 writer.regular(&message);
133 }
134 }
135 true
136 });
137 LogSender::new(sender, handler)
138}
139
140pub fn spawn_log_thread_file(mut writer: impl LogWriter + Send + 'static) -> LogSender {
141 let (sender, receiver) = unbounded::<Arc<LogMessage>>();
142 let handler = std::thread::spawn(move || {
143 let mut batch = Vec::with_capacity(32);
144 let flush_interval = Duration::from_millis(MTLOG_CONFIG.FLUSH_INTERVAL_MS);
145 let mut last_flush = Instant::now();
146 loop {
147 let elapsed = last_flush.elapsed();
149 let timeout = if elapsed >= flush_interval {
150 Duration::from_millis(1) } else {
152 flush_interval - elapsed
153 };
154
155 match receiver.recv_timeout(timeout) {
157 Ok(msg) => {
158 batch.push(msg);
159 while let Ok(msg) = receiver.try_recv() {
160 batch.push(msg);
161 if batch.len() >= 32 {
162 break;
163 }
164 }
165 }
166 Err(RecvTimeoutError::Timeout) => {
167 if last_flush.elapsed() >= flush_interval {
170 writer.flush();
171 last_flush = Instant::now();
172 }
173 continue;
174 }
175 Err(RecvTimeoutError::Disconnected) => break,
176 }
177
178 let mut should_shutdown = false;
180 for log_message in batch.drain(..) {
181 let LogMessage {
182 message,
183 level,
184 name,
185 } = log_message.as_ref();
186
187 if message == "___SHUTDOWN___" {
188 should_shutdown = true;
189 break;
190 }
191
192 if message.starts_with("___PROGRESS___") {
193 let message = message.trim_start_matches("___PROGRESS___");
194 if let Some((uuid_str, message)) = message.split_once("___")
195 && let Ok(uuid) = Uuid::parse_str(uuid_str)
196 {
197 if message == "FINISHED" {
198 writer.finished(uuid);
199 } else {
200 writer.progress(message, uuid);
201 }
202 }
203 } else {
204 let message = format_log(message, *level, name.as_deref());
205 writer.regular(&message);
206 }
207 }
208
209 if should_shutdown || last_flush.elapsed() >= flush_interval {
211 writer.flush();
212 last_flush = Instant::now();
213 }
214
215 if should_shutdown {
216 break;
217 }
218 }
219 true
220 });
221 LogSender::new(sender, handler)
222}