1use crate::{
2 default_thread_pool,
3 formatter::{Formatter, UnreachableFormatter},
4 sink::{OverflowPolicy, Sink, SinkProp, SinkPropAccess, Sinks},
5 sync::*,
6 Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result, ThreadPool,
7};
8
9pub struct AsyncPoolSink {
49 overflow_policy: OverflowPolicy,
50 thread_pool: Arc<ThreadPool>,
51 backend: Arc<Backend>,
52}
53
54impl AsyncPoolSink {
55 #[must_use]
69 pub fn builder() -> AsyncPoolSinkBuilder {
70 let prop = SinkProp::default();
71 prop.set_formatter(UnreachableFormatter::new());
75
76 AsyncPoolSinkBuilder {
77 prop,
78 overflow_policy: OverflowPolicy::Block,
79 sinks: Sinks::new(),
80 thread_pool: None,
81 }
82 }
83
84 #[must_use]
86 pub fn sinks(&self) -> &[Arc<dyn Sink>] {
87 &self.backend.sinks
88 }
89
90 fn assign_task(&self, task: Task) -> Result<()> {
91 self.thread_pool.assign_task(task, self.overflow_policy)
92 }
93
94 #[must_use]
95 fn clone_backend(&self) -> Arc<Backend> {
96 Arc::clone(&self.backend)
97 }
98}
99
100impl SinkPropAccess for AsyncPoolSink {
101 fn level_filter(&self) -> LevelFilter {
102 self.backend.prop.level_filter()
103 }
104
105 fn set_level_filter(&self, level_filter: LevelFilter) {
106 self.backend.prop.set_level_filter(level_filter);
107 }
108
109 fn set_formatter(&self, formatter: Box<dyn Formatter>) {
112 for sink in &self.backend.sinks {
113 sink.set_formatter(formatter.clone())
114 }
115 }
116
117 fn set_error_handler(&self, handler: ErrorHandler) {
118 self.backend.prop.set_error_handler(handler);
119 }
120}
121
122impl Sink for AsyncPoolSink {
123 fn log(&self, record: &Record) -> Result<()> {
124 self.assign_task(Task::Log {
125 backend: self.clone_backend(),
126 record: record.to_owned(),
127 })
128 }
129
130 fn flush(&self) -> Result<()> {
131 self.assign_task(Task::Flush {
132 backend: self.clone_backend(),
133 })
134 }
135
136 fn flush_on_exit(&self) -> Result<()> {
137 self.thread_pool.destroy();
145 self.backend.flush_on_exit()
146 }
147}
148
149#[allow(missing_docs)]
150pub struct AsyncPoolSinkBuilder {
151 prop: SinkProp,
152 sinks: Sinks,
153 overflow_policy: OverflowPolicy,
154 thread_pool: Option<Arc<ThreadPool>>,
155}
156
157impl AsyncPoolSinkBuilder {
158 #[must_use]
160 pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
161 self.sinks.push(sink);
162 self
163 }
164
165 #[must_use]
167 pub fn sinks<I>(mut self, sinks: I) -> Self
168 where
169 I: IntoIterator<Item = Arc<dyn Sink>>,
170 {
171 self.sinks.append(&mut sinks.into_iter().collect());
172 self
173 }
174
175 #[must_use]
183 pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self {
184 self.overflow_policy = overflow_policy;
185 self
186 }
187
188 #[must_use]
193 pub fn thread_pool(mut self, thread_pool: Arc<ThreadPool>) -> Self {
194 self.thread_pool = Some(thread_pool);
195 self
196 }
197
198 #[must_use]
205 pub fn level_filter(self, level_filter: LevelFilter) -> Self {
206 self.prop.set_level_filter(level_filter);
207 self
208 }
209
210 #[doc(hidden)]
211 #[deprecated(
212 note = "AsyncPoolSink does not have its own formatter, this method has no effect, it was added by accident and may be removed in the future",
213 since = "0.5.2"
214 )]
215 #[must_use]
216 pub fn formatter<F>(self, formatter: F) -> Self
217 where
218 F: Formatter + 'static,
219 {
220 self.prop.set_formatter(formatter);
221 self
222 }
223
224 #[must_use]
229 pub fn error_handler<F: Into<ErrorHandler>>(self, handler: F) -> Self {
230 self.prop.set_error_handler(handler);
231 self
232 }
233
234 pub fn build(self) -> Result<AsyncPoolSink> {
236 let backend = Arc::new(Backend {
237 prop: self.prop,
238 sinks: self.sinks.clone(),
239 });
240
241 let thread_pool = self.thread_pool.unwrap_or_else(default_thread_pool);
242
243 Ok(AsyncPoolSink {
244 overflow_policy: self.overflow_policy,
245 thread_pool,
246 backend,
247 })
248 }
249
250 pub fn build_arc(self) -> Result<Arc<AsyncPoolSink>> {
254 self.build().map(Arc::new)
255 }
256}
257
258pub(crate) struct Backend {
259 prop: SinkProp,
260 sinks: Sinks,
261}
262
263impl Backend {
264 fn log(&self, record: &Record) -> Result<()> {
265 let mut result = Ok(());
266 for sink in &self.sinks {
267 result = Error::push_result(result, sink.log(record));
268 }
269 result
270 }
271
272 fn flush_with(&self, with: impl Fn(&dyn Sink) -> Result<()>) -> Result<()> {
273 let mut result = Ok(());
274 for sink in &self.sinks {
275 result = Error::push_result(result, with(&**sink));
276 }
277 result
278 }
279
280 fn flush(&self) -> Result<()> {
281 self.flush_with(|sink| sink.flush())
282 }
283
284 fn flush_on_exit(&self) -> Result<()> {
285 self.flush_with(|sink| sink.flush_on_exit())
286 }
287
288 fn handle_error(&self, err: Error) {
289 self.prop.call_error_handler_internal("AsyncPoolSink", err)
290 }
291}
292
293pub(crate) enum Task {
294 Log {
295 backend: Arc<Backend>,
296 record: RecordOwned,
297 },
298 Flush {
299 backend: Arc<Backend>,
300 },
301 #[cfg(test)]
302 __ForTestUse {
303 sleep: Option<std::time::Duration>,
304 },
305}
306
307impl Task {
308 pub(crate) fn exec(self) {
310 match self {
311 Task::Log { backend, record } => {
312 if let Err(err) = backend.log(&record.as_ref()) {
313 backend.handle_error(err)
314 }
315 }
316 Task::Flush { backend } => {
317 if let Err(err) = backend.flush() {
318 backend.handle_error(err)
319 }
320 }
321 #[cfg(test)]
322 Task::__ForTestUse { sleep } => {
323 if let Some(sleep) = sleep {
324 std::thread::sleep(sleep);
325 }
326 }
327 }
328 }
329}
330
#[cfg(test)]
mod tests {
    use std::{thread::sleep, time::Duration};

