1use crate::{
2 default_thread_pool,
3 formatter::{Formatter, UnreachableFormatter},
4 sink::{OverflowPolicy, Sink, SinkProp, SinkPropAccess, Sinks},
5 sync::*,
6 Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result, ThreadPool,
7};
8
9pub struct AsyncPoolSink {
49 overflow_policy: OverflowPolicy,
50 thread_pool: Arc<ThreadPool>,
51 backend: Arc<Backend>,
52}
53
54impl AsyncPoolSink {
55 #[must_use]
70 pub fn builder() -> AsyncPoolSinkBuilder {
71 let prop = SinkProp::default();
72 prop.set_formatter(UnreachableFormatter::new());
76
77 AsyncPoolSinkBuilder {
78 prop,
79 overflow_policy: OverflowPolicy::Block,
80 sinks: Sinks::new(),
81 thread_pool: None,
82 }
83 }
84
85 #[must_use]
87 pub fn sinks(&self) -> &[Arc<dyn Sink>] {
88 &self.backend.sinks
89 }
90
91 fn assign_task(&self, task: Task) -> Result<()> {
92 self.thread_pool.assign_task(task, self.overflow_policy)
93 }
94
95 #[must_use]
96 fn clone_backend(&self) -> Arc<Backend> {
97 Arc::clone(&self.backend)
98 }
99}
100
101impl SinkPropAccess for AsyncPoolSink {
102 fn level_filter(&self) -> LevelFilter {
103 self.backend.prop.level_filter()
104 }
105
106 fn set_level_filter(&self, level_filter: LevelFilter) {
107 self.backend.prop.set_level_filter(level_filter);
108 }
109
110 fn set_formatter(&self, formatter: Box<dyn Formatter>) {
113 for sink in &self.backend.sinks {
114 sink.set_formatter(formatter.clone())
115 }
116 }
117
118 fn set_error_handler(&self, handler: ErrorHandler) {
119 self.backend.prop.set_error_handler(handler);
120 }
121}
122
123impl Sink for AsyncPoolSink {
124 fn log(&self, record: &Record) -> Result<()> {
125 self.assign_task(Task::Log {
126 backend: self.clone_backend(),
127 record: record.to_owned(),
128 })
129 }
130
131 fn flush(&self) -> Result<()> {
132 if crate::IS_TEARING_DOWN.load(Ordering::SeqCst) {
133 self.thread_pool.destroy();
141 self.backend.flush()
142 } else {
143 self.assign_task(Task::Flush {
144 backend: self.clone_backend(),
145 })
146 }
147 }
148}
149
150#[allow(missing_docs)]
151pub struct AsyncPoolSinkBuilder {
152 prop: SinkProp,
153 sinks: Sinks,
154 overflow_policy: OverflowPolicy,
155 thread_pool: Option<Arc<ThreadPool>>,
156}
157
158impl AsyncPoolSinkBuilder {
159 #[must_use]
161 pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
162 self.sinks.push(sink);
163 self
164 }
165
166 #[must_use]
168 pub fn sinks<I>(mut self, sinks: I) -> Self
169 where
170 I: IntoIterator<Item = Arc<dyn Sink>>,
171 {
172 self.sinks.append(&mut sinks.into_iter().collect());
173 self
174 }
175
176 #[must_use]
183 pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self {
184 self.overflow_policy = overflow_policy;
185 self
186 }
187
188 #[must_use]
192 pub fn thread_pool(mut self, thread_pool: Arc<ThreadPool>) -> Self {
193 self.thread_pool = Some(thread_pool);
194 self
195 }
196
197 #[must_use]
204 pub fn level_filter(self, level_filter: LevelFilter) -> Self {
205 self.prop.set_level_filter(level_filter);
206 self
207 }
208
209 #[must_use]
213 pub fn formatter<F>(self, formatter: F) -> Self
214 where
215 F: Formatter + 'static,
216 {
217 self.prop.set_formatter(formatter);
218 self
219 }
220
221 #[must_use]
225 pub fn error_handler<F: Into<ErrorHandler>>(self, handler: F) -> Self {
226 self.prop.set_error_handler(handler);
227 self
228 }
229
230 pub fn build(self) -> Result<AsyncPoolSink> {
232 let backend = Arc::new(Backend {
233 prop: self.prop,
234 sinks: self.sinks.clone(),
235 });
236
237 let thread_pool = self.thread_pool.unwrap_or_else(default_thread_pool);
238
239 Ok(AsyncPoolSink {
240 overflow_policy: self.overflow_policy,
241 thread_pool,
242 backend,
243 })
244 }
245}
246
247pub(crate) struct Backend {
248 prop: SinkProp,
249 sinks: Sinks,
250}
251
252impl Backend {
253 fn log(&self, record: &Record) -> Result<()> {
254 let mut result = Ok(());
255 for sink in &self.sinks {
256 result = Error::push_result(result, sink.log(record));
257 }
258 result
259 }
260
261 fn flush(&self) -> Result<()> {
262 let mut result = Ok(());
263 for sink in &self.sinks {
264 result = Error::push_result(result, sink.flush());
265 }
266 result
267 }
268
269 fn handle_error(&self, err: Error) {
270 self.prop.call_error_handler_internal("AsyncPoolSink", err)
271 }
272}
273
274pub(crate) enum Task {
275 Log {
276 backend: Arc<Backend>,
277 record: RecordOwned,
278 },
279 Flush {
280 backend: Arc<Backend>,
281 },
282}
283
284impl Task {
285 pub(crate) fn exec(self) {
287 match self {
288 Task::Log { backend, record } => {
289 if let Err(err) = backend.log(&record.as_ref()) {
290 backend.handle_error(err)
291 }
292 }
293 Task::Flush { backend } => {
294 if let Err(err) = backend.flush() {
295 backend.handle_error(err)
296 }
297 }
298 }
299 }
300}
301
#[cfg(test)]
mod tests {
    use std::{thread::sleep, time::Duration};

