1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
// The mutex_atomic here is a false positive. We use the mutex because of the condvar.
#![allow(
    unknown_lints,
    renamed_and_removed_lints,
    clippy::unknown_clippy_lints,
    clippy::mutex_atomic
)]
//! Support for logging in the background.
//!
//! The [`AsyncLogger`] can wrap a logger and do the logging in a separate thread. Note that to not
//! lose logs on shutdown, the logger needs to be flushed, either manually or using the
//! [`FlushGuard`].
//!
//! To integrate with the [`Pipeline`], the [`Background`] can be used as a [`Transformation`] of
//! loggers.
//!
//! [`AsyncLogger`]: crate::background::AsyncLogger
//! [`Background`]: crate::background::Background
//! [`FlushGuard`]: crate::background::FlushGuard
//! [`Pipeline`]: spirit::fragment::pipeline::Pipeline
//! [`Transformation`]: spirit::fragment::Transformation

use std::cell::RefCell;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::{Builder as ThreadBuilder, Thread};
use std::time::Duration;

use either::Either;
use fern::Dispatch;
use flume::{Receiver, Sender, TrySendError};
use log::{Level, LevelFilter, Log, Metadata, Record};
use spirit::extension::{Autojoin, Extensible, Extension};
use spirit::fragment::Transformation;
use spirit::AnyError;

thread_local! {
    // Name of the thread that produced the message the background logging thread is currently
    // processing; reset to None before the background thread logs messages of its own.
    //
    // There's a chance someone uses sync logging even with the background logging enabled, in
    // which case this'll always stay None (notice that the background logger creates its own
    // thread and only that one sets this, so we can't pollute other threads).
    static LOG_THREAD_NAME: RefCell<Option<Arc<str>>> = RefCell::new(None);

    // This thread's own name, cached and reused as the source when putting a log message into
    // the channel. Unnamed threads get `super::UNKNOWN_THREAD`.
    //
    // Because we need to potentially log the thread name into multiple loggers, we don't want to
    // clone it every time we take it out from LOG_THREAD_NAME, but we can't have a reference into
    // it both because of refcell and thread-local. We can, however, clone an Arc. When we already
    // have Arcs around, we can as well cache the name in it for the whole lifetime of the thread.
    static MY_THREAD_NAME: Arc<str> = {
        Arc::from(thread::current().name().unwrap_or(super::UNKNOWN_THREAD))
    };
}

/// Forgets the originating-thread name recorded on the current thread.
///
/// The background thread calls this before logging messages of its own, so they are not
/// attributed to whichever thread sent the previously processed message.
fn reset_thread_name() {
    LOG_THREAD_NAME.with(|slot| {
        let _previous = slot.replace(None);
    });
}

/// Returns the name to report as the origin of a log message.
///
/// Inside the background logging thread, this is the name recorded in `LOG_THREAD_NAME` for the
/// message being processed (unless the thread is currently logging a message of its own, in
/// which case the record has been cleared). Otherwise it falls back to the name of `thread`, or
/// to `super::UNKNOWN_THREAD` if that thread has no name.
pub(crate) fn get_thread_name(thread: &Thread) -> Either<&str, Arc<str>> {
    LOG_THREAD_NAME.with(|recorded| match *recorded.borrow() {
        Some(ref origin) => Either::Right(Arc::clone(origin)),
        None => Either::Left(thread.name().unwrap_or(super::UNKNOWN_THREAD)),
    })
}

/// Completion signal for a flush request.
///
/// The `done` flag starts out `false`. The side that finishes the flush sets it to `true` and
/// wakes every waiter through `wakeup`.
struct FlushDone {
    done: Mutex<bool>,
    wakeup: Condvar,
}

impl FlushDone {
    /// Creates the signal in the "not yet done" state.
    fn new() -> Self {
        FlushDone {
            wakeup: Condvar::new(),
            done: Mutex::new(false),
        }
    }

