1extern crate core;
2
3use chrono::prelude::*;
4use core_affinity::CoreId;
5use once_cell::sync::OnceCell;
6use std::borrow::Cow;
7use std::fs::{self, File};
8use std::io::{self, BufWriter, Write};
9use std::path::Path;
10use std::sync::atomic::AtomicU8;
11use std::sync::Arc;
12use std::thread;
13use std::time::Duration;
14use ufmt::{uwrite, uwriteln};
15
16use symlink::{remove_symlink_auto, symlink_auto};
17
18pub mod internal;
19pub mod log_proxy;
20pub mod macros;
21
22mod consts;
23mod fmt_utils;
24
25pub static GLOBAL_LOGGER: OnceCell<Logger> = OnceCell::new();
26pub static GLOBAL_LOGGER_STOP_FLAG: once_cell::sync::Lazy<std::sync::Mutex<bool>> =
27 once_cell::sync::Lazy::new(|| std::sync::Mutex::new(false));
28
29const TIME_FORMAT_STR: &str = "%H:%M:%S";
30
31thread_local! {
32 pub static TID: std::cell::Cell<&'static str> = std::cell::Cell::new(Box::leak(format!("{}", gettid::gettid()).into_boxed_str()));
33}
34
35pub struct UString(pub String);
36impl ufmt::uWrite for UString {
37 type Error = std::io::Error;
38
39 fn write_str(&mut self, s: &str) -> Result<(), std::io::Error> {
40 self.0.push_str(s);
41 Ok(())
42 }
43}
44impl ufmt::uDisplay for UString {
45 fn fmt<W>(&self, f: &mut ufmt::Formatter<'_, W>) -> Result<(), W::Error>
46 where
47 W: ufmt::uWrite + ?Sized,
48 {
49 <str as ufmt::uDisplay>::fmt(&self.0, f)
50 }
51}
52
/// Severity of a log record, ordered from most verbose (`Trace`) to silent (`Off`).
///
/// The explicit discriminants are part of the type's contract; `Off` is kept
/// far above the real levels so that it compares greater than all of them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    /// Finest-grained diagnostics.
    Trace = 0,
    /// Debugging information.
    Debug = 1,
    /// Informational messages.
    Info = 2,
    /// Something unexpected that does not stop the program.
    Warn = 3,
    /// A failure.
    Error = 4,
    /// Logging disabled.
    Off = 99,
}
62
63impl From<log::Level> for LogLevel {
64 fn from(value: log::Level) -> Self {
65 match value {
66 log::Level::Trace => LogLevel::Trace,
67 log::Level::Debug => LogLevel::Debug,
68 log::Level::Info => LogLevel::Info,
69 log::Level::Warn => LogLevel::Warn,
70 log::Level::Error => LogLevel::Error,
71 }
72 }
73}
74impl From<LogLevel> for log::LevelFilter {
75 fn from(value: LogLevel) -> Self {
76 match value {
77 LogLevel::Trace => log::LevelFilter::Trace,
78 LogLevel::Debug => log::LevelFilter::Debug,
79 LogLevel::Info => log::LevelFilter::Info,
80 LogLevel::Warn => log::LevelFilter::Warn,
81 LogLevel::Error => log::LevelFilter::Error,
82 LogLevel::Off => log::LevelFilter::Off,
83 }
84 }
85}
86impl From<&str> for LogLevel {
87 fn from(value: &str) -> Self {
88 match value {
89 "TRACE" | "trace" | "Trace" => LogLevel::Trace,
90 "DEBUG" | "debug" | "Debug" => LogLevel::Debug,
91 "INFO" | "info" | "Info" => LogLevel::Info,
92 "WARN" | "warn" | "Warn" => LogLevel::Warn,
93 "ERROR" | "error" | "Error" => LogLevel::Error,
94 "OFF" | "off" | "Off" => LogLevel::Off,
95 _ => LogLevel::Info,
96 }
97 }
98}
99
100impl LogLevel {
101 pub fn to_str(&self) -> &'static str {
102 match self {
103 LogLevel::Debug => "DEBUG",
104 LogLevel::Info => "INFO",
105 LogLevel::Warn => "WARN",
106 LogLevel::Error => "ERROR",
107 LogLevel::Trace => "TRACE",
108 LogLevel::Off => "OFF",
109 }
110 }
111}
112
113pub struct LoggingFunc {
117 func: Box<dyn Fn() -> Cow<'static, str> + Send>,
118 file: &'static str,
119 line: u32,
120 tid: &'static str,
121 level: LogLevel,
122 system_time: u64,
123}
124
125impl LoggingFunc {
126 #[allow(dead_code)]
127 pub fn new<T>(
128 func: T,
129 file: &'static str,
130 line: u32,
131 tid: &'static str,
132 lvl: LogLevel,
133 system_time: u64,
134 ) -> LoggingFunc
135 where
136 T: Fn() -> Cow<'static, str> + 'static + Send,
137 {
138 LoggingFunc {
139 func: Box::new(func),
140 file,
141 line,
142 tid,
143 level: lvl,
144 system_time,
145 }
146 }
147 fn invoke(self, rolling_logger: &mut RollingLogger) {
148 rolling_logger.write_date_time_str(self.system_time);
149 let output = (self.func)();
150 let output_str = output.as_ref();
151
152 let _ = uwriteln!(
153 rolling_logger,
154 "[{}] {}:{} {} {}",
155 self.tid,
156 self.file,
157 self.line,
158 self.level.to_str(),
159 output_str
160 );
161 }
162}
163
164#[derive(Copy, Clone, Debug, Eq, PartialEq)]
166pub enum RollingFrequency {
167 EveryDay,
168 EveryHour,
169 EveryMinute,
170}
171
172impl RollingFrequency {
173 pub fn equivalent_datetime(&self, dt: &DateTime<Local>) -> DateTime<Local> {
176 match self {
177 RollingFrequency::EveryDay => Local
178 .with_ymd_and_hms(dt.year(), dt.month(), dt.day(), 0, 0, 0)
179 .unwrap(),
180 RollingFrequency::EveryHour => Local
181 .with_ymd_and_hms(dt.year(), dt.month(), dt.day(), dt.hour(), 0, 0)
182 .unwrap(),
183 RollingFrequency::EveryMinute => Local
184 .with_ymd_and_hms(dt.year(), dt.month(), dt.day(), dt.hour(), dt.minute(), 0)
185 .unwrap(),
186 }
187 }
188}
189
190#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)]
191pub struct RollingCondition {
192 last_write_opt: Option<DateTime<Local>>,
193 frequency_opt: Option<RollingFrequency>,
194 max_size_opt: Option<u64>,
195}
196
197impl RollingCondition {
198 pub fn new() -> RollingCondition {
200 RollingCondition {
201 last_write_opt: Some(Local::now()),
202 frequency_opt: None,
203 max_size_opt: None,
204 }
205 }
206
207 pub fn frequency(mut self, x: RollingFrequency) -> RollingCondition {
209 self.frequency_opt = Some(x);
210 self
211 }
212
213 pub fn daily(mut self) -> RollingCondition {
215 self.frequency_opt = Some(RollingFrequency::EveryDay);
216 self
217 }
218
219 pub fn hourly(mut self) -> RollingCondition {
221 self.frequency_opt = Some(RollingFrequency::EveryHour);
222 self
223 }
224
225 pub fn minutely(mut self) -> RollingCondition {
226 self.frequency_opt = Some(RollingFrequency::EveryMinute);
227 self
228 }
229
230 pub fn max_size(mut self, x: u64) -> RollingCondition {
232 self.max_size_opt = Some(x);
233 self
234 }
235}
236
237impl RollingCondition {
238 fn should_rollover(&mut self, now: &DateTime<Local>, current_filesize: u64) -> bool {
239 let mut rollover = false;
240 if let Some(frequency) = self.frequency_opt.as_ref() {
241 if let Some(last_write) = self.last_write_opt.as_ref() {
242 if frequency.equivalent_datetime(now) != frequency.equivalent_datetime(last_write) {
243 rollover = true;
244 }
245 }
246 }
247 if let Some(max_size) = self.max_size_opt.as_ref() {
248 if current_filesize >= *max_size {
249 rollover = true;
250 }
251 }
252 self.last_write_opt = Some(*now);
253 rollover
254 }
255}
256
257pub struct RollingLogger {
258 condition: RollingCondition,
259 prefix: String,
260 folder: String,
261 max_files: usize,
262 writer_buffer: Option<BufWriter<File>>,
263 current_file_size: u64,
264 time_fmt_str: String,
265 cached_date_time: (
266 u64, String, ),
269}
270
271impl RollingLogger {
272 pub fn new(
273 rc: RollingCondition,
274 time_fmt_str: String,
275 folder: String,
276 prefix: String,
277 max_files: usize,
278 ) -> Self {
279 if std::fs::metadata(&folder).is_err() {
280 std::fs::create_dir_all(&folder).expect("Failed to create log folder");
281 }
282
283 let mut rolling_logger = RollingLogger {
284 condition: rc,
285 prefix,
286 folder,
287 max_files,
288 time_fmt_str,
289 writer_buffer: None,
290 current_file_size: 0,
291 cached_date_time: (0, "".into()),
292 };
293 rolling_logger
294 .open_writer_if_needed(&Local::now())
295 .expect("Failed to open log file");
296 rolling_logger
297 }
298}
299
300pub struct LoggerGuard;
301
302impl Drop for LoggerGuard {
303 fn drop(&mut self) {
304 crate::Logger::finish();
305 }
306}
307
308pub struct Logger {
309 rc: RollingCondition,
310 folder: String,
311 prefix: String,
312 max_files: usize,
313 cpu: Option<usize>,
314 queue_size: usize,
315 sleep_duration_nanos: u64,
316 thread_name: String,
317 set_std_log: bool,
318 time_format_str: Option<String>,
319 sender: Option<crossbeam_channel::Sender<LoggingFunc>>,
320 status: Arc<AtomicU8>, }
322
323impl Logger {
324 pub fn finish() {
325 let mut finish_flag = GLOBAL_LOGGER_STOP_FLAG.lock().unwrap();
326 if !(*finish_flag) {
328 *finish_flag = true;
329 GLOBAL_LOGGER
330 .get()
331 .unwrap()
332 .status
333 .store(3, std::sync::atomic::Ordering::Relaxed);
334 while GLOBAL_LOGGER
335 .get()
336 .unwrap()
337 .status
338 .load(std::sync::atomic::Ordering::Relaxed)
339 != 4
340 {
341 thread::sleep(Duration::from_micros(100));
342 }
343 }
344 }
345 pub fn flush() {
346 GLOBAL_LOGGER
347 .get()
348 .unwrap()
349 .status
350 .store(2, std::sync::atomic::Ordering::Relaxed);
351 }
352 pub fn new(rc: RollingCondition, folder: String, prefix: String) -> Self {
353 Logger {
354 rc,
355 folder,
356 prefix,
357 max_files: consts::MAX_KEEP_FILE,
358 cpu: None,
359 set_std_log: false,
360 time_format_str: None,
361 queue_size: consts::MAX_QUEUE_SIZE,
362 sleep_duration_nanos: consts::BACKGROUND_SLEEP_TIME_STEP_NANOS,
363 thread_name: String::from("low_latency_log"),
364 sender: None,
365 status: Arc::new(AtomicU8::new(0)),
366 }
367 }
368
369 pub fn cpu(mut self, cpu: usize) -> Self {
370 self.cpu = Some(cpu);
371 self
372 }
373
374 pub fn max_files(mut self, max_files: usize) -> Self {
375 self.max_files = max_files;
376 self
377 }
378
379 pub fn queue_size(mut self, queue_size: usize) -> Self {
380 self.queue_size = queue_size;
381 self
382 }
383 pub fn std_log(mut self, set: bool) -> Self {
384 self.set_std_log = set;
385 self
386 }
387 pub fn time_format_str(mut self, fmt: &str) -> Self {
388 self.time_format_str = Some(fmt.into());
389 self
390 }
391 pub fn background_sleep_time_step_nanos(mut self, nanos: u64) -> Self {
392 self.sleep_duration_nanos = nanos;
393 self
394 }
395
396 pub fn init(mut self) -> io::Result<LoggerGuard> {
397 let (tx, rx) = match self.queue_size {
398 0 => crossbeam_channel::unbounded(),
399 _ => crossbeam_channel::bounded(self.queue_size),
400 };
401
402 self.sender = Some(tx);
403
404 let time_fmt_str = if self.time_format_str.is_none() {
405 TIME_FORMAT_STR.into()
406 } else {
407 self.time_format_str.as_ref().unwrap().clone()
408 };
409 let mut rolling_logger = RollingLogger::new(
410 self.rc,
411 time_fmt_str,
412 self.folder.clone(),
413 self.prefix.clone(),
414 self.max_files,
415 );
416
417 let status = self.status.clone();
418
419 let _a = thread::Builder::new()
420 .name(self.thread_name.to_string())
421 .spawn(move || {
422 if let Some(core) = self.cpu {
423 core_affinity::set_for_current(CoreId { id: core });
424 }
425 status.store(1, std::sync::atomic::Ordering::Relaxed); loop {
427 match rx.try_recv() {
428 Ok(cmd) => {
429 Self::process_log_command(cmd, &mut rolling_logger);
430 }
431 Err(e) => {
432 let st = status.load(std::sync::atomic::Ordering::Relaxed);
433 if st == 2 {
434 let _ = rolling_logger.flush();
436 status.store(1, std::sync::atomic::Ordering::Relaxed);
437 } else if st == 3 {
438 let _ = rolling_logger.flush();
440 break;
441 }
442 match e {
443 crossbeam_channel::TryRecvError::Empty => {
444 let _ = rolling_logger.flush();
445 thread::sleep(Duration::from_nanos(self.sleep_duration_nanos));
446 }
447 crossbeam_channel::TryRecvError::Disconnected => {
448 let _ = rolling_logger.flush();
449 break;
450 }
451 }
452 }
453 }
454 }
455 status.store(4, std::sync::atomic::Ordering::Relaxed); });
457
458 let set_std_logger = self.set_std_log;
459 let _ = GLOBAL_LOGGER.set(self);
460 if set_std_logger {
461 let fast_logger = log_proxy::LogProxy::default();
462 log::set_max_level(LogLevel::Info.into());
463 log::set_boxed_logger(Box::new(fast_logger)).unwrap();
464 }
465 Ok(LoggerGuard)
466 }
467
468 fn process_log_command(cmd: LoggingFunc, rolling_logger: &mut RollingLogger) {
469 cmd.invoke(rolling_logger);
470 }
471
472 pub fn log(&self, func: LoggingFunc) {
473 match &self.sender {
474 Some(tx) => {
475 tx.send(func).unwrap();
476 }
477 None => (),
478 }
479 }
480}
481
482impl RollingLogger {
483 fn flush(&mut self) -> io::Result<()> {
484 if let Some(writer) = self.writer_buffer.as_mut() {
485 writer.flush()?;
486 }
487 Ok(())
488 }
489 pub fn rollover(&mut self) -> io::Result<()> {
490 self.flush()?;
491 self.writer_buffer.take();
493 self.current_file_size = 0;
494 Ok(())
495 }
496
497 fn new_file_name(&self, now: &DateTime<Local>) -> String {
498 let mut str = String::with_capacity(self.prefix.len() + 16);
499 str.push_str(self.prefix.as_str());
500 str.push('.');
501 str.push_str(now.format("%Y%m%d.%H%M%S").to_string().as_str());
502 str
503 }
504 fn open_writer_if_needed(&mut self, now: &DateTime<Local>) -> io::Result<()> {
506 if self.writer_buffer.is_none() {
507 let p = self.new_file_name(now);
508 let new_file_path = std::path::Path::new(&self.folder).join(&p);
509 if std::fs::metadata(&self.folder).is_err() {
510 std::fs::create_dir_all(&self.folder)?;
511 }
512 let f = std::fs::OpenOptions::new()
513 .append(true)
514 .create(true)
515 .open(&new_file_path)?;
516 self.writer_buffer = Some(BufWriter::with_capacity(1024 * 1024, f));
517 {
519 let folder = std::path::Path::new(&self.folder);
520 if let Ok(path) = folder.canonicalize() {
521 let latest_log_symlink = path.join(&self.prefix);
522 let _ = remove_symlink_auto(folder.join(&self.prefix));
523 let _ = symlink_auto(new_file_path.canonicalize().unwrap(), latest_log_symlink);
524 }
525 }
526 self.current_file_size = std::fs::metadata(&p).map_or(0, |m| m.len());
527 self.check_and_remove_log_file()?;
528 }
529 Ok(())
530 }
531
532 pub fn rollate_with_datetime(&mut self, time_point: &DateTime<Local>) -> io::Result<()> {
533 if self
534 .condition
535 .should_rollover(time_point, self.current_file_size)
536 {
537 if let Err(e) = self.rollover() {
538 eprintln!("WARNING: Failed to rotate logfile {}", e);
539 }
540 }
541 self.open_writer_if_needed(time_point)?;
542 Ok(())
543 }
544
545 pub fn write_to_buffer(&mut self, buf: &[u8]) -> io::Result<usize> {
546 let writer = self.writer_buffer.as_mut().unwrap();
547 let buf_len = buf.len();
548 writer.write_all(buf).map(|_| {
549 self.current_file_size += u64::try_from(buf_len).unwrap_or(u64::MAX);
550 buf_len
551 })
552 }
553
554 pub fn write_date_time_str(&mut self, unix_timestamp_ns: u64) {
555 let now_sec: u64 = unix_timestamp_ns / 1_000_000_000;
556 let data_str_array = {
557 let cached_timestamp_sec = self.cached_date_time.0;
558 if now_sec != cached_timestamp_sec {
559 let local_date_time =
561 DateTime::from_timestamp_nanos(unix_timestamp_ns as i64).with_timezone(&Local);
562 let _ = self.rollate_with_datetime(&local_date_time); {
564 let cached = &mut self.cached_date_time;
566 cached.0 = now_sec;
567 cached.1 = local_date_time
568 .format(self.time_fmt_str.as_str())
569 .to_string();
570 }
571 }
572 self.cached_date_time.1.as_bytes()
573 };
574 let writer = self.writer_buffer.as_mut().unwrap();
575 let _ = writer.write_all(data_str_array).map(|_| {
576 self.current_file_size += u64::try_from(data_str_array.len()).unwrap_or(u64::MAX);
577 });
578
579 uwrite!(self, ".{} ", unix_timestamp_ns - (now_sec * 1_000_000_000)).unwrap();
580 }
581
582 fn check_and_remove_log_file(&mut self) -> io::Result<()> {
583 let files = std::fs::read_dir(&self.folder)?;
584
585 let mut log_files = vec![];
586 for f in files.flatten() {
587 let fname = f.file_name().to_string_lossy().to_string();
588 if fname.starts_with(&self.prefix) && fname != self.prefix {
589 log_files.push(fname);
590 }
591 }
592
593 log_files.sort_by(|a, b| b.cmp(a));
594
595 if log_files.len() > self.max_files {
596 for f in log_files.drain(self.max_files..) {
597 let p = Path::new(&self.folder).join(f);
598 if let Err(e) = fs::remove_file(&p) {
599 eprintln!(
600 "WARNING: Failed to remove old logfile {}: {}",
601 p.to_string_lossy(),
602 e
603 );
604 }
605 }
606 }
607 Ok(())
608 }
609}
610
611impl ufmt::uWrite for RollingLogger {
612 type Error = std::io::Error;
613
614 fn write_str(&mut self, s: &str) -> Result<(), std::io::Error> {
615 self.write_to_buffer(s.as_bytes())?;
616 Ok(())
617 }
618}
619
620#[allow(dead_code)]
621impl RollingLogger {
622 #[inline]
623 fn write_char(&mut self, s: char) -> Result<usize, std::io::Error> {
624 self.write_to_buffer(&[s as u8])
625 }
626 #[inline]
627 fn write_str(&mut self, s: &str) -> Result<usize, std::io::Error> {
628 self.write_to_buffer(s.as_bytes())
629 }
630 #[inline]
631 fn write_bytes(&mut self, s: &[u8]) -> Result<usize, std::io::Error> {
632 self.write_to_buffer(s)
633 }
634 #[inline]
635 fn write_u32(&mut self, n: u32) -> Result<(), std::io::Error> {
636 let writer_buffer = self.writer_buffer.as_mut().unwrap();
637 fmt_utils::write_u32(n, writer_buffer)
638 }
639}
640
641impl Drop for Logger {
642 fn drop(&mut self) {
643 Self::finish();
644 }
645}
646
647pub fn logger() -> &'static Logger {
648 GLOBAL_LOGGER.get().unwrap()
649}
650
#[cfg(test)]
mod tests {
    use super::LoggingFunc;
    use std::mem::size_of;

    /// Guards the memory footprint of a queued record: it has to stay within
    /// 64 bytes.
    #[test]
    fn test_log_func_size() {
        let bytes = size_of::<LoggingFunc>();
        println!("The size of LoggingFunc is: {}", bytes);
        assert!(bytes <= 64);
    }
}
661}