1use crate::{
2 default_thread_pool,
3 formatter::{Formatter, UnreachableFormatter},
4 sink::{OverflowPolicy, Sink, SinkProp, SinkPropAccess, Sinks},
5 sync::*,
6 Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result, ThreadPool,
7};
8
9pub struct AsyncPoolSink {
49 overflow_policy: OverflowPolicy,
50 thread_pool: Arc<ThreadPool>,
51 backend: Arc<Backend>,
52}
53
54impl AsyncPoolSink {
55 #[must_use]
69 pub fn builder() -> AsyncPoolSinkBuilder {
70 let prop = SinkProp::default();
71 prop.set_formatter(UnreachableFormatter::new());
75
76 AsyncPoolSinkBuilder {
77 prop,
78 overflow_policy: OverflowPolicy::Block,
79 sinks: Sinks::new(),
80 thread_pool: None,
81 }
82 }
83
84 #[must_use]
86 pub fn sinks(&self) -> &[Arc<dyn Sink>] {
87 &self.backend.sinks
88 }
89
90 fn assign_task(&self, task: Task) -> Result<()> {
91 self.thread_pool.assign_task(task, self.overflow_policy)
92 }
93
94 #[must_use]
95 fn clone_backend(&self) -> Arc<Backend> {
96 Arc::clone(&self.backend)
97 }
98}
99
100impl SinkPropAccess for AsyncPoolSink {
101 fn level_filter(&self) -> LevelFilter {
102 self.backend.prop.level_filter()
103 }
104
105 fn set_level_filter(&self, level_filter: LevelFilter) {
106 self.backend.prop.set_level_filter(level_filter);
107 }
108
109 fn set_formatter(&self, formatter: Box<dyn Formatter>) {
112 for sink in &self.backend.sinks {
113 sink.set_formatter(formatter.clone())
114 }
115 }
116
117 fn set_error_handler(&self, handler: ErrorHandler) {
118 self.backend.prop.set_error_handler(handler);
119 }
120}
121
122impl Sink for AsyncPoolSink {
123 fn log(&self, record: &Record) -> Result<()> {
124 self.assign_task(Task::Log {
125 backend: self.clone_backend(),
126 record: record.to_owned(),
127 })
128 }
129
130 fn flush(&self) -> Result<()> {
131 self.assign_task(Task::Flush {
132 backend: self.clone_backend(),
133 })
134 }
135
136 fn flush_on_exit(&self) -> Result<()> {
137 self.thread_pool.destroy();
145 self.backend.flush_on_exit()
146 }
147}
148
149#[allow(missing_docs)]
150pub struct AsyncPoolSinkBuilder {
151 prop: SinkProp,
152 sinks: Sinks,
153 overflow_policy: OverflowPolicy,
154 thread_pool: Option<Arc<ThreadPool>>,
155}
156
157impl AsyncPoolSinkBuilder {
158 #[must_use]
160 pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
161 self.sinks.push(sink);
162 self
163 }
164
165 #[must_use]
167 pub fn sinks<I>(mut self, sinks: I) -> Self
168 where
169 I: IntoIterator<Item = Arc<dyn Sink>>,
170 {
171 self.sinks.append(&mut sinks.into_iter().collect());
172 self
173 }
174
175 #[must_use]
183 pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self {
184 self.overflow_policy = overflow_policy;
185 self
186 }
187
188 #[must_use]
193 pub fn thread_pool(mut self, thread_pool: Arc<ThreadPool>) -> Self {
194 self.thread_pool = Some(thread_pool);
195 self
196 }
197
198 #[must_use]
205 pub fn level_filter(self, level_filter: LevelFilter) -> Self {
206 self.prop.set_level_filter(level_filter);
207 self
208 }
209
210 #[doc(hidden)]
211 #[deprecated(
212 note = "AsyncPoolSink does not have its own formatter, this method has no effect, it was added by accident and may be removed in the future",
213 since = "0.5.2"
214 )]
215 #[must_use]
216 pub fn formatter<F>(self, formatter: F) -> Self
217 where
218 F: Formatter + 'static,
219 {
220 self.prop.set_formatter(formatter);
221 self
222 }
223
224 #[must_use]
229 pub fn error_handler<F: Into<ErrorHandler>>(self, handler: F) -> Self {
230 self.prop.set_error_handler(handler);
231 self
232 }
233
234 pub fn build(self) -> Result<AsyncPoolSink> {
236 let backend = Arc::new(Backend {
237 prop: self.prop,
238 sinks: self.sinks.clone(),
239 });
240
241 let thread_pool = self.thread_pool.unwrap_or_else(default_thread_pool);
242
243 Ok(AsyncPoolSink {
244 overflow_policy: self.overflow_policy,
245 thread_pool,
246 backend,
247 })
248 }
249
250 pub fn build_arc(self) -> Result<Arc<AsyncPoolSink>> {
254 self.build().map(Arc::new)
255 }
256}
257
258pub(crate) struct Backend {
259 prop: SinkProp,
260 sinks: Sinks,
261}
262
263impl Backend {
264 fn log(&self, record: &Record) -> Result<()> {
265 let mut result = Ok(());
266 for sink in &self.sinks {
267 result = Error::push_result(result, sink.log(record));
268 }
269 result
270 }
271
272 fn flush_with(&self, with: impl Fn(&dyn Sink) -> Result<()>) -> Result<()> {
273 let mut result = Ok(());
274 for sink in &self.sinks {
275 result = Error::push_result(result, with(&**sink));
276 }
277 result
278 }
279
280 fn flush(&self) -> Result<()> {
281 self.flush_with(|sink| sink.flush())
282 }
283
284 fn flush_on_exit(&self) -> Result<()> {
285 self.flush_with(|sink| sink.flush_on_exit())
286 }
287
288 fn handle_error(&self, err: Error) {
289 self.prop.call_error_handler_internal("AsyncPoolSink", err)
290 }
291}
292
293pub(crate) enum Task {
294 Log {
295 backend: Arc<Backend>,
296 record: RecordOwned,
297 },
298 Flush {
299 backend: Arc<Backend>,
300 },
301}
302
303impl Task {
304 pub(crate) fn exec(self) {
306 match self {
307 Task::Log { backend, record } => {
308 if let Err(err) = backend.log(&record.as_ref()) {
309 backend.handle_error(err)
310 }
311 }
312 Task::Flush { backend } => {
313 if let Err(err) = backend.flush() {
314 backend.handle_error(err)
315 }
316 }
317 }
318 }
319}
320
#[cfg(test)]
mod tests {
    use std::{thread::sleep, time::Duration};