    /// Blocks the calling thread until the `done` flag becomes `true`.
    ///
    /// Spurious wakeups are tolerated because the flag is re-checked after every wakeup.
    ///
    /// # Panics
    ///
    /// If the mutex is poisoned.
    fn wait(&self) {
        let guard = self.done.lock().unwrap();
        let _finished = self
            .wakeup
            .wait_while(guard, |finished| !*finished)
            .unwrap();
    }
}

/// Marks the paired [`FlushDone`] as done when dropped.
///
/// The notification is tied to `Drop` rather than an explicit call, so the waiting thread is
/// released whenever this value goes away, including when the instruction carrying it is
/// dropped without being processed.
struct DropNotify(Arc<FlushDone>);

impl Drop for DropNotify {
    fn drop(&mut self) {
        let signal = &self.0;
        {
            let mut finished = signal.done.lock().unwrap();
            *finished = true;
        }
        // The lock is released before notifying, so woken waiters can take it right away.
        signal.wakeup.notify_all();
    }
}

/// A unit of work handed from the logging threads to the background thread.
enum Instruction {
    /// An owned copy of a log record, together with the name of the thread that produced it.
    Msg {
        msg: String,
        level: Level,
        target: String,
        module_path: Option<String>,
        file: Option<String>,
        line: Option<u32>,
        thread: Arc<str>,
    },
    /// A flush request; the requesting thread is released once the notifier is dropped.
    Flush(DropNotify),
}

impl Instruction {
    /// Carries out this instruction on the wrapped logger `dst`.
    ///
    /// For a message, the originating thread's name is stored in `LOG_THREAD_NAME` (where
    /// `get_thread_name` picks it up). The record is then rebuilt from its owned parts and
    /// logged. For a flush request, `dst` is flushed and then the notifier is dropped.
    fn process(self, dst: &dyn Log) {
        match self {
            Instruction::Flush(notifier) => {
                dst.flush();
                // Dropping the notifier is what releases the thread waiting for the flush.
                drop(notifier);
            }
            Instruction::Msg {
                msg,
                level,
                target,
                module_path,
                file,
                line,
                thread,
            } => {
                LOG_THREAD_NAME.with(|origin| *origin.borrow_mut() = Some(thread));
                dst.log(
                    &Record::builder()
                        .args(format_args!("{}", msg))
                        .level(level)
                        .target(&target)
                        .file(file.as_deref())
                        .line(line)
                        .module_path(module_path.as_deref())
                        .build(),
                );
            }
        }
    }
}

struct SyncLogger {
    logger: Box<dyn Log>,
    lost_msgs: AtomicUsize,
}

struct Recv {
    shared: Arc<SyncLogger>,
    instructions: Receiver<Instruction>,
}

impl Recv {
    fn run(&self) {
        let mut panicked = false;
        loop {
            let result = panic::catch_unwind(AssertUnwindSafe(|| {
                if panicked {
                    reset_thread_name();
                    self.shared.logger.log(
                        &Record::builder()
                            .args(format_args!("Panic in the logger thread, restarted"))
                            .level(Level::Error)
                            .target(module_path!())
                            .line(Some(line!()))
                            .module_path(Some(module_path!()))
                            .build(),
                    );
                }
                for i in &self.instructions {
                    let lost_msgs = self.shared.lost_msgs.swap(0, Ordering::Relaxed);
                    if lost_msgs > 0 {
                        reset_thread_name();
                        self.shared.logger.log(
                            &Record::builder()
                                .args(format_args!("Lost {} messages", lost_msgs))
                                .level(Level::Warn)
                                .target(module_path!())
                                .line(Some(line!()))
                                .module_path(Some(module_path!()))
                                .build(),
                        );
                    }
                    i.process(&*self.shared.logger);
                }
            }));
            if result.is_ok() {
                break;
            }
            panicked = true;
            thread::sleep(Duration::from_millis(100));
        }
        self.shared.logger.flush();
    }
}

