spdlog/sink/async_sink/
async_pool_sink.rs

1use crate::{
2    default_thread_pool,
3    formatter::{Formatter, UnreachableFormatter},
4    sink::{OverflowPolicy, Sink, SinkProp, SinkPropAccess, Sinks},
5    sync::*,
6    Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result, ThreadPool,
7};
8
/// A [combined sink], logging and flushing asynchronously (thread-pool-based).
///
/// Expensive operations (such as `log` and `flush`) on asynchronous sinks will
/// be performed asynchronously on other threads.
///
/// Since there is no waiting, errors that occur while performing asynchronous
/// operations will not be returned to the upper level; instead, the error
/// handler of the sink will be called.
///
/// Users should only use asynchronous combined sinks to wrap actual sinks that
/// require a long time for operations (e.g., file sinks that are frequently
/// flushed, sinks involving networks); otherwise there will be no performance
/// gain, and performance may even get worse.
///
/// Since the thread pool has a capacity limit, the queue may be full in some
/// cases. When users encounter this situation, they have the following options:
///
///  - Adjust to a larger capacity via [`ThreadPoolBuilder::capacity`].
///
///  - Adjust the overflow policy via [`AsyncPoolSinkBuilder::overflow_policy`].
///
///  - Set up an error handler on asynchronous combined sinks via
///    [`AsyncPoolSinkBuilder::error_handler`]. The handler will be called when
///    a record is dropped or an operation has failed.
///
///
/// # Note
///
/// Errors that occur in `log` and `flush` will not be returned directly;
/// instead, the error handler will be called.
///
/// # Examples
///
/// See [./examples] directory.
///
/// [combined sink]: index.html#combined-sink
/// [`ThreadPoolBuilder::capacity`]: crate::ThreadPoolBuilder::capacity
/// [./examples]: https://github.com/SpriteOvO/spdlog-rs/tree/main/spdlog/examples
// The names `AsyncSink` and `AsyncRuntimeSink` are reserved for future use.
pub struct AsyncPoolSink {
    // Policy handed to the thread pool together with every task that this
    // sink assigns.
    overflow_policy: OverflowPolicy,
    // Pool that receives the `Task`s created by `log` and `flush`.
    thread_pool: Arc<ThreadPool>,
    // Properties and wrapped sinks; a clone of this `Arc` travels with each
    // task.
    backend: Arc<Backend>,
}
53
54impl AsyncPoolSink {
55    /// Constructs a builder of `AsyncPoolSink` with default parameters:
56    ///
57    /// | Parameter         | Default Value                       |
58    /// |-------------------|-------------------------------------|
59    /// | [level_filter]    | `All`                               |
60    /// | [error_handler]   | [`ErrorHandler::default()`]         |
61    /// | [overflow_policy] | `Block`                             |
62    /// | [thread_pool]     | internal shared default thread pool |
63    ///
64    /// [level_filter]: AsyncPoolSinkBuilder::level_filter
65    /// [error_handler]: AsyncPoolSinkBuilder::error_handler
66    /// [`ErrorHandler::default()`]: crate::error::ErrorHandler::default()
67    /// [overflow_policy]: AsyncPoolSinkBuilder::overflow_policy
68    /// [thread_pool]: AsyncPoolSinkBuilder::thread_pool
69    #[must_use]
70    pub fn builder() -> AsyncPoolSinkBuilder {
71        let prop = SinkProp::default();
72        // AsyncPoolSink does not have its own formatter, and we do not impl
73        // `GetSinkProp` for it, so there should be no way to access the
74        // formatter inside the `prop`.
75        prop.set_formatter(UnreachableFormatter::new());
76
77        AsyncPoolSinkBuilder {
78            prop,
79            overflow_policy: OverflowPolicy::Block,
80            sinks: Sinks::new(),
81            thread_pool: None,
82        }
83    }
84
85    /// Gets a reference to internal sinks in the combined sink.
86    #[must_use]
87    pub fn sinks(&self) -> &[Arc<dyn Sink>] {
88        &self.backend.sinks
89    }
90
91    fn assign_task(&self, task: Task) -> Result<()> {
92        self.thread_pool.assign_task(task, self.overflow_policy)
93    }
94
95    #[must_use]
96    fn clone_backend(&self) -> Arc<Backend> {
97        Arc::clone(&self.backend)
98    }
99}
100
101impl SinkPropAccess for AsyncPoolSink {
102    fn level_filter(&self) -> LevelFilter {
103        self.backend.prop.level_filter()
104    }
105
106    fn set_level_filter(&self, level_filter: LevelFilter) {
107        self.backend.prop.set_level_filter(level_filter);
108    }
109
110    /// For [`AsyncPoolSink`], the function performs the same call to all
111    /// internal sinks.
112    fn set_formatter(&self, formatter: Box<dyn Formatter>) {
113        for sink in &self.backend.sinks {
114            sink.set_formatter(formatter.clone())
115        }
116    }
117
118    fn set_error_handler(&self, handler: ErrorHandler) {
119        self.backend.prop.set_error_handler(handler);
120    }
121}
122
123impl Sink for AsyncPoolSink {
124    fn log(&self, record: &Record) -> Result<()> {
125        self.assign_task(Task::Log {
126            backend: self.clone_backend(),
127            record: record.to_owned(),
128        })
129    }
130
131    fn flush(&self) -> Result<()> {
132        if crate::IS_TEARING_DOWN.load(Ordering::SeqCst) {
133            // https://github.com/SpriteOvO/spdlog-rs/issues/64
134            //
135            // If the program is tearing down, this will be the final flush. `crossbeam`
136            // uses thread-local internally, which is not supported in `atexit` callback.
137            // This can be bypassed by flushing sinks directly on the current thread, but
138            // before we do that we have to destroy the thread pool to ensure that any
139            // pending log tasks are completed.
140            self.thread_pool.destroy();
141            self.backend.flush()
142        } else {
143            self.assign_task(Task::Flush {
144                backend: self.clone_backend(),
145            })
146        }
147    }
148}
149
// Builder for `AsyncPoolSink`; `AsyncPoolSink::builder` creates it with
// default values and `build` consumes it.
#[allow(missing_docs)]
pub struct AsyncPoolSinkBuilder {
    // Level filter, error handler and formatter slot; moved into the backend
    // on `build`.
    prop: SinkProp,
    // Sinks that the built `AsyncPoolSink` will wrap.
    sinks: Sinks,
    // Policy passed to the thread pool whenever a task is assigned.
    overflow_policy: OverflowPolicy,
    // Custom thread pool; when `None`, `build` falls back to
    // `default_thread_pool`.
    thread_pool: Option<Arc<ThreadPool>>,
}
157
158impl AsyncPoolSinkBuilder {
159    /// Add a [`Sink`].
160    #[must_use]
161    pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
162        self.sinks.push(sink);
163        self
164    }
165
166    /// Add multiple [`Sink`]s.
167    #[must_use]
168    pub fn sinks<I>(mut self, sinks: I) -> Self
169    where
170        I: IntoIterator<Item = Arc<dyn Sink>>,
171    {
172        self.sinks.append(&mut sinks.into_iter().collect());
173        self
174    }
175
176    /// Specifies a overflow policy.
177    ///
178    /// This parameter is **optional**.
179    ///
180    /// When the channel is full, an incoming operation is handled according to
181    /// the specified policy.
182    #[must_use]
183    pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self {
184        self.overflow_policy = overflow_policy;
185        self
186    }
187
188    /// Specifies a custom thread pool.
189    ///
190    /// This parameter is **optional**.
191    #[must_use]
192    pub fn thread_pool(mut self, thread_pool: Arc<ThreadPool>) -> Self {
193        self.thread_pool = Some(thread_pool);
194        self
195    }
196
197    // Prop
198    //
199
200    /// Specifies a log level filter.
201    ///
202    /// This parameter is **optional**.
203    #[must_use]
204    pub fn level_filter(self, level_filter: LevelFilter) -> Self {
205        self.prop.set_level_filter(level_filter);
206        self
207    }
208
209    /// Specifies a formatter.
210    ///
211    /// This parameter is **optional**.
212    #[must_use]
213    pub fn formatter<F>(self, formatter: F) -> Self
214    where
215        F: Formatter + 'static,
216    {
217        self.prop.set_formatter(formatter);
218        self
219    }
220
221    /// Specifies an error handler.
222    ///
223    /// This parameter is **optional**.
224    #[must_use]
225    pub fn error_handler<F: Into<ErrorHandler>>(self, handler: F) -> Self {
226        self.prop.set_error_handler(handler);
227        self
228    }
229
230    /// Builds a [`AsyncPoolSink`].
231    pub fn build(self) -> Result<AsyncPoolSink> {
232        let backend = Arc::new(Backend {
233            prop: self.prop,
234            sinks: self.sinks.clone(),
235        });
236
237        let thread_pool = self.thread_pool.unwrap_or_else(default_thread_pool);
238
239        Ok(AsyncPoolSink {
240            overflow_policy: self.overflow_policy,
241            thread_pool,
242            backend,
243        })
244    }
245}
246
// State shared between an `AsyncPoolSink` and the tasks it assigns to the
// thread pool.
pub(crate) struct Backend {
    // Level filter and error handler of the owning `AsyncPoolSink`.
    prop: SinkProp,
    // Wrapped sinks that `log` and `flush` are forwarded to.
    sinks: Sinks,
}
251
252impl Backend {
253    fn log(&self, record: &Record) -> Result<()> {
254        let mut result = Ok(());
255        for sink in &self.sinks {
256            result = Error::push_result(result, sink.log(record));
257        }
258        result
259    }
260
261    fn flush(&self) -> Result<()> {
262        let mut result = Ok(());
263        for sink in &self.sinks {
264            result = Error::push_result(result, sink.flush());
265        }
266        result
267    }
268
269    fn handle_error(&self, err: Error) {
270        self.prop.call_error_handler_internal("AsyncPoolSink", err)
271    }
272}
273
// A unit of work assigned to the thread pool by `AsyncPoolSink`; `exec`
// performs it.
pub(crate) enum Task {
    // Log `record` to the sinks held by `backend`.
    Log {
        backend: Arc<Backend>,
        // Owned copy of the record, created by `AsyncPoolSink::log`.
        record: RecordOwned,
    },
    // Flush the sinks held by `backend`.
    Flush {
        backend: Arc<Backend>,
    },
}
283
284impl Task {
285    // calls this function in async threads
286    pub(crate) fn exec(self) {
287        match self {
288            Task::Log { backend, record } => {
289                if let Err(err) = backend.log(&record.as_ref()) {
290                    backend.handle_error(err)
291                }
292            }
293            Task::Flush { backend } => {
294                if let Err(err) = backend.flush() {
295                    backend.handle_error(err)
296                }
297            }
298        }
299    }
300}
301
// Tests for `AsyncPoolSink`. Because the work happens on pool threads, each
// test sleeps after logging and only then inspects the counters of the
// wrapped `TestSink`.
#[cfg(test)]
mod tests {
    use std::{thread::sleep, time::Duration};

