low_latency_log/
lib.rs

1extern crate core;
2
3use chrono::prelude::*;
4use core_affinity::CoreId;
5use once_cell::sync::OnceCell;
6use std::borrow::Cow;
7use std::fs::{self, File};
8use std::io::{self, BufWriter, Write};
9use std::path::Path;
10use std::sync::atomic::AtomicU8;
11use std::sync::Arc;
12use std::thread;
13use std::time::Duration;
14use ufmt::{uwrite, uwriteln};
15
16use symlink::{remove_symlink_auto, symlink_auto};
17
18pub mod internal;
19pub mod log_proxy;
20pub mod macros;
21
22mod consts;
23mod fmt_utils;
24
25pub static GLOBAL_LOGGER: OnceCell<Logger> = OnceCell::new();
26pub static GLOBAL_LOGGER_STOP_FLAG: once_cell::sync::Lazy<std::sync::Mutex<bool>> =
27    once_cell::sync::Lazy::new(|| std::sync::Mutex::new(false));
28
29const TIME_FORMAT_STR: &str = "%H:%M:%S";
30
31thread_local! {
32    pub static TID: std::cell::Cell<&'static str> = std::cell::Cell::new(Box::leak(format!("{}", gettid::gettid()).into_boxed_str()));
33}
34
35pub struct UString(pub String);
36impl ufmt::uWrite for UString {
37    type Error = std::io::Error;
38
39    fn write_str(&mut self, s: &str) -> Result<(), std::io::Error> {
40        self.0.push_str(s);
41        Ok(())
42    }
43}
44impl ufmt::uDisplay for UString {
45    fn fmt<W>(&self, f: &mut ufmt::Formatter<'_, W>) -> Result<(), W::Error>
46    where
47        W: ufmt::uWrite + ?Sized,
48    {
49        <str as ufmt::uDisplay>::fmt(&self.0, f)
50    }
51}
52
53#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
54pub enum LogLevel {
55    Trace = 0,
56    Debug = 1,
57    Info = 2,
58    Warn = 3,
59    Error = 4,
60    Off = 99,
61}
62
63impl From<log::Level> for LogLevel {
64    fn from(value: log::Level) -> Self {
65        match value {
66            log::Level::Trace => LogLevel::Trace,
67            log::Level::Debug => LogLevel::Debug,
68            log::Level::Info => LogLevel::Info,
69            log::Level::Warn => LogLevel::Warn,
70            log::Level::Error => LogLevel::Error,
71        }
72    }
73}
74impl From<LogLevel> for log::LevelFilter {
75    fn from(value: LogLevel) -> Self {
76        match value {
77            LogLevel::Trace => log::LevelFilter::Trace,
78            LogLevel::Debug => log::LevelFilter::Debug,
79            LogLevel::Info => log::LevelFilter::Info,
80            LogLevel::Warn => log::LevelFilter::Warn,
81            LogLevel::Error => log::LevelFilter::Error,
82            LogLevel::Off => log::LevelFilter::Off,
83        }
84    }
85}
86impl From<&str> for LogLevel {
87    fn from(value: &str) -> Self {
88        match value {
89            "TRACE" | "trace" | "Trace" => LogLevel::Trace,
90            "DEBUG" | "debug" | "Debug" => LogLevel::Debug,
91            "INFO" | "info" | "Info" => LogLevel::Info,
92            "WARN" | "warn" | "Warn" => LogLevel::Warn,
93            "ERROR" | "error" | "Error" => LogLevel::Error,
94            "OFF" | "off" | "Off" => LogLevel::Off,
95            _ => LogLevel::Info,
96        }
97    }
98}
99
100impl LogLevel {
101    pub fn to_str(&self) -> &'static str {
102        match self {
103            LogLevel::Debug => "DEBUG",
104            LogLevel::Info => "INFO",
105            LogLevel::Warn => "WARN",
106            LogLevel::Error => "ERROR",
107            LogLevel::Trace => "TRACE",
108            LogLevel::Off => "OFF",
109        }
110    }
111}
112
113/*
114 NOTE: this struct should be as small as possible to avoid cache miss
115*/
116pub struct LoggingFunc {
117    func: Box<dyn Fn() -> Cow<'static, str> + Send>,
118    file: &'static str,
119    line: u32,
120    tid: &'static str,
121    level: LogLevel,
122    system_time: u64,
123}
124
125impl LoggingFunc {
126    #[allow(dead_code)]
127    pub fn new<T>(
128        func: T,
129        file: &'static str,
130        line: u32,
131        tid: &'static str,
132        lvl: LogLevel,
133        system_time: u64,
134    ) -> LoggingFunc
135    where
136        T: Fn() -> Cow<'static, str> + 'static + Send,
137    {
138        LoggingFunc {
139            func: Box::new(func),
140            file,
141            line,
142            tid,
143            level: lvl,
144            system_time,
145        }
146    }
147    fn invoke(self, rolling_logger: &mut RollingLogger) {
148        rolling_logger.write_date_time_str(self.system_time);
149        let output = (self.func)();
150        let output_str = output.as_ref();
151
152        let _ = uwriteln!(
153            rolling_logger,
154            "[{}] {}:{} {} {}",
155            self.tid,
156            self.file,
157            self.line,
158            self.level.to_str(),
159            output_str
160        );
161    }
162}
163
164/// Determines how often a file should be rolled over
165#[derive(Copy, Clone, Debug, Eq, PartialEq)]
166pub enum RollingFrequency {
167    EveryDay,
168    EveryHour,
169    EveryMinute,
170}
171
172impl RollingFrequency {
173    /// Calculates a datetime that will be different if data should be in
174    /// different files.
175    pub fn equivalent_datetime(&self, dt: &DateTime<Local>) -> DateTime<Local> {
176        match self {
177            RollingFrequency::EveryDay => Local
178                .with_ymd_and_hms(dt.year(), dt.month(), dt.day(), 0, 0, 0)
179                .unwrap(),
180            RollingFrequency::EveryHour => Local
181                .with_ymd_and_hms(dt.year(), dt.month(), dt.day(), dt.hour(), 0, 0)
182                .unwrap(),
183            RollingFrequency::EveryMinute => Local
184                .with_ymd_and_hms(dt.year(), dt.month(), dt.day(), dt.hour(), dt.minute(), 0)
185                .unwrap(),
186        }
187    }
188}
189
190#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)]
191pub struct RollingCondition {
192    last_write_opt: Option<DateTime<Local>>,
193    frequency_opt: Option<RollingFrequency>,
194    max_size_opt: Option<u64>,
195}
196
197impl RollingCondition {
198    /// Constructs a new struct that does not yet have any condition set.
199    pub fn new() -> RollingCondition {
200        RollingCondition {
201            last_write_opt: Some(Local::now()),
202            frequency_opt: None,
203            max_size_opt: None,
204        }
205    }
206
207    /// Sets a condition to rollover on the given frequency
208    pub fn frequency(mut self, x: RollingFrequency) -> RollingCondition {
209        self.frequency_opt = Some(x);
210        self
211    }
212
213    /// Sets a condition to rollover when the date changes
214    pub fn daily(mut self) -> RollingCondition {
215        self.frequency_opt = Some(RollingFrequency::EveryDay);
216        self
217    }
218
219    /// Sets a condition to rollover when the date or hour changes
220    pub fn hourly(mut self) -> RollingCondition {
221        self.frequency_opt = Some(RollingFrequency::EveryHour);
222        self
223    }
224
225    pub fn minutely(mut self) -> RollingCondition {
226        self.frequency_opt = Some(RollingFrequency::EveryMinute);
227        self
228    }
229
230    /// Sets a condition to rollover when a certain size is reached
231    pub fn max_size(mut self, x: u64) -> RollingCondition {
232        self.max_size_opt = Some(x);
233        self
234    }
235}
236
237impl RollingCondition {
238    fn should_rollover(&mut self, now: &DateTime<Local>, current_filesize: u64) -> bool {
239        let mut rollover = false;
240        if let Some(frequency) = self.frequency_opt.as_ref() {
241            if let Some(last_write) = self.last_write_opt.as_ref() {
242                if frequency.equivalent_datetime(now) != frequency.equivalent_datetime(last_write) {
243                    rollover = true;
244                }
245            }
246        }
247        if let Some(max_size) = self.max_size_opt.as_ref() {
248            if current_filesize >= *max_size {
249                rollover = true;
250            }
251        }
252        self.last_write_opt = Some(*now);
253        rollover
254    }
255}
256
257pub struct RollingLogger {
258    condition: RollingCondition,
259    prefix: String,
260    folder: String,
261    max_files: usize,
262    writer_buffer: Option<BufWriter<File>>,
263    current_file_size: u64,
264    time_fmt_str: String,
265    cached_date_time: (
266        u64,    /* unix_timestamp_sec */
267        String, /* date_time_str_without_subsec */
268    ),
269}
270
271impl RollingLogger {
272    pub fn new(
273        rc: RollingCondition,
274        time_fmt_str: String,
275        folder: String,
276        prefix: String,
277        max_files: usize,
278    ) -> Self {
279        if std::fs::metadata(&folder).is_err() {
280            std::fs::create_dir_all(&folder).expect("Failed to create log folder");
281        }
282
283        let mut rolling_logger = RollingLogger {
284            condition: rc,
285            prefix,
286            folder,
287            max_files,
288            time_fmt_str,
289            writer_buffer: None,
290            current_file_size: 0,
291            cached_date_time: (0, "".into()),
292        };
293        rolling_logger
294            .open_writer_if_needed(&Local::now())
295            .expect("Failed to open log file");
296        rolling_logger
297    }
298}
299
300pub struct LoggerGuard;
301
302impl Drop for LoggerGuard {
303    fn drop(&mut self) {
304        crate::Logger::finish();
305    }
306}
307
308pub struct Logger {
309    rc: RollingCondition,
310    folder: String,
311    prefix: String,
312    max_files: usize,
313    cpu: Option<usize>,
314    queue_size: usize,
315    sleep_duration_nanos: u64,
316    thread_name: String,
317    set_std_log: bool,
318    time_format_str: Option<String>,
319    sender: Option<crossbeam_channel::Sender<LoggingFunc>>,
320    status: Arc<AtomicU8>, /* 0->uninit, 1->inited, 2->require to flush, 3->require to stop, 4->stopped */
321}
322
323impl Logger {
324    pub fn finish() {
325        let mut finish_flag = GLOBAL_LOGGER_STOP_FLAG.lock().unwrap();
326        // we can only finish logger once
327        if !(*finish_flag) {
328            *finish_flag = true;
329            GLOBAL_LOGGER
330                .get()
331                .unwrap()
332                .status
333                .store(3, std::sync::atomic::Ordering::Relaxed);
334            while GLOBAL_LOGGER
335                .get()
336                .unwrap()
337                .status
338                .load(std::sync::atomic::Ordering::Relaxed)
339                != 4
340            {
341                thread::sleep(Duration::from_micros(100));
342            }
343        }
344    }
345    pub fn flush() {
346        GLOBAL_LOGGER
347            .get()
348            .unwrap()
349            .status
350            .store(2, std::sync::atomic::Ordering::Relaxed);
351    }
352    pub fn new(rc: RollingCondition, folder: String, prefix: String) -> Self {
353        Logger {
354            rc,
355            folder,
356            prefix,
357            max_files: consts::MAX_KEEP_FILE,
358            cpu: None,
359            set_std_log: false,
360            time_format_str: None,
361            queue_size: consts::MAX_QUEUE_SIZE,
362            sleep_duration_nanos: consts::BACKGROUND_SLEEP_TIME_STEP_NANOS,
363            thread_name: String::from("low_latency_log"),
364            sender: None,
365            status: Arc::new(AtomicU8::new(0)),
366        }
367    }
368
369    pub fn cpu(mut self, cpu: usize) -> Self {
370        self.cpu = Some(cpu);
371        self
372    }
373
374    pub fn max_files(mut self, max_files: usize) -> Self {
375        self.max_files = max_files;
376        self
377    }
378
379    pub fn queue_size(mut self, queue_size: usize) -> Self {
380        self.queue_size = queue_size;
381        self
382    }
383    pub fn std_log(mut self, set: bool) -> Self {
384        self.set_std_log = set;
385        self
386    }
387    pub fn time_format_str(mut self, fmt: &str) -> Self {
388        self.time_format_str = Some(fmt.into());
389        self
390    }
391    pub fn background_sleep_time_step_nanos(mut self, nanos: u64) -> Self {
392        self.sleep_duration_nanos = nanos;
393        self
394    }
395
396    pub fn init(mut self) -> io::Result<LoggerGuard> {
397        let (tx, rx) = match self.queue_size {
398            0 => crossbeam_channel::unbounded(),
399            _ => crossbeam_channel::bounded(self.queue_size),
400        };
401
402        self.sender = Some(tx);
403
404        let time_fmt_str = if self.time_format_str.is_none() {
405            TIME_FORMAT_STR.into()
406        } else {
407            self.time_format_str.as_ref().unwrap().clone()
408        };
409        let mut rolling_logger = RollingLogger::new(
410            self.rc,
411            time_fmt_str,
412            self.folder.clone(),
413            self.prefix.clone(),
414            self.max_files,
415        );
416
417        let status = self.status.clone();
418
419        let _a = thread::Builder::new()
420            .name(self.thread_name.to_string())
421            .spawn(move || {
422                if let Some(core) = self.cpu {
423                    core_affinity::set_for_current(CoreId { id: core });
424                }
425                status.store(1, std::sync::atomic::Ordering::Relaxed); // set logger initted
426                loop {
427                    match rx.try_recv() {
428                        Ok(cmd) => {
429                            Self::process_log_command(cmd, &mut rolling_logger);
430                        }
431                        Err(e) => {
432                            let st = status.load(std::sync::atomic::Ordering::Relaxed);
433                            if st == 2 {
434                                // check if require to flush
435                                let _ = rolling_logger.flush();
436                                status.store(1, std::sync::atomic::Ordering::Relaxed);
437                            } else if st == 3 {
438                                // check if require to stop
439                                let _ = rolling_logger.flush();
440                                break;
441                            }
442                            match e {
443                                crossbeam_channel::TryRecvError::Empty => {
444                                    let _ = rolling_logger.flush();
445                                    thread::sleep(Duration::from_nanos(self.sleep_duration_nanos));
446                                }
447                                crossbeam_channel::TryRecvError::Disconnected => {
448                                    let _ = rolling_logger.flush();
449                                    break;
450                                }
451                            }
452                        }
453                    }
454                }
455                status.store(4, std::sync::atomic::Ordering::Relaxed); // set logger stopped
456            });
457
458        let set_std_logger = self.set_std_log;
459        let _ = GLOBAL_LOGGER.set(self);
460        if set_std_logger {
461            let fast_logger = log_proxy::LogProxy::default();
462            log::set_max_level(LogLevel::Info.into());
463            log::set_boxed_logger(Box::new(fast_logger)).unwrap();
464        }
465        Ok(LoggerGuard)
466    }
467
468    fn process_log_command(cmd: LoggingFunc, rolling_logger: &mut RollingLogger) {
469        cmd.invoke(rolling_logger);
470    }
471
472    pub fn log(&self, func: LoggingFunc) {
473        match &self.sender {
474            Some(tx) => {
475                tx.send(func).unwrap();
476            }
477            None => (),
478        }
479    }
480}
481
482impl RollingLogger {
483    fn flush(&mut self) -> io::Result<()> {
484        if let Some(writer) = self.writer_buffer.as_mut() {
485            writer.flush()?;
486        }
487        Ok(())
488    }
489    pub fn rollover(&mut self) -> io::Result<()> {
490        self.flush()?;
491        // We must close the current file before rotating files
492        self.writer_buffer.take();
493        self.current_file_size = 0;
494        Ok(())
495    }
496
497    fn new_file_name(&self, now: &DateTime<Local>) -> String {
498        let mut str = String::with_capacity(self.prefix.len() + 16);
499        str.push_str(self.prefix.as_str());
500        str.push('.');
501        str.push_str(now.format("%Y%m%d.%H%M%S").to_string().as_str());
502        str
503    }
504    /// Opens a writer for the current file.
505    fn open_writer_if_needed(&mut self, now: &DateTime<Local>) -> io::Result<()> {
506        if self.writer_buffer.is_none() {
507            let p = self.new_file_name(now);
508            let new_file_path = std::path::Path::new(&self.folder).join(&p);
509            if std::fs::metadata(&self.folder).is_err() {
510                std::fs::create_dir_all(&self.folder)?;
511            }
512            let f = std::fs::OpenOptions::new()
513                .append(true)
514                .create(true)
515                .open(&new_file_path)?;
516            self.writer_buffer = Some(BufWriter::with_capacity(1024 * 1024, f));
517            // make a soft link to latest file
518            {
519                let folder = std::path::Path::new(&self.folder);
520                if let Ok(path) = folder.canonicalize() {
521                    let latest_log_symlink = path.join(&self.prefix);
522                    let _ = remove_symlink_auto(folder.join(&self.prefix));
523                    let _ = symlink_auto(new_file_path.canonicalize().unwrap(), latest_log_symlink);
524                }
525            }
526            self.current_file_size = std::fs::metadata(&p).map_or(0, |m| m.len());
527            self.check_and_remove_log_file()?;
528        }
529        Ok(())
530    }
531
532    pub fn rollate_with_datetime(&mut self, time_point: &DateTime<Local>) -> io::Result<()> {
533        if self
534            .condition
535            .should_rollover(time_point, self.current_file_size)
536        {
537            if let Err(e) = self.rollover() {
538                eprintln!("WARNING: Failed to rotate logfile  {}", e);
539            }
540        }
541        self.open_writer_if_needed(time_point)?;
542        Ok(())
543    }
544
545    pub fn write_to_buffer(&mut self, buf: &[u8]) -> io::Result<usize> {
546        let writer = self.writer_buffer.as_mut().unwrap();
547        let buf_len = buf.len();
548        writer.write_all(buf).map(|_| {
549            self.current_file_size += u64::try_from(buf_len).unwrap_or(u64::MAX);
550            buf_len
551        })
552    }
553
554    pub fn write_date_time_str(&mut self, unix_timestamp_ns: u64) {
555        let now_sec: u64 = unix_timestamp_ns / 1_000_000_000;
556        let data_str_array = {
557            let cached_timestamp_sec = self.cached_date_time.0;
558            if now_sec != cached_timestamp_sec {
559                // if cached timestamp is not the same as now
560                let local_date_time =
561                    DateTime::from_timestamp_nanos(unix_timestamp_ns as i64).with_timezone(&Local);
562                let _ = self.rollate_with_datetime(&local_date_time); // rollate if needed
563                {
564                    // update cached date time
565                    let cached = &mut self.cached_date_time;
566                    cached.0 = now_sec;
567                    cached.1 = local_date_time
568                        .format(self.time_fmt_str.as_str())
569                        .to_string();
570                }
571            }
572            self.cached_date_time.1.as_bytes()
573        };
574        let writer = self.writer_buffer.as_mut().unwrap();
575        let _ = writer.write_all(data_str_array).map(|_| {
576            self.current_file_size += u64::try_from(data_str_array.len()).unwrap_or(u64::MAX);
577        });
578
579        uwrite!(self, ".{} ", unix_timestamp_ns - (now_sec * 1_000_000_000)).unwrap();
580    }
581
582    fn check_and_remove_log_file(&mut self) -> io::Result<()> {
583        let files = std::fs::read_dir(&self.folder)?;
584
585        let mut log_files = vec![];
586        for f in files.flatten() {
587            let fname = f.file_name().to_string_lossy().to_string();
588            if fname.starts_with(&self.prefix) && fname != self.prefix {
589                log_files.push(fname);
590            }
591        }
592
593        log_files.sort_by(|a, b| b.cmp(a));
594
595        if log_files.len() > self.max_files {
596            for f in log_files.drain(self.max_files..) {
597                let p = Path::new(&self.folder).join(f);
598                if let Err(e) = fs::remove_file(&p) {
599                    eprintln!(
600                        "WARNING: Failed to remove old logfile {}: {}",
601                        p.to_string_lossy(),
602                        e
603                    );
604                }
605            }
606        }
607        Ok(())
608    }
609}
610
611impl ufmt::uWrite for RollingLogger {
612    type Error = std::io::Error;
613
614    fn write_str(&mut self, s: &str) -> Result<(), std::io::Error> {
615        self.write_to_buffer(s.as_bytes())?;
616        Ok(())
617    }
618}
619
620#[allow(dead_code)]
621impl RollingLogger {
622    #[inline]
623    fn write_char(&mut self, s: char) -> Result<usize, std::io::Error> {
624        self.write_to_buffer(&[s as u8])
625    }
626    #[inline]
627    fn write_str(&mut self, s: &str) -> Result<usize, std::io::Error> {
628        self.write_to_buffer(s.as_bytes())
629    }
630    #[inline]
631    fn write_bytes(&mut self, s: &[u8]) -> Result<usize, std::io::Error> {
632        self.write_to_buffer(s)
633    }
634    #[inline]
635    fn write_u32(&mut self, n: u32) -> Result<(), std::io::Error> {
636        let writer_buffer = self.writer_buffer.as_mut().unwrap();
637        fmt_utils::write_u32(n, writer_buffer)
638    }
639}
640
641impl Drop for Logger {
642    fn drop(&mut self) {
643        Self::finish();
644    }
645}
646
647pub fn logger() -> &'static Logger {
648    GLOBAL_LOGGER.get().unwrap()
649}
650
651#[cfg(test)]
652mod tests {
653    use super::*;
654
655    #[test]
656    fn test_log_func_size() {
657        let size = std::mem::size_of::<LoggingFunc>();
658        println!("The size of LoggingFunc is: {}", size);
659        assert!(size <= 64);
660    }
661}