/// Selection of how to act if the channel to the logger thread is full.
///
/// This enum is non-exhaustive. Adding more variants in the future will not be considered a
/// breaking change.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum OverflowMode {
    /// Blocks until there's enough space to push the message.
    Block,

    /// If there's not enough space in the channel, the message is dropped and counted.
    ///
    /// Later, the background thread logs a warning with the number of lost messages.
    DropMsg,

    /// Drops the messages that don't fit, without any indication that it happened.
    DropMsgSilently,

    /// Drops less severe messages before the whole buffer fills up.
    ///
    /// If the buffer is completely full, it acts like [`DropMsg`][OverflowMode::DropMsg]. If
    /// it's not full but already holds `fill_limit` or more messages, messages with severity
    /// `from_level` or less severe are dropped (and counted). More severe messages are still
    /// inserted into the buffer.
    ///
    /// Both limits are inclusive.
    AdaptiveDrop {
        /// The most severe level that is dropped once the buffer holds at least `fill_limit`
        /// messages. This level and all less severe ones are dropped.
        from_level: Level,

        /// Number of queued messages at which messages of `from_level` and less severe start
        /// being dropped.
        fill_limit: usize,
    },
}

impl OverflowMode {
    /// Whether messages dropped in this mode are counted and later reported by the background
    /// thread.
    fn count_lost(self) -> bool {
        matches!(
            self,
            OverflowMode::DropMsg | OverflowMode::AdaptiveDrop { .. }
        )
    }
}

/// A logger that postpones the logging into a background thread.
///
/// Note that to not lose messages, the logger needs to be flushed. Dropping the `AsyncLogger`
/// itself flushes it too.
///
/// Either manually:
///
/// ```rust
/// log::logger().flush();
/// ```
///
/// Or by dropping the [`FlushGuard`] (this is useful to flush even in non-success cases or as an
/// integration with other utilities).
///
/// ```rust
/// # use spirit_log::background::FlushGuard;
/// fn main() {
///     let _guard = FlushGuard;
///     // The rest of the application code.
/// }
/// ```
///
/// Note that even with this, things like [`std::process::exit`] will allow messages to be lost.
pub struct AsyncLogger {
    // What to do when the channel is (too) full.
    mode: OverflowMode,
    // Sending end of the channel to the background thread.
    ch: Sender<Instruction>,
    // The wrapped logger and the lost-message counter, shared with the background thread.
    shared: Arc<SyncLogger>,
}

impl AsyncLogger {
    /// Sends the given logger to a background thread.
    ///
    /// # Params
    ///
    /// * `logger`: The logger to use in the background thread.
    /// * `buffer`: How many messages there can be waiting in the channel to the background thread.
    /// * `mode`: What to do if a message doesn't fit into the channel.
    ///
    /// # Panics
    ///
    /// * If the buffer size is 0.
    /// * If the [`AdaptiveDrop`][OverflowMode::AdaptiveDrop] `fill_limit` is zero, or larger
    ///   than or equal to `buffer`.
    /// * If the background thread can't be spawned.
    pub fn new(logger: Box<dyn Log>, buffer: usize, mode: OverflowMode) -> Self {
        assert!(
            buffer > 0,
            "Zero-sized buffer for async logging makes no sense"
        );
        if let OverflowMode::AdaptiveDrop { fill_limit, .. } = mode {
            assert!(
                fill_limit > 0,
                "AdaptiveDrop fill_limit must be greater than zero"
            );
            assert!(
                fill_limit < buffer,
                "AdaptiveDrop fill_limit ({}) must be smaller than the buffer size ({})",
                fill_limit,
                buffer
            );
        }
        let shared = Arc::new(SyncLogger {
            logger,
            lost_msgs: AtomicUsize::new(0),
        });
        let (sender, receiver) = flume::bounded(buffer);
        let recv = Recv {
            shared: Arc::clone(&shared),
            instructions: receiver,
        };
        // The join handle is discarded: the thread keeps running until the channel's only
        // sender (owned by the returned logger) is dropped and the channel is drained.
        ThreadBuilder::new()
            .name("spirit-log-bg".to_owned())
            .spawn(move || {
                recv.run();
            })
            .expect("Failed to start logging thread");
        AsyncLogger {
            mode,
            ch: sender,
            shared,
        }
    }
}

