1use crate::{
2 default_error_handler, default_thread_pool,
3 formatter::Formatter,
4 sink::{helper, OverflowPolicy, Sink, Sinks},
5 sync::*,
6 Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result, ThreadPool,
7};
8
9pub struct AsyncPoolSink {
49 level_filter: Atomic<LevelFilter>,
50 overflow_policy: OverflowPolicy,
51 thread_pool: Arc<ThreadPool>,
52 backend: Arc<Backend>,
53}
54
55impl AsyncPoolSink {
56 #[must_use]
71 pub fn builder() -> AsyncPoolSinkBuilder {
72 AsyncPoolSinkBuilder {
73 level_filter: helper::SINK_DEFAULT_LEVEL_FILTER,
74 overflow_policy: OverflowPolicy::Block,
75 sinks: Sinks::new(),
76 thread_pool: None,
77 error_handler: None,
78 }
79 }
80
81 #[must_use]
83 pub fn sinks(&self) -> &[Arc<dyn Sink>] {
84 &self.backend.sinks
85 }
86
87 pub fn set_error_handler(&self, handler: Option<ErrorHandler>) {
89 self.backend.error_handler.swap(handler, Ordering::Relaxed);
90 }
91
92 fn assign_task(&self, task: Task) -> Result<()> {
93 self.thread_pool.assign_task(task, self.overflow_policy)
94 }
95
96 #[must_use]
97 fn clone_backend(&self) -> Arc<Backend> {
98 Arc::clone(&self.backend)
99 }
100}
101
102impl Sink for AsyncPoolSink {
103 fn log(&self, record: &Record) -> Result<()> {
104 self.assign_task(Task::Log {
105 backend: self.clone_backend(),
106 record: record.to_owned(),
107 })
108 }
109
110 fn flush(&self) -> Result<()> {
111 if crate::IS_TEARING_DOWN.load(Ordering::SeqCst) {
112 self.thread_pool.destroy();
120 self.backend.flush()
121 } else {
122 self.assign_task(Task::Flush {
123 backend: self.clone_backend(),
124 })
125 }
126 }
127
128 fn set_formatter(&self, formatter: Box<dyn Formatter>) {
131 for sink in &self.backend.sinks {
132 sink.set_formatter(formatter.clone())
133 }
134 }
135
136 helper::common_impl! {
137 @SinkCustom {
138 level_filter: level_filter,
139 formatter: None,
140 error_handler: backend.error_handler,
141 }
142 }
143}
144
145#[allow(missing_docs)]
146pub struct AsyncPoolSinkBuilder {
147 level_filter: LevelFilter,
148 sinks: Sinks,
149 overflow_policy: OverflowPolicy,
150 thread_pool: Option<Arc<ThreadPool>>,
151 error_handler: Option<ErrorHandler>,
152}
153
154impl AsyncPoolSinkBuilder {
155 #[must_use]
157 pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
158 self.sinks.push(sink);
159 self
160 }
161
162 #[must_use]
164 pub fn sinks<I>(mut self, sinks: I) -> Self
165 where
166 I: IntoIterator<Item = Arc<dyn Sink>>,
167 {
168 self.sinks.append(&mut sinks.into_iter().collect());
169 self
170 }
171
172 #[must_use]
179 pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self {
180 self.overflow_policy = overflow_policy;
181 self
182 }
183
184 #[must_use]
188 pub fn thread_pool(mut self, thread_pool: Arc<ThreadPool>) -> Self {
189 self.thread_pool = Some(thread_pool);
190 self
191 }
192
193 pub fn build(self) -> Result<AsyncPoolSink> {
195 let backend = Arc::new(Backend {
196 sinks: self.sinks.clone(),
197 error_handler: Atomic::new(self.error_handler),
198 });
199
200 let thread_pool = self.thread_pool.unwrap_or_else(default_thread_pool);
201
202 Ok(AsyncPoolSink {
203 level_filter: Atomic::new(self.level_filter),
204 overflow_policy: self.overflow_policy,
205 thread_pool,
206 backend,
207 })
208 }
209
210 helper::common_impl!(@SinkBuilderCustom {
211 level_filter: level_filter,
212 formatter: None,
213 error_handler: error_handler,
214 });
215}
216
217pub(crate) struct Backend {
218 sinks: Sinks,
219 error_handler: helper::SinkErrorHandler,
220}
221
222impl Backend {
223 fn log(&self, record: &Record) -> Result<()> {
224 let mut result = Ok(());
225 for sink in &self.sinks {
226 result = Error::push_result(result, sink.log(record));
227 }
228 result
229 }
230
231 fn flush(&self) -> Result<()> {
232 let mut result = Ok(());
233 for sink in &self.sinks {
234 result = Error::push_result(result, sink.flush());
235 }
236 result
237 }
238
239 fn handle_error(&self, err: Error) {
240 self.error_handler
241 .load(Ordering::Relaxed)
242 .unwrap_or(|err| default_error_handler("AsyncPoolSink", err))(err);
243 }
244}
245
246pub(crate) enum Task {
247 Log {
248 backend: Arc<Backend>,
249 record: RecordOwned,
250 },
251 Flush {
252 backend: Arc<Backend>,
253 },
254}
255
256impl Task {
257 pub(crate) fn exec(self) {
259 match self {
260 Task::Log { backend, record } => {
261 if let Err(err) = backend.log(&record.as_ref()) {
262 backend.handle_error(err)
263 }
264 }
265 Task::Flush { backend } => {
266 if let Err(err) = backend.flush() {
267 backend.handle_error(err)
268 }
269 }
270 }
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use std::{thread::sleep, time::Duration};
277
278 use super::*;
279 use crate::{prelude::*, test_utils::*};
280
281 #[test]
282 fn default_thread_pool() {
283 let counter_sink = Arc::new(TestSink::new());
284 let build_logger = || {
285 build_test_logger(|b| {
286 b.sink(Arc::new(
287 AsyncPoolSink::builder()
288 .sink(counter_sink.clone())
289 .build()
290 .unwrap(),
291 ))
292 .level_filter(LevelFilter::All)
293 .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
294 })
295 };
296
297 assert_eq!(counter_sink.log_count(), 0);
298 assert_eq!(counter_sink.flush_count(), 0);
299
300 {
301 let logger = build_logger();
302
303 info!(logger: logger, "");
304 sleep(Duration::from_millis(50));
305 assert_eq!(counter_sink.log_count(), 1);
306 assert_eq!(counter_sink.flush_count(), 0);
307
308 warn!(logger: logger, "");
309 sleep(Duration::from_millis(50));
310 assert_eq!(counter_sink.log_count(), 2);
311 assert_eq!(counter_sink.flush_count(), 0);
312 }
313
314 {
315 let logger = build_logger();
316
317 error!(logger: logger, "");
318 sleep(Duration::from_millis(50));
319 assert_eq!(counter_sink.log_count(), 3);
320 assert_eq!(counter_sink.flush_count(), 1);
321
322 critical!(logger: logger, "");
323 sleep(Duration::from_millis(50));
324 assert_eq!(counter_sink.log_count(), 4);
325 assert_eq!(counter_sink.flush_count(), 2);
326 }
327 }
328
329 #[test]
330 fn custom_thread_pool() {
331 let counter_sink = Arc::new(TestSink::new());
332 let thread_pool = Arc::new(ThreadPool::builder().build().unwrap());
333 let logger = build_test_logger(|b| {
334 b.sink(Arc::new(
335 AsyncPoolSink::builder()
336 .sink(counter_sink.clone())
337 .thread_pool(thread_pool)
338 .build()
339 .unwrap(),
340 ))
341 .level_filter(LevelFilter::All)
342 .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Error))
343 });
344
345 assert_eq!(counter_sink.log_count(), 0);
346 assert_eq!(counter_sink.flush_count(), 0);
347
348 info!(logger: logger, "");
349 sleep(Duration::from_millis(50));
350 assert_eq!(counter_sink.log_count(), 1);
351 assert_eq!(counter_sink.flush_count(), 0);
352
353 warn!(logger: logger, "");
354 sleep(Duration::from_millis(50));
355 assert_eq!(counter_sink.log_count(), 2);
356 assert_eq!(counter_sink.flush_count(), 0);
357
358 error!(logger: logger, "");
359 sleep(Duration::from_millis(50));
360 assert_eq!(counter_sink.log_count(), 3);
361 assert_eq!(counter_sink.flush_count(), 1);
362 }
363
364 #[test]
365 fn async_opeartions() {
366 let counter_sink = Arc::new(TestSink::with_delay(Some(Duration::from_secs(1))));
367 let thread_pool = Arc::new(ThreadPool::builder().build().unwrap());
370 let logger = build_test_logger(|b| {
371 b.sink(Arc::new(
372 AsyncPoolSink::builder()
373 .sink(counter_sink.clone())
374 .thread_pool(thread_pool)
375 .build()
376 .unwrap(),
377 ))
378 .level_filter(LevelFilter::All)
379 .flush_level_filter(LevelFilter::MoreSevereEqual(Level::Warn))
380 });
381
382 assert_eq!(counter_sink.log_count(), 0);
383 assert_eq!(counter_sink.flush_count(), 0);
384
385 info!(logger: logger, "meow");
386 sleep(Duration::from_millis(500));
387 assert_eq!(counter_sink.log_count(), 0);
388 assert_eq!(counter_sink.flush_count(), 0);
389 sleep(Duration::from_millis(750));
390 assert_eq!(counter_sink.log_count(), 1);
391 assert_eq!(counter_sink.flush_count(), 0);
392
393 warn!(logger: logger, "nya");
394 sleep(Duration::from_millis(250));
395 assert_eq!(counter_sink.log_count(), 1);
396 assert_eq!(counter_sink.flush_count(), 0);
397 sleep(Duration::from_millis(1000));
398 assert_eq!(counter_sink.log_count(), 2);
399 assert_eq!(counter_sink.flush_count(), 0);
400 sleep(Duration::from_millis(1250));
401 assert_eq!(counter_sink.log_count(), 2);
402 assert_eq!(counter_sink.flush_count(), 1);
403 }
404}