    use super::*;
    use crate::{prelude::*, test_utils::*};

    // Asserts how many `log` and `flush` calls the given test sink has seen.
    macro_rules! expect_counts {
        ($sink:ident, logs = $logs:expr, flushes = $flushes:expr) => {{
            assert_eq!($sink.log_count(), $logs);
            assert_eq!($sink.flush_count(), $flushes);
        }};
    }

    // Blocks the test thread so that queued tasks get time to run.
    fn wait_ms(millis: u64) {
        sleep(Duration::from_millis(millis));
    }

    // Without an explicit pool, records and flushes must still reach the
    // wrapped sink, also when a second logger is built the same way.
    #[test]
    fn default_thread_pool() {
        let counter_sink = Arc::new(TestSink::new());
        let build_logger = || {
            build_test_logger(|b| {
                let async_sink = AsyncPoolSink::builder()
                    .sink(counter_sink.clone())
                    .build()
                    .unwrap();
                b.sink(Arc::new(async_sink))
                    .level_filter(LevelFilter::All)
                    .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
            })
        };

        expect_counts!(counter_sink, logs = 0, flushes = 0);

        {
            let logger = build_logger();

            // Below the flush level: logged, but not flushed.
            info!(logger: logger, "");
            wait_ms(50);
            expect_counts!(counter_sink, logs = 1, flushes = 0);

            warn!(logger: logger, "");
            wait_ms(50);
            expect_counts!(counter_sink, logs = 2, flushes = 0);
        }

        {
            let logger = build_logger();

            // At or above the flush level: logged and flushed.
            error!(logger: logger, "");
            wait_ms(50);
            expect_counts!(counter_sink, logs = 3, flushes = 1);

            critical!(logger: logger, "");
            wait_ms(50);
            expect_counts!(counter_sink, logs = 4, flushes = 2);
        }
    }

    // Same expectations as above, but with an explicitly supplied pool.
    #[test]
    fn custom_thread_pool() {
        let counter_sink = Arc::new(TestSink::new());
        let thread_pool = Arc::new(ThreadPool::builder().build().unwrap());
        let logger = build_test_logger(|b| {
            let async_sink = AsyncPoolSink::builder()
                .sink(counter_sink.clone())
                .thread_pool(thread_pool)
                .build()
                .unwrap();
            b.sink(Arc::new(async_sink))
                .level_filter(LevelFilter::All)
                .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
        });

        expect_counts!(counter_sink, logs = 0, flushes = 0);

        info!(logger: logger, "");
        wait_ms(50);
        expect_counts!(counter_sink, logs = 1, flushes = 0);

        warn!(logger: logger, "");
        wait_ms(50);
        expect_counts!(counter_sink, logs = 2, flushes = 0);

        error!(logger: logger, "");
        wait_ms(50);
        expect_counts!(counter_sink, logs = 3, flushes = 1);
    }

    // Checks that the logging call returns before the wrapped sink is done.
    // NOTE(review): assumes `TestSink::with_delay` makes each log/flush call
    // take roughly the given duration -- confirm in `test_utils`.
    #[test]
    fn async_opeartions() {
        let counter_sink = Arc::new(TestSink::with_delay(Some(Duration::from_secs(1))));
        let thread_pool = Arc::new(ThreadPool::builder().build().unwrap());
        let logger = build_test_logger(|b| {
            let async_sink = AsyncPoolSink::builder()
                .sink(counter_sink.clone())
                .thread_pool(thread_pool)
                .build()
                .unwrap();
            b.sink(Arc::new(async_sink))
                .level_filter(LevelFilter::All)
                .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Warn))
        });

        expect_counts!(counter_sink, logs = 0, flushes = 0);

        // Half a second in, the delayed log has not been counted yet.
        info!(logger: logger, "meow");
        wait_ms(500);
        expect_counts!(counter_sink, logs = 0, flushes = 0);
        wait_ms(750);
        expect_counts!(counter_sink, logs = 1, flushes = 0);

        // `warn` reaches the flush level, so a log and then a flush are
        // expected to complete one after the other.
        warn!(logger: logger, "nya");
        wait_ms(250);
        expect_counts!(counter_sink, logs = 1, flushes = 0);
        wait_ms(1000);
        expect_counts!(counter_sink, logs = 2, flushes = 0);
        wait_ms(1250);
        expect_counts!(counter_sink, logs = 2, flushes = 1);
    }
}