    use super::*;
    use crate::{prelude::*, test_utils::*};

    // Asserts both call counters of a `TestSink` at once:
    // `assert_counts!(sink, <log count>, <flush count>)`.
    macro_rules! assert_counts {
        ($sink:expr, $logs:expr, $flushes:expr) => {{
            assert_eq!($sink.log_count(), $logs);
            assert_eq!($sink.flush_count(), $flushes);
        }};
    }

    // Grace period that lets the pool pick up and run a task when the sink
    // itself adds no delay.
    const SETTLE: Duration = Duration::from_millis(50);

    /// Sinks built without an explicit pool work, including across two
    /// loggers that are created and dropped one after the other.
    #[test]
    fn default_thread_pool() {
        let counter_sink = Arc::new(TestSink::new());
        let build_logger = || {
            build_test_logger(|b| {
                b.sink(
                    AsyncPoolSink::builder()
                        .sink(counter_sink.clone())
                        .build_arc()
                        .unwrap(),
                )
                .level_filter(LevelFilter::All)
                .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
            })
        };

        assert_counts!(counter_sink, 0, 0);

        {
            let logger = build_logger();

            // Below the flush level: logged, never flushed.
            info!(logger: logger, "");
            sleep(SETTLE);
            assert_counts!(counter_sink, 1, 0);

            warn!(logger: logger, "");
            sleep(SETTLE);
            assert_counts!(counter_sink, 2, 0);
        }

        {
            let logger = build_logger();

            // At or above the flush level: each record also triggers a flush.
            error!(logger: logger, "");
            sleep(SETTLE);
            assert_counts!(counter_sink, 3, 1);

            critical!(logger: logger, "");
            sleep(SETTLE);
            assert_counts!(counter_sink, 4, 2);
        }
    }

    /// Same flow as above, but with an explicitly supplied thread pool.
    #[test]
    fn custom_thread_pool() {
        let counter_sink = Arc::new(TestSink::new());
        let thread_pool = ThreadPool::builder().build_arc().unwrap();
        let logger = build_test_logger(|b| {
            b.sink(
                AsyncPoolSink::builder()
                    .sink(counter_sink.clone())
                    .thread_pool(thread_pool)
                    .build_arc()
                    .unwrap(),
            )
            .level_filter(LevelFilter::All)
            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
        });

        assert_counts!(counter_sink, 0, 0);

        info!(logger: logger, "");
        sleep(SETTLE);
        assert_counts!(counter_sink, 1, 0);

        warn!(logger: logger, "");
        sleep(SETTLE);
        assert_counts!(counter_sink, 2, 0);

        error!(logger: logger, "");
        sleep(SETTLE);
        assert_counts!(counter_sink, 3, 1);
    }

    /// Logging and flushing return before the inner sink has finished.
    ///
    /// NOTE(review): the timeline below assumes `TestSink::with_delay` makes
    /// every log/flush call on the sink take about one second; confirm in
    /// `test_utils`.
    #[test]
    fn async_operations() {
        let counter_sink = Arc::new(TestSink::with_delay(Some(Duration::from_secs(1))));
        // A dedicated pool is used here, presumably so that the timing below
        // is not disturbed by other tests sharing the default pool.
        let thread_pool = ThreadPool::builder().build_arc().unwrap();
        let logger = build_test_logger(|b| {
            b.sink(
                AsyncPoolSink::builder()
                    .sink(counter_sink.clone())
                    .thread_pool(thread_pool)
                    .build_arc()
                    .unwrap(),
            )
            .level_filter(LevelFilter::All)
            .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Warn))
        });

        assert_counts!(counter_sink, 0, 0);

        // One queued log: not yet counted after 0.5s, counted after 1.25s.
        info!(logger: logger, "meow");
        sleep(Duration::from_millis(500));
        assert_counts!(counter_sink, 0, 0);
        sleep(Duration::from_millis(750));
        assert_counts!(counter_sink, 1, 0);

        // A queued log followed by a queued flush: the log is counted first,
        // the flush only shows up after a further delay.
        warn!(logger: logger, "nya");
        sleep(Duration::from_millis(250));
        assert_counts!(counter_sink, 1, 0);
        sleep(Duration::from_millis(1000));
        assert_counts!(counter_sink, 2, 0);
        sleep(Duration::from_millis(1250));
        assert_counts!(counter_sink, 2, 1);
    }
}