async_rawlogger/
lib.rs

1use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, TrySendError, bounded, unbounded};
2pub use log::{
3    Level, LevelFilter, Record, debug, error, info, log, log_enabled, logger, trace, warn,
4};
5use log::{Log, Metadata, SetLoggerError, set_boxed_logger, set_max_level};
6use logmsg::LogMsg;
7use std::{
8    borrow::Cow,
9    fmt::Display,
10    io::Error as IoError,
11    sync::atomic::{AtomicBool, Ordering},
12    time::{Duration, Instant},
13};
14mod logmsg;
15
16enum LoggerInput {
17    LogMsg(LogMsg),
18    Flush,
19}
20
21enum LoggerOutput {
22    Flushed,
23}
24
25pub trait FtLogFormat: Send + Sync {
26    /// turn an reference to record into a box object, which can be sent to log thread
27    /// and then formatted into string.
28    fn msg(
29        &self,
30        record: &Record,
31    ) -> Box<dyn Send + Sync + Display>;
32}
33
34pub struct FtLogFormatter;
35impl FtLogFormat for FtLogFormatter {
36    /// Return a box object that contains required data (e.g. thread name, line of code, etc.) for later formatting into string
37    #[inline]
38    fn msg(
39        &self,
40        record: &Record,
41    ) -> Box<dyn Send + Sync + Display> {
42        Box::new(Message {
43            level: record.level(),
44            file: record
45                .file_static()
46                .map(Cow::Borrowed)
47                .or_else(|| record.file().map(|s| Cow::Owned(s.to_owned())))
48                .unwrap_or(Cow::Borrowed("")),
49            line: record.line(),
50            args: record
51                .args()
52                .as_str()
53                .map(Cow::Borrowed)
54                .unwrap_or_else(|| Cow::Owned(record.args().to_string())),
55        })
56    }
57}
58
59struct Message {
60    level: Level,
61    file: Cow<'static, str>,
62    line: Option<u32>,
63    args: Cow<'static, str>,
64}
65
66impl Display for Message {
67    fn fmt(
68        &self,
69        f: &mut std::fmt::Formatter<'_>,
70    ) -> std::fmt::Result {
71        f.write_str(&format!(
72            "{} [{}:{}] {}",
73            self.level,
74            self.file,
75            self.line.unwrap_or(0),
76            self.args
77        ))
78    }
79}
80
81/// A guard that flushes logs associated to a Logger on a drop
82///
83/// With this guard, you can ensure all logs are written to destination
84/// when the application exits.
85pub struct LoggerGuard {
86    queue: Sender<LoggerInput>,
87    notification: Receiver<LoggerOutput>,
88}
89impl Drop for LoggerGuard {
90    fn drop(&mut self) {
91        self.queue
92            .send(LoggerInput::Flush)
93            .expect("logger queue closed when flushing, this is a bug");
94        self.notification
95            .recv()
96            .expect("logger notification closed, this is a bug");
97    }
98}
99/// global logger
100pub struct Logger {
101    format: Box<dyn FtLogFormat>,
102    level: LevelFilter,
103    queue: Sender<LoggerInput>,
104    notification: Receiver<LoggerOutput>,
105    stopped: AtomicBool,
106}
107
108impl Logger {
109    pub fn init(self) -> Result<LoggerGuard, SetLoggerError> {
110        let guard = LoggerGuard {
111            queue: self.queue.clone(),
112            notification: self.notification.clone(),
113        };
114
115        set_max_level(self.level);
116        let boxed = Box::new(self);
117        set_boxed_logger(boxed).map(|_| guard)
118    }
119}
120
121impl Log for Logger {
122    #[inline]
123    fn enabled(
124        &self,
125        metadata: &Metadata,
126    ) -> bool {
127        self.level >= metadata.level()
128    }
129
130    fn log(
131        &self,
132        record: &Record,
133    ) {
134        let msg = self.format.msg(record);
135        let msg = LoggerInput::LogMsg(LogMsg {
136            time: std::time::SystemTime::now(),
137            msg,
138        });
139        match self.queue.try_send(msg) {
140            Err(TrySendError::Full(_)) => {}
141            Err(TrySendError::Disconnected(_)) => {
142                let stop = self.stopped.load(Ordering::SeqCst);
143                if !stop {
144                    eprintln!("logger queue closed when logging, this is a bug");
145                    self.stopped.store(true, Ordering::SeqCst)
146                }
147            }
148            _ => (),
149        }
150    }
151
152    fn flush(&self) {
153        let _ = self
154            .queue
155            .send(LoggerInput::Flush)
156            .map_err(|e| eprintln!("logger queue closed when flushing, this is a bug: {e}"));
157    }
158}
159
160struct BoundedChannelOption {
161    size: usize,
162}
163
164pub struct Builder {
165    format: Box<dyn FtLogFormat>,
166    level: Option<LevelFilter>,
167    root_level: Option<LevelFilter>,
168    bounded_channel_option: Option<BoundedChannelOption>,
169}
170
171/// Handy function to get ftlog builder
172#[inline]
173pub fn builder() -> Builder {
174    Builder::new()
175}
176
177impl Builder {
178    #[inline]
179    /// Create a ftlog builder with default settings:
180    /// - global log level: INFO
181    /// - root log level: INFO
182    /// - default formatter: `FtLogFormatter`
183    /// - output to stderr
184    /// - bounded channel between worker thread and log thread, with a size limit of 100_000
185    /// - discard excessive log messages
186    /// - log with timestamp of local timezone
187    pub fn new() -> Builder {
188        Builder {
189            format: Box::new(FtLogFormatter),
190            level: None,
191            root_level: None,
192            bounded_channel_option: Some(BoundedChannelOption { size: 100_000 }),
193        }
194    }
195
196    /// Set custom formatter
197    #[inline]
198    pub fn format<F: FtLogFormat + 'static>(
199        mut self,
200        format: F,
201    ) -> Builder {
202        self.format = Box::new(format);
203        self
204    }
205
206    /// set channel size to unbound
207    ///
208    /// **ATTENTION**: too much log message will lead to huge memory consumption,
209    /// as log messages are queued to be handled by log thread.
210    /// When log message exceed the current channel size, it will double the size by default,
211    /// Since channel expansion asks for memory allocation, log calls can be slow down.
212    #[inline]
213    pub fn unbounded(mut self) -> Builder {
214        self.bounded_channel_option = None;
215        self
216    }
217
218    #[inline]
219    /// Set max log level
220    ///
221    /// Logs with level more verbose than this will not be sent to log thread.
222    pub fn max_log_level(
223        mut self,
224        level: LevelFilter,
225    ) -> Builder {
226        self.level = Some(level);
227        self
228    }
229
230    #[inline]
231    /// Set max log level
232    ///
233    /// Logs with level more verbose than this will not be sent to log thread.
234    pub fn root_log_level(
235        mut self,
236        level: LevelFilter,
237    ) -> Builder {
238        self.root_level = Some(level);
239        self
240    }
241
242    /// Finish building ftlog logger
243    ///
244    /// The call spawns a log thread to formatting log message into string,
245    /// and write to output target.
246    pub fn build(self) -> Result<Logger, IoError> {
247        let global_level = self.level.unwrap_or(LevelFilter::Info);
248        let root_level = self.root_level.unwrap_or(global_level);
249        if global_level < root_level {
250            warn!("Logs with level more verbose than {global_level} will be ignored");
251        }
252
253        let (sync_sender, receiver) = match &self.bounded_channel_option {
254            None => unbounded(),
255            Some(option) => bounded(option.size),
256        };
257        let (notification_sender, notification_receiver) = bounded(1);
258        std::thread::Builder::new()
259            .name("logger".to_string())
260            .spawn(move || {
261                let mut last_flush = Instant::now();
262                let timeout = Duration::from_millis(200);
263                loop {
264                    match receiver.recv_timeout(timeout) {
265                        Ok(LoggerInput::LogMsg(msg)) => {
266                            msg.write();
267                        }
268                        Ok(LoggerInput::Flush) => {
269                            let max = receiver.len();
270                            'queue: for _ in 1..=max {
271                                if let Ok(LoggerInput::LogMsg(msg)) = receiver.try_recv() {
272                                    msg.write();
273                                } else {
274                                    break 'queue;
275                                }
276                            }
277                            notification_sender
278                                .send(LoggerOutput::Flushed)
279                                .expect("logger notification failed");
280                        }
281                        Err(RecvTimeoutError::Timeout) => {
282                            if last_flush.elapsed() > Duration::from_millis(1000) {
283                                last_flush = Instant::now();
284                            }
285                        }
286                        Err(e) => {
287                            eprintln!(
288                                "sender closed without sending a Quit first, this is a bug, {e}"
289                            );
290                        }
291                    }
292                }
293            })?;
294        Ok(Logger {
295            format: self.format,
296            level: global_level,
297            queue: sync_sender,
298            notification: notification_receiver,
299            stopped: AtomicBool::new(false),
300        })
301    }
302
303    /// try building and setting as global logger
304    pub fn try_init(self) -> Result<LoggerGuard, Box<dyn std::error::Error>> {
305        let logger = self.build()?;
306        Ok(logger.init()?)
307    }
308}
309
310impl Default for Builder {
311    #[inline]
312    fn default() -> Self {
313        Builder::new()
314    }
315}