impl Log for AsyncLogger {
    /// A record is enabled only if it passes the global maximum level and the wrapped logger
    /// accepts it as well.
    fn enabled(&self, metadata: &Metadata) -> bool {
        // The global level check is cheap, so it runs before calling into the wrapped logger.
        if metadata.level() > log::max_level() {
            return false;
        }
        self.shared.logger.enabled(metadata)
    }

    /// Copies the record into owned parts and queues it for the background thread, following
    /// the configured [`OverflowMode`].
    fn log(&self, record: &Record) {
        // Bail out before allocating owned strings for a message that would be discarded anyway.
        if !self.enabled(record.metadata()) {
            return;
        }

        if let OverflowMode::AdaptiveDrop {
            from_level,
            fill_limit,
        } = self.mode
        {
            // Shed less severe messages early once the queue has reached the fill limit.
            if record.level() >= from_level && self.ch.len() >= fill_limit {
                self.shared.lost_msgs.fetch_add(1, Ordering::Relaxed);
                return;
            }
        }

        let instruction = Instruction::Msg {
            file: record.file().map(ToOwned::to_owned),
            level: record.level(),
            line: record.line(),
            module_path: record.module_path().map(ToOwned::to_owned),
            msg: format!("{}", record.args()),
            target: record.target().to_owned(),
            thread: MY_THREAD_NAME.with(|n| Arc::clone(n)),
        };

        match self.mode {
            OverflowMode::Block => {
                self.ch.send(instruction).expect("Logging thread disappeared");
            }
            mode => match self.ch.try_send(instruction) {
                Ok(()) => {}
                Err(TrySendError::Full(_)) => {
                    if mode.count_lost() {
                        self.shared.lost_msgs.fetch_add(1, Ordering::Relaxed);
                    }
                }
                Err(_) => panic!("Logging thread disappeared"),
            },
        }
    }

    /// Waits until everything queued before this call has been logged and the wrapped logger
    /// has been flushed.
    fn flush(&self) {
        let signal = Arc::new(FlushDone::new());
        let request = Instruction::Flush(DropNotify(Arc::clone(&signal)));
        self.ch.send(request).expect("Logger thread disappeared");
        // The background thread drops the notifier after flushing, which releases this wait.
        signal.wait();
    }
}

impl Drop for AsyncLogger {
    /// Flushes the logger before going away, to make sure log messages are not lost.
    ///
    /// As much as it's possible to ensure.
    fn drop(&mut self) {
        self.flush();
    }
}

/// A [`Transformation`] to move loggers into background threads.
///
/// By default, loggers created by the [`Pipeline`] are synchronous ‒ they block to do their IO.
/// This puts the IO into a separate thread, with a buffer in between, allowing the rest of the
/// application not to block.
///
/// The same warnings about lost messages and flushing as in the [`AsyncLogger`] case apply here.
/// However, the [`Extensible::keep_guard`] and [`Extensible::autojoin_bg_thread`] can be used
/// with the [`FlushGuard`] to ensure this happens automatically (the [`FlushGuard`] also
/// implements [`Extension`], which takes care of the setup).
///
/// # Examples
///
/// ```rust
/// use log::info;
/// use serde::Deserialize;
/// use spirit::{Empty, Pipeline, Spirit};
/// use spirit::prelude::*;
/// use spirit_log::{Background, Cfg as LogCfg, FlushGuard, OverflowMode};
///
/// #[derive(Clone, Debug, Default, Deserialize)]
/// struct Cfg {
///     #[serde(default, skip_serializing_if = "LogCfg::is_empty")]
///     logging: LogCfg,
/// }
///
/// impl Cfg {
///     fn logging(&self) -> LogCfg {
///         self.logging.clone()
///     }
/// }
///
/// fn main() {
///     Spirit::<Empty, Cfg>::new()
///         .with(
///             Pipeline::new("logging")
///                 .extract_cfg(Cfg::logging)
///                 .transform(Background::new(100, OverflowMode::Block)),
///         )
///         .with_singleton(FlushGuard)
///         .run(|_spirit| {
/// #           let spirit = std::sync::Arc::clone(_spirit);
/// #           std::thread::spawn(move || spirit.terminate());
///             info!("Hello world");
///             Ok(())
///         });
/// }
/// ```
///
/// [`Pipeline`]: spirit::fragment::pipeline::Pipeline
/// [`Extensible::keep_guard`]: spirit::Extensible::keep_guard
/// [`Extensible::autojoin_bg_thread`]: spirit::Extensible::autojoin_bg_thread
/// [`Extension`]: spirit::extension::Extension
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Background {
    mode: OverflowMode,
    buffer: usize,
}

