Skip to main content

rivet_logger/
logger.rs

1use std::cell::Cell;
2use std::error::Error;
3use std::sync::Arc;
4
5use time::{OffsetDateTime, UtcOffset};
6
7mod error;
8mod handler;
9mod level;
10mod processor;
11mod record;
12mod value;
13
14pub use crate::processors::{
15    ClosureContext, Git, Hostname, Introspection, LoadAverage, LoadAverageWindow, MemoryPeakUsage,
16    MemoryUsage, Mercurial, ProcessId, PsrLogMessage, Tag, Uid, Web,
17};
18pub use error::LoggerError;
19pub use handler::Handler;
20pub use level::{IntoLevel, Level};
21pub use processor::Processor;
22pub use record::{Context, LogRecord};
23pub use value::{DeferredValue, LogValue};
24
25pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
26type ExceptionHandler = dyn Fn(&(dyn Error + Send + Sync + 'static), &LogRecord) + Send + Sync;
27
28thread_local! {
29    static LOG_DEPTH: Cell<usize> = const { Cell::new(0) };
30}
31
32#[derive(Clone)]
33pub struct Logger {
34    name: String,
35    handlers: Vec<Arc<dyn Handler>>,
36    processors: Vec<Arc<dyn Processor>>,
37    microsecond_timestamps: bool,
38    timezone: UtcOffset,
39    exception_handler: Option<Arc<ExceptionHandler>>,
40    detect_cycles: bool,
41}
42
43impl Logger {
44    pub fn new(name: impl Into<String>) -> Self {
45        Self {
46            name: name.into(),
47            handlers: Vec::new(),
48            processors: Vec::new(),
49            microsecond_timestamps: true,
50            timezone: UtcOffset::UTC,
51            exception_handler: None,
52            detect_cycles: true,
53        }
54    }
55
56    pub fn get_name(&self) -> &str {
57        &self.name
58    }
59
60    pub fn with_name(&self, name: impl Into<String>) -> Self {
61        let mut new = self.clone();
62        new.name = name.into();
63        new
64    }
65
66    pub fn push_handler(&mut self, handler: Arc<dyn Handler>) -> &mut Self {
67        self.handlers.insert(0, handler);
68        self
69    }
70
71    pub fn pop_handler(&mut self) -> Result<Arc<dyn Handler>, LoggerError> {
72        if self.handlers.is_empty() {
73            return Err(LoggerError::EmptyHandlerStack);
74        }
75
76        Ok(self.handlers.remove(0))
77    }
78
79    pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn Handler>>) -> &mut Self {
80        self.handlers.clear();
81        for handler in handlers.into_iter().rev() {
82            self.push_handler(handler);
83        }
84        self
85    }
86
87    pub fn get_handlers(&self) -> &[Arc<dyn Handler>] {
88        &self.handlers
89    }
90
91    pub fn push_processor(&mut self, processor: Arc<dyn Processor>) -> &mut Self {
92        self.processors.insert(0, processor);
93        self
94    }
95
96    pub fn pop_processor(&mut self) -> Result<Arc<dyn Processor>, LoggerError> {
97        if self.processors.is_empty() {
98            return Err(LoggerError::EmptyProcessorStack);
99        }
100
101        Ok(self.processors.remove(0))
102    }
103
104    pub fn get_processors(&self) -> &[Arc<dyn Processor>] {
105        &self.processors
106    }
107
108    pub fn use_microsecond_timestamps(&mut self, micro: bool) -> &mut Self {
109        self.microsecond_timestamps = micro;
110        self
111    }
112
113    pub fn use_logging_loop_detection(&mut self, detect_cycles: bool) -> &mut Self {
114        self.detect_cycles = detect_cycles;
115        self
116    }
117
118    pub fn set_timezone(&mut self, timezone: UtcOffset) -> &mut Self {
119        self.timezone = timezone;
120        self
121    }
122
123    pub fn get_timezone(&self) -> UtcOffset {
124        self.timezone
125    }
126
127    pub fn set_exception_handler(&mut self, callback: Option<Arc<ExceptionHandler>>) -> &mut Self {
128        self.exception_handler = callback;
129        self
130    }
131
132    pub fn add_record(
133        &self,
134        level: impl IntoLevel,
135        message: impl Into<String>,
136        context: Context,
137    ) -> Result<bool, LoggerError> {
138        let level = level.into_level()?;
139        let (log_depth, _guard) = LogDepthGuard::new(self.detect_cycles);
140
141        if log_depth == 3 {
142            let _ = self.warning(
143                "A possible infinite logging loop was detected and aborted. It appears some of your handler code is triggering logging.",
144            );
145            return Ok(false);
146        }
147        if log_depth >= 5 {
148            return Ok(false);
149        }
150
151        let mut record = self.make_record(level, message.into(), context);
152        let mut record_initialized = self.processors.is_empty();
153        let mut handled = false;
154
155        for handler in &self.handlers {
156            if !record_initialized {
157                if !handler.is_handling(&record) {
158                    continue;
159                }
160
161                for processor in &self.processors {
162                    let current = record.clone();
163                    record = match processor.process(current) {
164                        Ok(next) => next,
165                        Err(err) => {
166                            self.handle_exception(err, &record)?;
167                            return Ok(true);
168                        }
169                    };
170                }
171
172                record_initialized = true;
173            }
174
175            match handler.handle(record.clone()) {
176                Ok(stop_bubbling) => {
177                    handled = true;
178                    if stop_bubbling {
179                        break;
180                    }
181                }
182                Err(err) => {
183                    self.handle_exception(err, &record)?;
184                    return Ok(true);
185                }
186            }
187        }
188
189        Ok(handled)
190    }
191
192    pub fn log(
193        &self,
194        level: impl IntoLevel,
195        message: impl Into<String>,
196        context: Context,
197    ) -> Result<bool, LoggerError> {
198        self.add_record(level, message, context)
199    }
200
201    pub fn debug(&self, message: impl Into<String>) -> Result<bool, LoggerError> {
202        self.add_record(Level::Debug, message, Context::new())
203    }
204
205    pub fn info(&self, message: impl Into<String>) -> Result<bool, LoggerError> {
206        self.add_record(Level::Info, message, Context::new())
207    }
208
209    pub fn notice(&self, message: impl Into<String>) -> Result<bool, LoggerError> {
210        self.add_record(Level::Notice, message, Context::new())
211    }
212
213    pub fn warning(&self, message: impl Into<String>) -> Result<bool, LoggerError> {
214        self.add_record(Level::Warning, message, Context::new())
215    }
216
217    pub fn error(&self, message: impl Into<String>) -> Result<bool, LoggerError> {
218        self.add_record(Level::Error, message, Context::new())
219    }
220
221    pub fn critical(&self, message: impl Into<String>) -> Result<bool, LoggerError> {
222        self.add_record(Level::Critical, message, Context::new())
223    }
224
225    pub fn alert(&self, message: impl Into<String>) -> Result<bool, LoggerError> {
226        self.add_record(Level::Alert, message, Context::new())
227    }
228
229    pub fn emergency(&self, message: impl Into<String>) -> Result<bool, LoggerError> {
230        self.add_record(Level::Emergency, message, Context::new())
231    }
232
233    pub fn close(&self) -> Result<(), LoggerError> {
234        let record = self.make_record(Level::Debug, "".to_string(), Context::new());
235        for handler in &self.handlers {
236            if let Err(err) = handler.close() {
237                self.handle_exception(err, &record)?;
238            }
239        }
240
241        Ok(())
242    }
243
244    pub fn reset(&self) -> Result<(), LoggerError> {
245        let record = self.make_record(Level::Debug, "".to_string(), Context::new());
246
247        for handler in &self.handlers {
248            if let Err(err) = handler.reset() {
249                self.handle_exception(err, &record)?;
250            }
251        }
252
253        for processor in &self.processors {
254            if let Err(err) = processor.reset() {
255                self.handle_exception(err, &record)?;
256            }
257        }
258
259        Ok(())
260    }
261
262    pub fn get_level_name(level: impl IntoLevel) -> Result<&'static str, LoggerError> {
263        Ok(level.into_level()?.as_str())
264    }
265
266    pub fn to_monolog_level(level: impl IntoLevel) -> Result<Level, LoggerError> {
267        level.into_level()
268    }
269
270    pub fn is_handling(&self, level: impl IntoLevel) -> Result<bool, LoggerError> {
271        let record = self.make_record(level.into_level()?, String::new(), Context::new());
272
273        Ok(self
274            .handlers
275            .iter()
276            .any(|handler| handler.is_handling(&record)))
277    }
278
279    fn make_record(&self, level: Level, message: String, context: Context) -> LogRecord {
280        let now = OffsetDateTime::now_utc().to_offset(self.timezone);
281        let datetime = if self.microsecond_timestamps {
282            now
283        } else {
284            match now.replace_nanosecond(0) {
285                Ok(ts) => ts,
286                Err(_) => now,
287            }
288        };
289
290        LogRecord {
291            datetime,
292            channel: self.name.clone(),
293            level,
294            message,
295            context,
296            extra: Context::new(),
297        }
298    }
299
300    fn handle_exception(&self, err: BoxError, record: &LogRecord) -> Result<(), LoggerError> {
301        if let Some(callback) = &self.exception_handler {
302            callback(err.as_ref(), record);
303            return Ok(());
304        }
305
306        Err(LoggerError::Unhandled(err))
307    }
308}
309
310impl Default for Logger {
311    fn default() -> Self {
312        Self::new("app")
313    }
314}
315
316struct LogDepthGuard {
317    enabled: bool,
318}
319
320impl LogDepthGuard {
321    fn new(enabled: bool) -> (usize, Self) {
322        if !enabled {
323            return (0, Self { enabled: false });
324        }
325
326        let log_depth = LOG_DEPTH.with(|depth| {
327            let next = depth.get().saturating_add(1);
328            depth.set(next);
329            next
330        });
331
332        (log_depth, Self { enabled: true })
333    }
334}
335
336impl Drop for LogDepthGuard {
337    fn drop(&mut self) {
338        if !self.enabled {
339            return;
340        }
341
342        LOG_DEPTH.with(|depth| {
343            depth.set(depth.get().saturating_sub(1));
344        });
345    }
346}
347
#[cfg(test)]
mod tests {
    use std::io;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    use super::*;