    use super::*;
    use crate::{prelude::*, test_utils::*};

    // Uses the builder without a custom thread pool, i.e. the default one.
    #[test]
    fn default_thread_pool() {
        let counter_sink = Arc::new(TestSink::new());
        // Every call builds a fresh logger whose only sink is a new
        // `AsyncPoolSink` wrapping the same shared `counter_sink`.
        let build_logger = || {
            build_test_logger(|b| {
                b.sink(Arc::new(
                    AsyncPoolSink::builder()
                        .sink(counter_sink.clone())
                        .build()
                        .unwrap(),
                ))
                .level_filter(LevelFilter::All)
                .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
            })
        };

        assert_eq!(counter_sink.log_count(), 0);
        assert_eq!(counter_sink.flush_count(), 0);

        {
            let logger = build_logger();

            // `info` and `warn` are below the flush level filter: they are
            // expected to be logged but not to cause a flush.
            info!(logger: logger, "");
            sleep(Duration::from_millis(50));
            assert_eq!(counter_sink.log_count(), 1);
            assert_eq!(counter_sink.flush_count(), 0);

            warn!(logger: logger, "");
            sleep(Duration::from_millis(50));
            assert_eq!(counter_sink.log_count(), 2);
            assert_eq!(counter_sink.flush_count(), 0);
        }

        {
            let logger = build_logger();

            // `error` and `critical` satisfy the flush level filter: each is
            // expected to be logged and followed by one flush.
            error!(logger: logger, "");
            sleep(Duration::from_millis(50));
            assert_eq!(counter_sink.log_count(), 3);
            assert_eq!(counter_sink.flush_count(), 1);

            critical!(logger: logger, "");
            sleep(Duration::from_millis(50));
            assert_eq!(counter_sink.log_count(), 4);
            assert_eq!(counter_sink.flush_count(), 2);
        }
    }

