1use std::{
8 fmt::Write as _,
9 future::Future,
10 io::{self, stderr, stdout, Write},
11 pin::Pin,
12 str::FromStr,
13 thread::{self, JoinHandle},
14};
15
16use crate::Result;
17use chrono::{DateTime, Local, Utc};
18use colored::{Color, Colorize as _};
19use futures::executor::block_on;
20use serde::{Deserialize, Serialize};
21use serde_json::{Map, Value};
22use serde_with::{serde_as, DisplayFromStr};
23use tokio::{
24 select,
25 sync::mpsc::{unbounded_channel, UnboundedSender},
26};
27use tracing::Level;
28
29#[serde_as]
30#[derive(Serialize, Deserialize, Debug)]
31pub struct LogItem {
32 pub time: Value,
33 #[serde_as(as = "DisplayFromStr")]
34 pub level: Level,
35 pub message: String,
36 #[serde(skip_serializing_if = "String::is_empty")]
37 pub target: String,
38 #[serde(skip_serializing_if = "Map::is_empty")]
39 pub fields: Map<String, Value>,
40 #[serde(skip_serializing_if = "Map::is_empty")]
41 pub span: Map<String, Value>,
42 #[serde(skip_serializing_if = "Option::is_none")]
43 pub filename: Option<String>,
44 #[serde(skip_serializing_if = "Option::is_none")]
45 pub line_number: Option<i64>,
46}
47
48impl LogItem {
49 fn json_take_object(mp: &mut Map<String, Value>, key: &str) -> Map<String, Value> {
50 if let Value::Object(x) = mp.remove(key).unwrap_or_default() {
51 x
52 } else {
53 Map::default()
54 }
55 }
56
57 fn from_json(mut s: Map<String, Value>) -> Self {
58 let target = s
59 .get("target")
60 .and_then(Value::as_str)
61 .unwrap_or_default()
62 .to_owned();
63 let level = Level::from_str(s.get("level").and_then(Value::as_str).unwrap_or("ERROR"))
64 .unwrap_or(Level::ERROR);
65 let filename = s
66 .get("filename")
67 .and_then(Value::as_str)
68 .map(str::to_string);
69 let line_number = s.get("line_number").and_then(Value::as_i64);
70 let mut fields = Self::json_take_object(&mut s, "fields");
71 let message = fields
72 .remove("message")
73 .unwrap_or_default()
74 .as_str()
75 .unwrap_or_default()
76 .to_owned();
77 let span = Self::json_take_object(&mut s, "span");
78 Self {
79 time: Value::default(),
80 level,
81 message,
82 target,
83 fields,
84 span,
85 filename,
86 line_number,
87 }
88 }
89}
90
91struct LogSender {
92 tx: UnboundedSender<Map<String, Value>>,
93}
94
95impl Write for LogSender {
96 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
97 self.tx
98 .send(serde_json::from_slice(buf)?)
99 .or(Err(io::ErrorKind::BrokenPipe))?;
100 Ok(buf.len())
101 }
102
103 fn flush(&mut self) -> io::Result<()> {
104 Ok(())
106 }
107}
108
109impl LogSender {
110 fn new(tx: UnboundedSender<Map<String, Value>>) -> impl Fn() -> Self {
111 move || Self { tx: tx.clone() }
112 }
113}
114
115#[derive(Clone)]
116pub struct Logger {
117 tx: UnboundedSender<Map<String, Value>>,
118}
119
120impl Logger {
121 pub fn sender(&self) -> UnboundedSender<Map<String, Value>> {
123 self.tx.clone()
124 }
125
126 pub fn init(&self, builder: &LoggerBuilder) {
129 tracing_subscriber::fmt()
130 .with_max_level(builder.level)
131 .with_writer(LogSender::new(self.tx.clone()))
132 .without_time()
133 .with_file(builder.filename)
134 .with_line_number(builder.line_number)
135 .json()
136 .init();
137 }
138}
139
/// Callback that renders one [`LogItem`] into the supplied writer.
pub type WriterFn = Box<dyn Fn(LogItem, Box<dyn Write>) -> Result<()> + Send>;
/// Predicate over a [`LogItem`]; the item is discarded when it returns `false`.
pub type FilterFn = Box<dyn Fn(&LogItem) -> bool + Send>;
/// Callback that takes ownership of a [`LogItem`] and returns the item to emit.
pub type TransformerFn = Box<dyn Fn(LogItem) -> LogItem + Send>;
/// Async hook over the raw JSON event; the event is skipped when the future
/// resolves to `false`. The returned future is not required to be `Send`.
pub type HandlerFn = Box<dyn Fn(&Map<String, Value>) -> Pin<Box<dyn Future<Output = bool>>> + Send>;
144
145pub struct LoggerGuard {
151 stop_tx: UnboundedSender<()>,
152 join: Option<JoinHandle<()>>,
153}
154
155impl Drop for LoggerGuard {
156 fn drop(&mut self) {
157 self.stop_tx.send(()).unwrap();
158 if let Some(x) = self.join.take() {
159 x.join().unwrap();
160 }
161 }
162}
163
164pub struct LoggerBuilder {
165 json: bool,
166 level: Level,
167 filename: bool,
168 line_number: bool,
169 filter: Option<FilterFn>,
170 transformer: Option<TransformerFn>,
171 json_writer: WriterFn,
172 color_writer: WriterFn,
173 handler: Option<HandlerFn>,
174}
175
176impl LoggerBuilder {
177 pub fn fmt_level(level: &Level) -> String {
184 format!("{: >5}", level.to_string())
185 .bold()
186 .color(match *level {
187 Level::TRACE | Level::DEBUG => Color::Magenta,
188 Level::INFO => Color::Green,
189 Level::WARN => Color::Yellow,
190 Level::ERROR => Color::Red,
191 })
192 .to_string()
193 }
194
195 fn default_json_writer(item: LogItem, mut writer: Box<dyn Write>) -> Result<()> {
196 let v = serde_json::to_string(&item).unwrap_or_default();
197 writer.write_fmt(format_args!("{v}\n"))?;
198 writer.flush().map_err(Into::into)
199 }
200
201 fn default_color_writer(item: LogItem, mut writer: Box<dyn Write>) -> Result<()> {
202 let mut buf = String::new();
203 write!(
204 buf,
205 "{} {} {}",
206 item.time.as_str().unwrap_or_default().bright_black(),
207 Self::fmt_level(&item.level),
208 item.target.bright_black()
209 )?;
210 if let Some(filename) = item.filename {
211 if let Some(line_number) = item.line_number {
212 buf += &format!("({}:{})", filename, line_number)
213 .bright_black()
214 .to_string();
215 }
216 }
217 write!(buf, "{} {}", ":".bright_black(), item.message)?;
218 for (k, v) in &item.fields {
219 if !k.starts_with("log.") {
220 buf += &format!(" field.{k}={v}").bright_black().to_string();
221 }
222 }
223 for (k, v) in item.span {
224 if !k.starts_with("http.") && !k.starts_with("otel.") && k != "name" {
225 buf += &format!(" span.{k}={v}").bright_black().to_string();
226 }
227 }
228
229 writer.write_fmt(format_args!("{buf}\n"))?;
230 writer.flush().map_err(Into::into)
231 }
232
233 pub fn new() -> Self {
236 Self {
237 json: false,
238 level: Level::INFO,
239 filename: false,
240 line_number: false,
241 filter: None,
242 transformer: None,
243 json_writer: Box::new(Self::default_json_writer),
244 color_writer: Box::new(Self::default_color_writer),
245 handler: None,
246 }
247 }
248
249 pub fn json_writer(mut self, writer: WriterFn) -> Self {
254 self.json_writer = writer;
255 self
256 }
257
258 pub fn color_writer(mut self, writer: WriterFn) -> Self {
263 self.color_writer = writer;
264 self
265 }
266
267 pub fn json(mut self) -> Self {
269 self.json = true;
270 self
271 }
272
273 pub fn level(mut self, level: Level) -> Self {
275 self.level = level;
276 self
277 }
278
279 pub fn filename(mut self) -> Self {
281 self.filename = true;
282 self
283 }
284
285 pub fn line_number(mut self) -> Self {
287 self.line_number = true;
288 self
289 }
290
291 pub fn handler<F>(mut self, handler: F) -> Self
300 where
301 F: Fn(&Map<String, Value>) -> Pin<Box<dyn Future<Output = bool>>> + Send + 'static,
302 {
303 self.handler = Some(Box::new(handler));
304 self
305 }
306
307 pub fn filter<F>(mut self, filter: F) -> Self
314 where
315 F: Fn(&LogItem) -> bool + Send + 'static,
316 {
317 self.filter = Some(Box::new(filter));
318 self
319 }
320
321 pub fn transformer<F>(mut self, transformer: F) -> Self
328 where
329 F: Fn(LogItem) -> LogItem + Send + 'static,
330 {
331 self.transformer = Some(Box::new(transformer));
332 self
333 }
334
335 pub fn start(self) -> (Logger, LoggerGuard) {
342 let (tx, mut rx) = unbounded_channel();
343 let (stop_tx, mut stop_rx) = unbounded_channel();
344 tracing_subscriber::fmt()
345 .with_max_level(self.level)
346 .with_writer(LogSender::new(tx.clone()))
347 .without_time()
348 .with_file(self.filename)
349 .with_line_number(self.line_number)
350 .json()
351 .init();
352
353 let join = thread::spawn(move || {
354 let handler = |v: Map<String, Value>| async {
355 if let Some(x) = &self.handler {
356 if !x(&v).await {
357 return;
358 }
359 }
360 let mut item = LogItem::from_json(v);
361 let time = item.fields.remove("_time").unwrap_or_default().as_i64();
362 if self.json {
363 item.time = time.unwrap_or_else(|| Utc::now().timestamp_micros()).into();
364 } else {
365 item.time = time
366 .map_or_else(Local::now, |v| {
367 DateTime::from_timestamp_micros(v)
368 .unwrap_or_default()
369 .into()
370 })
371 .format("%F %T%.6f")
372 .to_string()
373 .into();
374 }
375
376 if let Some(filter) = &self.filter {
377 if !filter(&item) {
378 return;
379 }
380 }
381 if let Some(transformer) = &self.transformer {
382 item = transformer(item);
383 }
384 let writer: Box<dyn io::Write> = if item.level <= Level::WARN {
385 Box::new(stderr())
386 } else {
387 Box::new(stdout())
388 };
389 if self.json {
390 let _ = (self.json_writer)(item, writer);
391 } else {
392 let _ = (self.color_writer)(item, writer);
393 }
394 };
395 block_on(async move {
396 loop {
397 select! {
398 Some(v) = rx.recv() => {
399 handler(v).await;
400 },
401 _ = stop_rx.recv() => {
402 while let Ok(v) = rx.try_recv(){
403 handler(v).await;
404 }
405 break;
406 }
407 }
408 }
409 })
410 });
411 (
412 Logger { tx },
413 LoggerGuard {
414 stop_tx,
415 join: Some(join),
416 },
417 )
418 }
419}