impl Background {
    /// Creates a new [`Background`] object.
    ///
    /// # Params
    ///
    /// * `buffer`: How many messages fit into the channel to the background thread.
    /// * `mode`: What to do if the current message does not fit.
    ///
    /// The parameters are not validated here. They are checked by [`AsyncLogger::new`] when
    /// the transformation is applied, which panics on invalid values. Invalid values are a zero
    /// `buffer`, or an [`AdaptiveDrop`][OverflowMode::AdaptiveDrop] `fill_limit` that is zero
    /// or not smaller than `buffer`.
    pub fn new(buffer: usize, mode: OverflowMode) -> Self {
        Background { mode, buffer }
    }
}

impl<I, F> Transformation<Dispatch, I, F> for Background {
    type OutputResource = (LevelFilter, Box<dyn Log>);
    type OutputInstaller = I;
    fn installer(&mut self, original: I, _name: &'static str) -> I {
        original
    }
    fn transform(
        &mut self,
        dispatch: Dispatch,
        _fragment: &F,
        _name: &'static str,
    ) -> Result<(LevelFilter, Box<dyn Log>), AnyError> {
        let (level, sync_logger) = dispatch.into_log();
        let bg = AsyncLogger::new(sync_logger, self.buffer, self.mode);
        Ok((level, Box::new(bg)))
    }
}

/// A guard that flushes the global logger when it is dropped.
///
/// Unless the logger is flushed, there's a risk of losing messages on application termination.
///
/// The guard can be used on its own or plugged into the spirit [`Builder`] (through the
/// [`Extension`] trait). In the latter case it also turns on the [`autojoin_bg_thread`] option,
/// so the application actually waits for the spirit thread to terminate and drops the guard.
///
/// Flushing the logs more than once is fine. Each flush only costs some performance, because it
/// has to wait for all the queued messages to be written.
///
/// # Examples
///
/// ```rust
/// # use log::info;
/// # use spirit::{Empty, Spirit};
/// # use spirit::prelude::*;
/// # use spirit_log::FlushGuard;
/// Spirit::<Empty, Empty>::new()
///     .with_singleton(FlushGuard)
///     .run(|_spirit| {
/// #       let spirit = std::sync::Arc::clone(_spirit);
/// #       std::thread::spawn(move || spirit.terminate());
///         info!("Hello world");
///         Ok(())
///     });
/// ```
///
/// [`Builder`]: spirit::Builder
/// [`autojoin_bg_thread`]: spirit::Extensible::autojoin_bg_thread
pub struct FlushGuard;

impl FlushGuard {
    /// Flushes the global logger right away.
    ///
    /// This has the same effect as creating a [`FlushGuard`] and dropping it, without needing
    /// an instance.
    pub fn flush() {
        let global = log::logger();
        global.flush();
    }
}

impl Drop for FlushGuard {
    fn drop(&mut self) {
        FlushGuard::flush();
    }
}

impl<E> Extension<E> for FlushGuard
where
    E: Extensible<Ok = E>,
{
    /// Registers the guard with the builder.
    ///
    /// Switches the builder to [`Autojoin::TerminateAndJoin`] so the application waits for the
    /// background thread, then hands the guard over to [`Extensible::keep_guard`] so it gets
    /// dropped (and the logs flushed) at the end.
    fn apply(self, builder: E) -> Result<E, AnyError> {
        let joining = builder.autojoin_bg_thread(Autojoin::TerminateAndJoin);
        Ok(joining.keep_guard(self))
    }
}