    // Same checks as above, but with an explicitly supplied thread pool.
    #[test]
    fn custom_thread_pool() {
        let counter_sink = Arc::new(TestSink::new());
        let thread_pool = Arc::new(ThreadPool::builder().build().unwrap());
        let logger = build_test_logger(|b| {
            b.sink(Arc::new(
                AsyncPoolSink::builder()
                    .sink(counter_sink.clone())
                    .thread_pool(thread_pool)
                    .build()
                    .unwrap(),
            ))
            .level_filter(LevelFilter::All)
            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
        });

        assert_eq!(counter_sink.log_count(), 0);
        assert_eq!(counter_sink.flush_count(), 0);

        info!(logger: logger, "");
        sleep(Duration::from_millis(50));
        assert_eq!(counter_sink.log_count(), 1);
        assert_eq!(counter_sink.flush_count(), 0);

        warn!(logger: logger, "");
        sleep(Duration::from_millis(50));
        assert_eq!(counter_sink.log_count(), 2);
        assert_eq!(counter_sink.flush_count(), 0);

        error!(logger: logger, "");
        sleep(Duration::from_millis(50));
        assert_eq!(counter_sink.log_count(), 3);
        assert_eq!(counter_sink.flush_count(), 1);
    }

    // Checks that logging returns before the slow inner sink has finished.
    #[test]
    fn async_opeartions() {
        // NOTE(review): presumably `with_delay` makes every `log`/`flush` of
        // the test sink take about one second before its counter changes; the
        // timings asserted below are consistent with that. Confirm against
        // `TestSink`.
        let counter_sink = Arc::new(TestSink::with_delay(Some(Duration::from_secs(1))));
        // The default thread pool is not used here to avoid race when tests are run in
        // parallel.
        let thread_pool = Arc::new(ThreadPool::builder().build().unwrap());
        let logger = build_test_logger(|b| {
            b.sink(Arc::new(
                AsyncPoolSink::builder()
                    .sink(counter_sink.clone())
                    .thread_pool(thread_pool)
                    .build()
                    .unwrap(),
            ))
            .level_filter(LevelFilter::All)
            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Warn))
        });

        assert_eq!(counter_sink.log_count(), 0);
        assert_eq!(counter_sink.flush_count(), 0);

        // `info` is below the flush level filter, so only a log is expected.
        // It must not be counted after 0.5 s, but must be after 1.25 s.
        info!(logger: logger, "meow");
        sleep(Duration::from_millis(500));
        assert_eq!(counter_sink.log_count(), 0);
        assert_eq!(counter_sink.flush_count(), 0);
        sleep(Duration::from_millis(750));
        assert_eq!(counter_sink.log_count(), 1);
        assert_eq!(counter_sink.flush_count(), 0);

        // `warn` satisfies the flush level filter, so a log and then a flush
        // are expected. The checkpoints are 0.25 s (neither counted yet),
        // 1.25 s (log counted, flush not yet) and 2.5 s (flush counted).
        warn!(logger: logger, "nya");
        sleep(Duration::from_millis(250));
        assert_eq!(counter_sink.log_count(), 1);
        assert_eq!(counter_sink.flush_count(), 0);
        sleep(Duration::from_millis(1000));
        assert_eq!(counter_sink.log_count(), 2);
        assert_eq!(counter_sink.flush_count(), 0);
        sleep(Duration::from_millis(1250));
        assert_eq!(counter_sink.log_count(), 2);
        assert_eq!(counter_sink.flush_count(), 1);
    }
}