spdlog/sink/async_sink/
async_pool_sink.rs

1use crate::{
2    default_error_handler, default_thread_pool,
3    formatter::Formatter,
4    sink::{helper, OverflowPolicy, Sink, Sinks},
5    sync::*,
6    Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result, ThreadPool,
7};
8
9/// A [combined sink], logging and flushing asynchronously (thread-pool-based).
10///
11/// Expensive operations (such as `log` and `flush`) on asynchronous sinks will
12/// be performed asynchronously on other threads.
13///
14/// Since there is no waiting, errors that occur while performing asynchronous
15/// operations will not be returned to the upper level, and instead the error
16/// handler of the sink will be called.
17///
18/// Users should only use asynchronous combined sinks to wrap actual sinks that
19/// require a long time for operations (e.g., file sinks that are frequently
20/// flushed, sinks involving networks), otherwise they will not get a
21/// performance boost or even worse.
22///
23/// Since the thread pool has a capacity limit, the queue may be full in some
24/// cases. When users encounter this situation, they have the following options:
25///
26///  - Adjust to a larger capacity via [`ThreadPoolBuilder::capacity`].
27///
28///  - Adjust the overflow policy via [`AsyncPoolSinkBuilder::overflow_policy`].
29///
30///  - Set up an error handler on asynchronous combined sinks via
31///    [`AsyncPoolSinkBuilder::error_handler`]. The handler will be called when
32///    a record is dropped or an operation has failed.
33///
34///
35/// # Note
36///
37/// Errors that occur in `log` and `flush` will not be returned directly,
38/// instead the error handler will be called.
39///
40/// # Examples
41///
42/// See [./examples] directory.
43///
44/// [combined sink]: index.html#combined-sink
45/// [`ThreadPoolBuilder::capacity`]: crate::ThreadPoolBuilder::capacity
46/// [./examples]: https://github.com/SpriteOvO/spdlog-rs/tree/main/spdlog/examples
47// The names `AsyncSink` and `AsyncRuntimeSink` is reserved for future use.
48pub struct AsyncPoolSink {
49    level_filter: Atomic<LevelFilter>,
50    overflow_policy: OverflowPolicy,
51    thread_pool: Arc<ThreadPool>,
52    backend: Arc<Backend>,
53}
54
55impl AsyncPoolSink {
56    /// Constructs a builder of `AsyncPoolSink` with default parameters:
57    ///
58    /// | Parameter         | Default Value                       |
59    /// |-------------------|-------------------------------------|
60    /// | [level_filter]    | `All`                               |
61    /// | [error_handler]   | [default error handler]             |
62    /// | [overflow_policy] | `Block`                             |
63    /// | [thread_pool]     | internal shared default thread pool |
64    ///
65    /// [level_filter]: AsyncPoolSinkBuilder::level_filter
66    /// [error_handler]: AsyncPoolSinkBuilder::error_handler
67    /// [default error handler]: error/index.html#default-error-handler
68    /// [overflow_policy]: AsyncPoolSinkBuilder::overflow_policy
69    /// [thread_pool]: AsyncPoolSinkBuilder::thread_pool
70    #[must_use]
71    pub fn builder() -> AsyncPoolSinkBuilder {
72        AsyncPoolSinkBuilder {
73            level_filter: helper::SINK_DEFAULT_LEVEL_FILTER,
74            overflow_policy: OverflowPolicy::Block,
75            sinks: Sinks::new(),
76            thread_pool: None,
77            error_handler: None,
78        }
79    }
80
81    /// Gets a reference to internal sinks in the combined sink.
82    #[must_use]
83    pub fn sinks(&self) -> &[Arc<dyn Sink>] {
84        &self.backend.sinks
85    }
86
87    /// Sets a error handler.
88    pub fn set_error_handler(&self, handler: Option<ErrorHandler>) {
89        self.backend.error_handler.swap(handler, Ordering::Relaxed);
90    }
91
92    fn assign_task(&self, task: Task) -> Result<()> {
93        self.thread_pool.assign_task(task, self.overflow_policy)
94    }
95
96    #[must_use]
97    fn clone_backend(&self) -> Arc<Backend> {
98        Arc::clone(&self.backend)
99    }
100}
101
102impl Sink for AsyncPoolSink {
103    fn log(&self, record: &Record) -> Result<()> {
104        self.assign_task(Task::Log {
105            backend: self.clone_backend(),
106            record: record.to_owned(),
107        })
108    }
109
110    fn flush(&self) -> Result<()> {
111        if crate::IS_TEARING_DOWN.load(Ordering::SeqCst) {
112            // https://github.com/SpriteOvO/spdlog-rs/issues/64
113            //
114            // If the program is tearing down, this will be the final flush. `crossbeam`
115            // uses thread-local internally, which is not supported in `atexit` callback.
116            // This can be bypassed by flushing sinks directly on the current thread, but
117            // before we do that we have to destroy the thread pool to ensure that any
118            // pending log tasks are completed.
119            self.thread_pool.destroy();
120            self.backend.flush()
121        } else {
122            self.assign_task(Task::Flush {
123                backend: self.clone_backend(),
124            })
125        }
126    }
127
128    /// For [`AsyncPoolSink`], the function performs the same call to all
129    /// internal sinks.
130    fn set_formatter(&self, formatter: Box<dyn Formatter>) {
131        for sink in &self.backend.sinks {
132            sink.set_formatter(formatter.clone())
133        }
134    }
135
136    helper::common_impl! {
137        @SinkCustom {
138            level_filter: level_filter,
139            formatter: None,
140            error_handler: backend.error_handler,
141        }
142    }
143}
144
145#[allow(missing_docs)]
146pub struct AsyncPoolSinkBuilder {
147    level_filter: LevelFilter,
148    sinks: Sinks,
149    overflow_policy: OverflowPolicy,
150    thread_pool: Option<Arc<ThreadPool>>,
151    error_handler: Option<ErrorHandler>,
152}
153
154impl AsyncPoolSinkBuilder {
155    /// Add a [`Sink`].
156    #[must_use]
157    pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
158        self.sinks.push(sink);
159        self
160    }
161
162    /// Add multiple [`Sink`]s.
163    #[must_use]
164    pub fn sinks<I>(mut self, sinks: I) -> Self
165    where
166        I: IntoIterator<Item = Arc<dyn Sink>>,
167    {
168        self.sinks.append(&mut sinks.into_iter().collect());
169        self
170    }
171
172    /// Specifies a overflow policy.
173    ///
174    /// This parameter is **optional**.
175    ///
176    /// When the channel is full, an incoming operation is handled according to
177    /// the specified policy.
178    #[must_use]
179    pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self {
180        self.overflow_policy = overflow_policy;
181        self
182    }
183
184    /// Specifies a custom thread pool.
185    ///
186    /// This parameter is **optional**.
187    #[must_use]
188    pub fn thread_pool(mut self, thread_pool: Arc<ThreadPool>) -> Self {
189        self.thread_pool = Some(thread_pool);
190        self
191    }
192
193    /// Builds a [`AsyncPoolSink`].
194    pub fn build(self) -> Result<AsyncPoolSink> {
195        let backend = Arc::new(Backend {
196            sinks: self.sinks.clone(),
197            error_handler: Atomic::new(self.error_handler),
198        });
199
200        let thread_pool = self.thread_pool.unwrap_or_else(default_thread_pool);
201
202        Ok(AsyncPoolSink {
203            level_filter: Atomic::new(self.level_filter),
204            overflow_policy: self.overflow_policy,
205            thread_pool,
206            backend,
207        })
208    }
209
210    helper::common_impl!(@SinkBuilderCustom {
211        level_filter: level_filter,
212        formatter: None,
213        error_handler: error_handler,
214    });
215}
216
217pub(crate) struct Backend {
218    sinks: Sinks,
219    error_handler: helper::SinkErrorHandler,
220}
221
222impl Backend {
223    fn log(&self, record: &Record) -> Result<()> {
224        let mut result = Ok(());
225        for sink in &self.sinks {
226            result = Error::push_result(result, sink.log(record));
227        }
228        result
229    }
230
231    fn flush(&self) -> Result<()> {
232        let mut result = Ok(());
233        for sink in &self.sinks {
234            result = Error::push_result(result, sink.flush());
235        }
236        result
237    }
238
239    fn handle_error(&self, err: Error) {
240        self.error_handler
241            .load(Ordering::Relaxed)
242            .unwrap_or(|err| default_error_handler("AsyncPoolSink", err))(err);
243    }
244}
245
246pub(crate) enum Task {
247    Log {
248        backend: Arc<Backend>,
249        record: RecordOwned,
250    },
251    Flush {
252        backend: Arc<Backend>,
253    },
254}
255
256impl Task {
257    // calls this function in async threads
258    pub(crate) fn exec(self) {
259        match self {
260            Task::Log { backend, record } => {
261                if let Err(err) = backend.log(&record.as_ref()) {
262                    backend.handle_error(err)
263                }
264            }
265            Task::Flush { backend } => {
266                if let Err(err) = backend.flush() {
267                    backend.handle_error(err)
268                }
269            }
270        }
271    }
272}
273
274#[cfg(test)]
275mod tests {
276    use std::{thread::sleep, time::Duration};
277
278    use super::*;
279    use crate::{prelude::*, test_utils::*};
280
281    #[test]
282    fn default_thread_pool() {
283        let counter_sink = Arc::new(TestSink::new());
284        let build_logger = || {
285            build_test_logger(|b| {
286                b.sink(Arc::new(
287                    AsyncPoolSink::builder()
288                        .sink(counter_sink.clone())
289                        .build()
290                        .unwrap(),
291                ))
292                .level_filter(LevelFilter::All)
293                .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
294            })
295        };
296
297        assert_eq!(counter_sink.log_count(), 0);
298        assert_eq!(counter_sink.flush_count(), 0);
299
300        {
301            let logger = build_logger();
302
303            info!(logger: logger, "");
304            sleep(Duration::from_millis(50));
305            assert_eq!(counter_sink.log_count(), 1);
306            assert_eq!(counter_sink.flush_count(), 0);
307
308            warn!(logger: logger, "");
309            sleep(Duration::from_millis(50));
310            assert_eq!(counter_sink.log_count(), 2);
311            assert_eq!(counter_sink.flush_count(), 0);
312        }
313
314        {
315            let logger = build_logger();
316
317            error!(logger: logger, "");
318            sleep(Duration::from_millis(50));
319            assert_eq!(counter_sink.log_count(), 3);
320            assert_eq!(counter_sink.flush_count(), 1);
321
322            critical!(logger: logger, "");
323            sleep(Duration::from_millis(50));
324            assert_eq!(counter_sink.log_count(), 4);
325            assert_eq!(counter_sink.flush_count(), 2);
326        }
327    }
328
329    #[test]
330    fn custom_thread_pool() {
331        let counter_sink = Arc::new(TestSink::new());
332        let thread_pool = Arc::new(ThreadPool::builder().build().unwrap());
333        let logger = build_test_logger(|b| {
334            b.sink(Arc::new(
335                AsyncPoolSink::builder()
336                    .sink(counter_sink.clone())
337                    .thread_pool(thread_pool)
338                    .build()
339                    .unwrap(),
340            ))
341            .level_filter(LevelFilter::All)
342            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
343        });
344
345        assert_eq!(counter_sink.log_count(), 0);
346        assert_eq!(counter_sink.flush_count(), 0);
347
348        info!(logger: logger, "");
349        sleep(Duration::from_millis(50));
350        assert_eq!(counter_sink.log_count(), 1);
351        assert_eq!(counter_sink.flush_count(), 0);
352
353        warn!(logger: logger, "");
354        sleep(Duration::from_millis(50));
355        assert_eq!(counter_sink.log_count(), 2);
356        assert_eq!(counter_sink.flush_count(), 0);
357
358        error!(logger: logger, "");
359        sleep(Duration::from_millis(50));
360        assert_eq!(counter_sink.log_count(), 3);
361        assert_eq!(counter_sink.flush_count(), 1);
362    }
363
364    #[test]
365    fn async_opeartions() {
366        let counter_sink = Arc::new(TestSink::with_delay(Some(Duration::from_secs(1))));
367        // The default thread pool is not used here to avoid race when tests are run in
368        // parallel.
369        let thread_pool = Arc::new(ThreadPool::builder().build().unwrap());
370        let logger = build_test_logger(|b| {
371            b.sink(Arc::new(
372                AsyncPoolSink::builder()
373                    .sink(counter_sink.clone())
374                    .thread_pool(thread_pool)
375                    .build()
376                    .unwrap(),
377            ))
378            .level_filter(LevelFilter::All)
379            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Warn))
380        });
381
382        assert_eq!(counter_sink.log_count(), 0);
383        assert_eq!(counter_sink.flush_count(), 0);
384
385        info!(logger: logger, "meow");
386        sleep(Duration::from_millis(500));
387        assert_eq!(counter_sink.log_count(), 0);
388        assert_eq!(counter_sink.flush_count(), 0);
389        sleep(Duration::from_millis(750));
390        assert_eq!(counter_sink.log_count(), 1);
391        assert_eq!(counter_sink.flush_count(), 0);
392
393        warn!(logger: logger, "nya");
394        sleep(Duration::from_millis(250));
395        assert_eq!(counter_sink.log_count(), 1);
396        assert_eq!(counter_sink.flush_count(), 0);
397        sleep(Duration::from_millis(1000));
398        assert_eq!(counter_sink.log_count(), 2);
399        assert_eq!(counter_sink.flush_count(), 0);
400        sleep(Duration::from_millis(1250));
401        assert_eq!(counter_sink.log_count(), 2);
402        assert_eq!(counter_sink.flush_count(), 1);
403    }
404}