    use super::*;
    use crate::{prelude::*, test_utils::*};

    /// Sinks built without an explicit pool log asynchronously and flush only for records at
    /// `Error` level or more severe.
    #[test]
    fn default_thread_pool() {
        let counter_sink = Arc::new(TestSink::new());
        // Snapshot of `(log_count, flush_count)` as seen by the wrapped sink.
        let counts = || (counter_sink.log_count(), counter_sink.flush_count());
        let build_logger = || {
            build_test_logger(|b| {
                b.sink(
                    AsyncPoolSink::builder()
                        .sink(counter_sink.clone())
                        .build_arc()
                        .unwrap(),
                )
                .level_filter(LevelFilter::All)
                .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
            })
        };

        assert_eq!(counts(), (0, 0));

        {
            let logger = build_logger();

            // The short sleeps give the pool time to execute the queued tasks.
            info!(logger: logger, "");
            sleep(Duration::from_millis(50));
            assert_eq!(counts(), (1, 0));

            warn!(logger: logger, "");
            sleep(Duration::from_millis(50));
            assert_eq!(counts(), (2, 0));
        }

        {
            // A second logger with a fresh `AsyncPoolSink` over the same counting sink.
            let logger = build_logger();

            error!(logger: logger, "");
            sleep(Duration::from_millis(50));
            assert_eq!(counts(), (3, 1));

            critical!(logger: logger, "");
            sleep(Duration::from_millis(50));
            assert_eq!(counts(), (4, 2));
        }
    }

    /// Same behavior as with the default pool when an explicit pool is supplied.
    #[test]
    fn custom_thread_pool() {
        let counter_sink = Arc::new(TestSink::new());
        // Snapshot of `(log_count, flush_count)` as seen by the wrapped sink.
        let counts = || (counter_sink.log_count(), counter_sink.flush_count());
        let thread_pool = ThreadPool::builder().build_arc().unwrap();
        let logger = build_test_logger(|b| {
            b.sink(
                AsyncPoolSink::builder()
                    .sink(counter_sink.clone())
                    .thread_pool(thread_pool)
                    .build_arc()
                    .unwrap(),
            )
            .level_filter(LevelFilter::All)
            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
        });

        assert_eq!(counts(), (0, 0));

        info!(logger: logger, "");
        sleep(Duration::from_millis(50));
        assert_eq!(counts(), (1, 0));

        warn!(logger: logger, "");
        sleep(Duration::from_millis(50));
        assert_eq!(counts(), (2, 0));

        error!(logger: logger, "");
        sleep(Duration::from_millis(50));
        assert_eq!(counts(), (3, 1));
    }

    /// Logging calls return before the (slow) wrapped sink has finished its work.
    #[test]
    fn async_operations() {
        // NOTE(review): presumably `with_delay` makes every log/flush of the test sink take
        // the given duration, which is what the timings below rely on; confirm in
        // `test_utils`.
        let counter_sink = Arc::new(TestSink::with_delay(Some(Duration::from_secs(1))));
        // Snapshot of `(log_count, flush_count)` as seen by the wrapped sink.
        let counts = || (counter_sink.log_count(), counter_sink.flush_count());
        // A dedicated pool is used here rather than the default one.
        let thread_pool = ThreadPool::builder().build_arc().unwrap();
        let logger = build_test_logger(|b| {
            b.sink(
                AsyncPoolSink::builder()
                    .sink(counter_sink.clone())
                    .thread_pool(thread_pool)
                    .build_arc()
                    .unwrap(),
            )
            .level_filter(LevelFilter::All)
            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Warn))
        });

        assert_eq!(counts(), (0, 0));

        // t = 0.5 s: the log is still in progress; t = 1.25 s: it has completed.
        info!(logger: logger, "meow");
        sleep(Duration::from_millis(500));
        assert_eq!(counts(), (0, 0));
        sleep(Duration::from_millis(750));
        assert_eq!(counts(), (1, 0));

        // `Warn` triggers a flush after the log; each step is observed only once its delay
        // has elapsed.
        warn!(logger: logger, "nya");
        sleep(Duration::from_millis(250));
        assert_eq!(counts(), (1, 0));
        sleep(Duration::from_millis(1000));
        assert_eq!(counts(), (2, 0));
        sleep(Duration::from_millis(1250));
        assert_eq!(counts(), (2, 1));
    }
}