    /// Handler that stores every record it is given.
    #[derive(Default)]
    struct CollectHandler {
        records: Mutex<Vec<LogRecord>>,
        /// Value returned from `handle`; `true` asks the logger to stop bubbling.
        should_stop: bool,
        /// Minimum level reported as handled; `None` accepts every level.
        min_level: Option<Level>,
    }

    impl CollectHandler {
        fn with_stop(should_stop: bool) -> Self {
            Self {
                should_stop,
                ..Self::default()
            }
        }

        /// Number of records received so far.
        fn record_count(&self) -> usize {
            self.records
                .lock()
                .expect("lock should not be poisoned")
                .len()
        }
    }

    impl Handler for CollectHandler {
        fn is_handling(&self, record: &LogRecord) -> bool {
            self.min_level.is_none_or(|level| record.level >= level)
        }

        fn handle(&self, record: LogRecord) -> Result<bool, BoxError> {
            self.records
                .lock()
                .map_err(|_| Box::new(io::Error::other("lock poisoned")) as BoxError)?
                .push(record);

            Ok(self.should_stop)
        }
    }

    /// Journal shared between several handlers to observe dispatch order.
    type Journal = Arc<Mutex<Vec<&'static str>>>;

    /// Handler that appends its label to a shared journal each time it handles a record.
    struct SequenceHandler {
        label: &'static str,
        journal: Journal,
    }

