spdlog/sink/async_sink/
async_pool_sink.rs

1use crate::{
2    default_thread_pool,
3    formatter::{Formatter, UnreachableFormatter},
4    sink::{OverflowPolicy, Sink, SinkProp, SinkPropAccess, Sinks},
5    sync::*,
6    Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result, ThreadPool,
7};
8
9/// A [combined sink], logging and flushing asynchronously (thread-pool-based).
10///
11/// Expensive operations (such as `log` and `flush`) on asynchronous sinks will
12/// be performed asynchronously on other threads.
13///
14/// Since there is no waiting, errors that occur while performing asynchronous
15/// operations will not be returned to the upper level, and instead the error
16/// handler of the sink will be called.
17///
18/// Users should only use asynchronous combined sinks to wrap actual sinks that
19/// require a long time for operations (e.g., file sinks that are frequently
20/// flushed, sinks involving networks), otherwise they will not get a
21/// performance boost or even worse.
22///
23/// Since the thread pool has a capacity limit, the queue may be full in some
24/// cases. When users encounter this situation, they have the following options:
25///
26///  - Adjust to a larger capacity via [`ThreadPoolBuilder::capacity`].
27///
28///  - Adjust the overflow policy via [`AsyncPoolSinkBuilder::overflow_policy`].
29///
30///  - Set up an error handler on asynchronous combined sinks via
31///    [`AsyncPoolSinkBuilder::error_handler`]. The handler will be called when
32///    a record is dropped or an operation has failed.
33///
34///
35/// # Note
36///
37/// Errors that occur in `log` and `flush` will not be returned directly,
38/// instead the error handler will be called.
39///
40/// # Examples
41///
42/// See [./examples] directory.
43///
44/// [combined sink]: index.html#combined-sink
45/// [`ThreadPoolBuilder::capacity`]: crate::ThreadPoolBuilder::capacity
46/// [./examples]: https://github.com/SpriteOvO/spdlog-rs/tree/main/spdlog/examples
47// The names `AsyncSink` and `AsyncRuntimeSink` is reserved for future use.
48pub struct AsyncPoolSink {
49    overflow_policy: OverflowPolicy,
50    thread_pool: Arc<ThreadPool>,
51    backend: Arc<Backend>,
52}
53
54impl AsyncPoolSink {
55    /// Constructs a builder of `AsyncPoolSink` with default parameters:
56    ///
57    /// | Parameter         | Default Value                       |
58    /// |-------------------|-------------------------------------|
59    /// | [level_filter]    | [`LevelFilter::All`]                |
60    /// | [error_handler]   | [`ErrorHandler::default()`]         |
61    /// | [overflow_policy] | [`OverflowPolicy::Block`]           |
62    /// | [thread_pool]     | internal shared default thread pool |
63    ///
64    /// [level_filter]: AsyncPoolSinkBuilder::level_filter
65    /// [error_handler]: AsyncPoolSinkBuilder::error_handler
66    /// [overflow_policy]: AsyncPoolSinkBuilder::overflow_policy
67    /// [thread_pool]: AsyncPoolSinkBuilder::thread_pool
68    #[must_use]
69    pub fn builder() -> AsyncPoolSinkBuilder {
70        let prop = SinkProp::default();
71        // AsyncPoolSink does not have its own formatter, and we do not impl
72        // `GetSinkProp` for it, so there should be no way to access the
73        // formatter inside the `prop`.
74        prop.set_formatter(UnreachableFormatter::new());
75
76        AsyncPoolSinkBuilder {
77            prop,
78            overflow_policy: OverflowPolicy::Block,
79            sinks: Sinks::new(),
80            thread_pool: None,
81        }
82    }
83
84    /// Gets a reference to internal sinks in the combined sink.
85    #[must_use]
86    pub fn sinks(&self) -> &[Arc<dyn Sink>] {
87        &self.backend.sinks
88    }
89
90    fn assign_task(&self, task: Task) -> Result<()> {
91        self.thread_pool.assign_task(task, self.overflow_policy)
92    }
93
94    #[must_use]
95    fn clone_backend(&self) -> Arc<Backend> {
96        Arc::clone(&self.backend)
97    }
98}
99
100impl SinkPropAccess for AsyncPoolSink {
101    fn level_filter(&self) -> LevelFilter {
102        self.backend.prop.level_filter()
103    }
104
105    fn set_level_filter(&self, level_filter: LevelFilter) {
106        self.backend.prop.set_level_filter(level_filter);
107    }
108
109    /// For [`AsyncPoolSink`], the function performs the same call to all
110    /// internal sinks.
111    fn set_formatter(&self, formatter: Box<dyn Formatter>) {
112        for sink in &self.backend.sinks {
113            sink.set_formatter(formatter.clone())
114        }
115    }
116
117    fn set_error_handler(&self, handler: ErrorHandler) {
118        self.backend.prop.set_error_handler(handler);
119    }
120}
121
122impl Sink for AsyncPoolSink {
123    fn log(&self, record: &Record) -> Result<()> {
124        self.assign_task(Task::Log {
125            backend: self.clone_backend(),
126            record: record.to_owned(),
127        })
128    }
129
130    fn flush(&self) -> Result<()> {
131        self.assign_task(Task::Flush {
132            backend: self.clone_backend(),
133        })
134    }
135
136    fn flush_on_exit(&self) -> Result<()> {
137        // https://github.com/SpriteOvO/spdlog-rs/issues/64
138        //
139        // If the program is tearing down, this will be the final flush. `crossbeam`
140        // uses thread-local internally, which is not supported in `atexit` callback.
141        // This can be bypassed by flushing sinks directly on the current thread, but
142        // before we do that we have to destroy the thread pool to ensure that any
143        // pending log tasks are completed.
144        self.thread_pool.destroy();
145        self.backend.flush_on_exit()
146    }
147}
148
149#[allow(missing_docs)]
150pub struct AsyncPoolSinkBuilder {
151    prop: SinkProp,
152    sinks: Sinks,
153    overflow_policy: OverflowPolicy,
154    thread_pool: Option<Arc<ThreadPool>>,
155}
156
157impl AsyncPoolSinkBuilder {
158    /// Add a [`Sink`].
159    #[must_use]
160    pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
161        self.sinks.push(sink);
162        self
163    }
164
165    /// Add multiple [`Sink`]s.
166    #[must_use]
167    pub fn sinks<I>(mut self, sinks: I) -> Self
168    where
169        I: IntoIterator<Item = Arc<dyn Sink>>,
170    {
171        self.sinks.append(&mut sinks.into_iter().collect());
172        self
173    }
174
175    /// Specifies a overflow policy.
176    ///
177    /// This parameter is **optional**, and defaults to
178    /// [`OverflowPolicy::Block`].
179    ///
180    /// When the channel is full, an incoming operation is handled according to
181    /// the specified policy.
182    #[must_use]
183    pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self {
184        self.overflow_policy = overflow_policy;
185        self
186    }
187
188    /// Specifies a custom thread pool.
189    ///
190    /// This parameter is **optional**, and defaults to the internal shared
191    /// default thread pool.
192    #[must_use]
193    pub fn thread_pool(mut self, thread_pool: Arc<ThreadPool>) -> Self {
194        self.thread_pool = Some(thread_pool);
195        self
196    }
197
198    // Prop
199    //
200
201    /// Specifies a log level filter.
202    ///
203    /// This parameter is **optional**, and defaults to [`LevelFilter::All`].
204    #[must_use]
205    pub fn level_filter(self, level_filter: LevelFilter) -> Self {
206        self.prop.set_level_filter(level_filter);
207        self
208    }
209
210    #[doc(hidden)]
211    #[deprecated(
212        note = "AsyncPoolSink does not have its own formatter, this method has no effect, it was added by accident and may be removed in the future",
213        since = "0.5.2"
214    )]
215    #[must_use]
216    pub fn formatter<F>(self, formatter: F) -> Self
217    where
218        F: Formatter + 'static,
219    {
220        self.prop.set_formatter(formatter);
221        self
222    }
223
224    /// Specifies an error handler.
225    ///
226    /// This parameter is **optional**, and defaults to
227    /// [`ErrorHandler::default()`].
228    #[must_use]
229    pub fn error_handler<F: Into<ErrorHandler>>(self, handler: F) -> Self {
230        self.prop.set_error_handler(handler);
231        self
232    }
233
234    /// Builds a [`AsyncPoolSink`].
235    pub fn build(self) -> Result<AsyncPoolSink> {
236        let backend = Arc::new(Backend {
237            prop: self.prop,
238            sinks: self.sinks.clone(),
239        });
240
241        let thread_pool = self.thread_pool.unwrap_or_else(default_thread_pool);
242
243        Ok(AsyncPoolSink {
244            overflow_policy: self.overflow_policy,
245            thread_pool,
246            backend,
247        })
248    }
249
250    /// Builds a `Arc<AsyncPoolSink>`.
251    ///
252    /// This is a shorthand method for `.build().map(Arc::new)`.
253    pub fn build_arc(self) -> Result<Arc<AsyncPoolSink>> {
254        self.build().map(Arc::new)
255    }
256}
257
258pub(crate) struct Backend {
259    prop: SinkProp,
260    sinks: Sinks,
261}
262
263impl Backend {
264    fn log(&self, record: &Record) -> Result<()> {
265        let mut result = Ok(());
266        for sink in &self.sinks {
267            result = Error::push_result(result, sink.log(record));
268        }
269        result
270    }
271
272    fn flush_with(&self, with: impl Fn(&dyn Sink) -> Result<()>) -> Result<()> {
273        let mut result = Ok(());
274        for sink in &self.sinks {
275            result = Error::push_result(result, with(&**sink));
276        }
277        result
278    }
279
280    fn flush(&self) -> Result<()> {
281        self.flush_with(|sink| sink.flush())
282    }
283
284    fn flush_on_exit(&self) -> Result<()> {
285        self.flush_with(|sink| sink.flush_on_exit())
286    }
287
288    fn handle_error(&self, err: Error) {
289        self.prop.call_error_handler_internal("AsyncPoolSink", err)
290    }
291}
292
293pub(crate) enum Task {
294    Log {
295        backend: Arc<Backend>,
296        record: RecordOwned,
297    },
298    Flush {
299        backend: Arc<Backend>,
300    },
301}
302
303impl Task {
304    // calls this function in async threads
305    pub(crate) fn exec(self) {
306        match self {
307            Task::Log { backend, record } => {
308                if let Err(err) = backend.log(&record.as_ref()) {
309                    backend.handle_error(err)
310                }
311            }
312            Task::Flush { backend } => {
313                if let Err(err) = backend.flush() {
314                    backend.handle_error(err)
315                }
316            }
317        }
318    }
319}
320
321#[cfg(test)]
322mod tests {
323    use std::{thread::sleep, time::Duration};
324
325    use super::*;
326    use crate::{prelude::*, test_utils::*};
327
328    #[test]
329    fn default_thread_pool() {
330        let counter_sink = Arc::new(TestSink::new());
331        let build_logger = || {
332            build_test_logger(|b| {
333                b.sink(
334                    AsyncPoolSink::builder()
335                        .sink(counter_sink.clone())
336                        .build_arc()
337                        .unwrap(),
338                )
339                .level_filter(LevelFilter::All)
340                .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
341            })
342        };
343
344        assert_eq!(counter_sink.log_count(), 0);
345        assert_eq!(counter_sink.flush_count(), 0);
346
347        {
348            let logger = build_logger();
349
350            info!(logger: logger, "");
351            sleep(Duration::from_millis(50));
352            assert_eq!(counter_sink.log_count(), 1);
353            assert_eq!(counter_sink.flush_count(), 0);
354
355            warn!(logger: logger, "");
356            sleep(Duration::from_millis(50));
357            assert_eq!(counter_sink.log_count(), 2);
358            assert_eq!(counter_sink.flush_count(), 0);
359        }
360
361        {
362            let logger = build_logger();
363
364            error!(logger: logger, "");
365            sleep(Duration::from_millis(50));
366            assert_eq!(counter_sink.log_count(), 3);
367            assert_eq!(counter_sink.flush_count(), 1);
368
369            critical!(logger: logger, "");
370            sleep(Duration::from_millis(50));
371            assert_eq!(counter_sink.log_count(), 4);
372            assert_eq!(counter_sink.flush_count(), 2);
373        }
374    }
375
376    #[test]
377    fn custom_thread_pool() {
378        let counter_sink = Arc::new(TestSink::new());
379        let thread_pool = ThreadPool::builder().build_arc().unwrap();
380        let logger = build_test_logger(|b| {
381            b.sink(
382                AsyncPoolSink::builder()
383                    .sink(counter_sink.clone())
384                    .thread_pool(thread_pool)
385                    .build_arc()
386                    .unwrap(),
387            )
388            .level_filter(LevelFilter::All)
389            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
390        });
391
392        assert_eq!(counter_sink.log_count(), 0);
393        assert_eq!(counter_sink.flush_count(), 0);
394
395        info!(logger: logger, "");
396        sleep(Duration::from_millis(50));
397        assert_eq!(counter_sink.log_count(), 1);
398        assert_eq!(counter_sink.flush_count(), 0);
399
400        warn!(logger: logger, "");
401        sleep(Duration::from_millis(50));
402        assert_eq!(counter_sink.log_count(), 2);
403        assert_eq!(counter_sink.flush_count(), 0);
404
405        error!(logger: logger, "");
406        sleep(Duration::from_millis(50));
407        assert_eq!(counter_sink.log_count(), 3);
408        assert_eq!(counter_sink.flush_count(), 1);
409    }
410
411    #[test]
412    fn async_opeartions() {
413        let counter_sink = Arc::new(TestSink::with_delay(Some(Duration::from_secs(1))));
414        // The default thread pool is not used here to avoid race when tests are run in
415        // parallel.
416        let thread_pool = ThreadPool::builder().build_arc().unwrap();
417        let logger = build_test_logger(|b| {
418            b.sink(
419                AsyncPoolSink::builder()
420                    .sink(counter_sink.clone())
421                    .thread_pool(thread_pool)
422                    .build_arc()
423                    .unwrap(),
424            )
425            .level_filter(LevelFilter::All)
426            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Warn))
427        });
428
429        assert_eq!(counter_sink.log_count(), 0);
430        assert_eq!(counter_sink.flush_count(), 0);
431
432        info!(logger: logger, "meow");
433        sleep(Duration::from_millis(500));
434        assert_eq!(counter_sink.log_count(), 0);
435        assert_eq!(counter_sink.flush_count(), 0);
436        sleep(Duration::from_millis(750));
437        assert_eq!(counter_sink.log_count(), 1);
438        assert_eq!(counter_sink.flush_count(), 0);
439
440        warn!(logger: logger, "nya");
441        sleep(Duration::from_millis(250));
442        assert_eq!(counter_sink.log_count(), 1);
443        assert_eq!(counter_sink.flush_count(), 0);
444        sleep(Duration::from_millis(1000));
445        assert_eq!(counter_sink.log_count(), 2);
446        assert_eq!(counter_sink.flush_count(), 0);
447        sleep(Duration::from_millis(1250));
448        assert_eq!(counter_sink.log_count(), 2);
449        assert_eq!(counter_sink.flush_count(), 1);
450    }
451}