    impl SequenceHandler {
        fn new(label: &'static str, journal: &Journal) -> Self {
            Self {
                label,
                journal: Arc::clone(journal),
            }
        }
    }

    impl Handler for SequenceHandler {
        fn is_handling(&self, _record: &LogRecord) -> bool {
            true
        }

        fn handle(&self, _record: LogRecord) -> Result<bool, BoxError> {
            self.journal
                .lock()
                .map_err(|_| Box::new(io::Error::other("lock poisoned")) as BoxError)?
                .push(self.label);

            Ok(false)
        }
    }

    /// Snapshot of the labels written to `journal`, in dispatch order.
    fn dispatch_order(journal: &Journal) -> Vec<&'static str> {
        journal
            .lock()
            .expect("lock should not be poisoned")
            .clone()
    }

    /// Processor that counts how often it has been reset.
    struct ResetAwareProcessor {
        reset_count: AtomicUsize,
    }

    impl ResetAwareProcessor {
        fn new() -> Self {
            Self {
                reset_count: AtomicUsize::new(0),
            }
        }

        fn reset_count(&self) -> usize {
            self.reset_count.load(Ordering::Relaxed)
        }
    }

    impl Processor for ResetAwareProcessor {
        fn process(&self, record: LogRecord) -> Result<LogRecord, BoxError> {
            Ok(record)
        }

        fn reset(&self) -> Result<(), BoxError> {
            self.reset_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }
    }

    #[test]
    fn handlers_are_called_in_stack_order() {
        let journal = Journal::default();
        let mut logger = Logger::new("rivet");

        logger.push_handler(Arc::new(SequenceHandler::new("first", &journal)));
        logger.push_handler(Arc::new(SequenceHandler::new("second", &journal)));

        let handled = logger
            .info("hello")
            .expect("logger should dispatch to handlers");

        assert!(handled);
        // The handler pushed last sits on top of the stack and must be called first.
        assert_eq!(dispatch_order(&journal), ["second", "first"]);
    }

    #[test]
    fn set_handlers_keeps_given_order() {
        let journal = Journal::default();
        let mut logger = Logger::new("rivet");
        let handlers: Vec<Arc<dyn Handler>> = vec![
            Arc::new(SequenceHandler::new("first", &journal)),
            Arc::new(SequenceHandler::new("second", &journal)),
        ];

        logger.set_handlers(handlers);
        logger
            .info("hello")
            .expect("logger should dispatch to handlers");

        assert_eq!(dispatch_order(&journal), ["first", "second"]);
    }

    #[test]
    fn bubbling_stops_when_handler_returns_true() {
        let stopping = Arc::new(CollectHandler::with_stop(true));
        let downstream = Arc::new(CollectHandler::default());
        let mut logger = Logger::new("rivet");

        logger.push_handler(Arc::clone(&downstream) as Arc<dyn Handler>);
        logger.push_handler(Arc::clone(&stopping) as Arc<dyn Handler>);

        logger
            .warning("stop")
            .expect("logger should dispatch to first handler");

        assert_eq!(stopping.record_count(), 1);
        assert_eq!(downstream.record_count(), 0);
    }

    #[test]
    fn processor_mutates_message_before_dispatch() {
        let handler = Arc::new(CollectHandler::default());
        let mut logger = Logger::new("rivet");

        logger.push_handler(Arc::clone(&handler) as Arc<dyn Handler>);
        logger.push_processor(Arc::new(|mut record: LogRecord| {
            record.message = format!("[processed] {}", record.message);
            Ok(record)
        }));

        logger.info("hello").expect("logger should process record");

        let message = handler
            .records
            .lock()
            .expect("lock should not be poisoned")
            .first()
            .expect("handler should receive one record")
            .message
            .clone();

        assert_eq!(message, "[processed] hello");
    }

    #[test]
    fn parses_rfc_5424_level_values() {
        assert_eq!(Level::from_rfc_5424(7), Some(Level::Debug));
        assert_eq!(
            2_u8.into_level().expect("2 is valid RFC 5424"),
            Level::Critical
        );
    }

    #[test]
    fn reset_calls_processor_reset() {
        let processor = Arc::new(ResetAwareProcessor::new());
        let mut logger = Logger::new("rivet");
        logger.push_processor(Arc::clone(&processor) as Arc<dyn Processor>);

        logger.reset().expect("reset should succeed");

        assert_eq!(processor.reset_count(), 1);
    }
}
514}