1use std::{cmp::Ordering, convert::Infallible, sync::Arc, time::Duration};
2
3use crate::{
4 formatter::Formatter,
5 sink::{GetSinkProp, Sink, SinkProp, Sinks},
6 sync::*,
7 Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result,
8};
9
10struct DedupSinkState {
11 last_record: Option<RecordOwned>,
12 skipped_count: usize,
13}
14
15pub struct DedupSink {
78 prop: SinkProp,
79 sinks: Sinks,
80 skip_duration: Duration,
81 state: Mutex<DedupSinkState>,
82}
83
84impl DedupSink {
85 #[must_use]
103 pub fn builder() -> DedupSinkBuilder<()> {
104 DedupSinkBuilder {
105 prop: SinkProp::default(),
106 sinks: vec![],
107 skip_duration: (),
108 }
109 }
110
111 #[must_use]
113 pub fn sinks(&self) -> &[Arc<dyn Sink>] {
114 &self.sinks
115 }
116
117 #[must_use]
118 fn is_dup_record(&self, last_record: Option<Record>, other: &Record) -> bool {
119 if let Some(last_record) = last_record {
120 last_record.payload() == other.payload()
121 && last_record.level() == other.level()
122 && other.time().duration_since(last_record.time()).unwrap() < self.skip_duration
123 } else {
124 false
125 }
126 }
127
128 fn log_skipping_message(&self, state: &mut DedupSinkState) -> Result<()> {
129 if state.skipped_count != 0 {
130 let last_record = state.last_record.as_ref().unwrap().as_ref();
131 match state.skipped_count.cmp(&1) {
132 Ordering::Equal => self.log_record(&last_record)?,
133 Ordering::Greater => self.log_record(
134 &last_record
135 .replace_payload(format!("(skipped {} duplicates)", state.skipped_count)),
136 )?,
137 Ordering::Less => unreachable!(), }
139 }
140 Ok(())
141 }
142
143 fn log_record(&self, record: &Record) -> Result<()> {
144 #[allow(clippy::manual_try_fold)] self.sinks.iter().fold(Ok(()), |result, sink| {
146 Error::push_result(result, sink.log(record))
147 })
148 }
149
150 fn flush_with(&self, with: fn(&dyn Sink) -> Result<()>) -> Result<()> {
151 #[allow(clippy::manual_try_fold)] self.sinks.iter().fold(Ok(()), |result, sink| {
153 Error::push_result(result, with(sink.as_ref()))
154 })
155 }
156
157 fn flush_sinks(&self) -> Result<()> {
158 self.flush_with(|sink| sink.flush())
159 }
160
161 fn flush_sinks_on_exit(&self) -> Result<()> {
162 self.flush_with(|sink| sink.flush_on_exit())
163 }
164}
165
166impl GetSinkProp for DedupSink {
167 fn prop(&self) -> &SinkProp {
168 &self.prop
169 }
170}
171
172impl Sink for DedupSink {
173 fn log(&self, record: &Record) -> Result<()> {
174 let mut state = self.state.lock_expect();
175
176 if self.is_dup_record(state.last_record.as_ref().map(|r| r.as_ref()), record) {
177 state.skipped_count += 1;
178 return Ok(());
179 }
180 self.log_skipping_message(&mut state)?;
181
182 self.log_record(record)?;
183 state.skipped_count = 0;
184 state.last_record = Some(record.to_owned());
185
186 Ok(())
187 }
188
189 fn flush(&self) -> Result<()> {
190 self.flush_sinks()
191 }
192
193 fn flush_on_exit(&self) -> Result<()> {
194 self.flush_sinks_on_exit()
195 }
196}
197
198impl Drop for DedupSink {
199 fn drop(&mut self) {
200 if let Err(err) = self.log_skipping_message(&mut self.state.lock_expect()) {
201 self.prop.call_error_handler_internal("DedupSink", err);
202 }
203 if let Err(err) = self.flush_sinks() {
204 self.prop.call_error_handler_internal("DedupSink", err);
205 }
206 }
207}
208
209#[doc = include_str!("../include/doc/generic-builder-note.md")]
211pub struct DedupSinkBuilder<ArgS> {
212 prop: SinkProp,
213 sinks: Sinks,
214 skip_duration: ArgS,
215}
216
217impl<ArgS> DedupSinkBuilder<ArgS> {
218 #[must_use]
220 pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
221 self.sinks.push(sink);
222 self
223 }
224
225 #[must_use]
227 pub fn sinks<I>(mut self, sinks: I) -> Self
228 where
229 I: IntoIterator<Item = Arc<dyn Sink>>,
230 {
231 self.sinks.append(&mut sinks.into_iter().collect());
232 self
233 }
234
235 #[must_use]
240 pub fn skip_duration(self, duration: Duration) -> DedupSinkBuilder<Duration> {
241 DedupSinkBuilder {
242 prop: self.prop,
243 sinks: self.sinks,
244 skip_duration: duration,
245 }
246 }
247
248 #[must_use]
255 pub fn level_filter(self, level_filter: LevelFilter) -> Self {
256 self.prop.set_level_filter(level_filter);
257 self
258 }
259
260 #[must_use]
266 pub fn formatter<F>(self, formatter: F) -> Self
267 where
268 F: Formatter + 'static,
269 {
270 self.prop.set_formatter(formatter);
271 self
272 }
273
274 #[must_use]
279 pub fn error_handler<F: Into<ErrorHandler>>(self, handler: F) -> Self {
280 self.prop.set_error_handler(handler);
281 self
282 }
283}
284
285impl DedupSinkBuilder<()> {
286 #[doc(hidden)]
287 #[deprecated(note = "\n\n\
288 builder compile-time error:\n\
289 - missing required parameter `skip_duration`\n\n\
290 ")]
291 pub fn build(self, _: Infallible) {}
292
293 #[doc(hidden)]
294 #[deprecated(note = "\n\n\
295 builder compile-time error:\n\
296 - missing required parameter `skip_duration`\n\n\
297 ")]
298 pub fn build_arc(self, _: Infallible) {}
299}
300
301impl DedupSinkBuilder<Duration> {
302 pub fn build(self) -> Result<DedupSink> {
304 Ok(DedupSink {
305 prop: self.prop,
306 sinks: self.sinks,
307 skip_duration: self.skip_duration,
308 state: Mutex::new(DedupSinkState {
309 last_record: None,
310 skipped_count: 0,
311 }),
312 })
313 }
314
315 pub fn build_arc(self) -> Result<Arc<DedupSink>> {
319 self.build().map(Arc::new)
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use std::thread::sleep;
326
327 use super::*;
328 use crate::{prelude::*, test_utils::*};
329
330 #[test]
331 fn dedup() {
332 let test_sink = Arc::new(TestSink::new());
333 let dedup_sink = DedupSink::builder()
334 .skip_duration(Duration::from_secs(1))
335 .sink(test_sink.clone())
336 .build_arc()
337 .unwrap();
338 let test = build_test_logger(|b| b.sink(dedup_sink));
339
340 info!(logger: test, "I wish I was a cat");
341 info!(logger: test, "I wish I was a cat");
342 info!(logger: test, "I wish I was a cat");
343
344 warn!(logger: test, "I wish I was a cat");
345 warn!(logger: test, "I wish I was a cat");
346 sleep(Duration::from_millis(1250));
347 warn!(logger: test, "I wish I was a cat");
348
349 warn!(logger: test, "No school");
350 warn!(logger: test, "No works");
351 info!(logger: test, "Just meow meow");
352
353 info!(logger: test, "Meow~ Meow~");
354 info!(logger: test, "Meow~ Meow~");
355 info!(logger: test, "Meow~ Meow~");
356 info!(logger: test, "Meow~ Meow~");
357 sleep(Duration::from_millis(1250));
358 info!(logger: test, "Meow~ Meow~");
359 info!(logger: test, "Meow~ Meow~");
360 info!(logger: test, "Meow~ Meow~");
361 info!(logger: test, "Meow~ Meow...");
362
363 let records = test_sink.records();
364
365 assert_eq!(records.len(), 13);
366
367 assert_eq!(records[0].payload(), "I wish I was a cat");
368 assert_eq!(records[0].level(), Level::Info);
369
370 assert_eq!(records[1].payload(), "(skipped 2 duplicates)");
371 assert_eq!(records[1].level(), Level::Info);
372
373 assert_eq!(records[2].payload(), "I wish I was a cat");
374 assert_eq!(records[2].level(), Level::Warn);
375
376 assert_eq!(records[3].payload(), "I wish I was a cat");
377 assert_eq!(records[3].level(), Level::Warn);
378
379 assert_eq!(records[4].payload(), "I wish I was a cat");
380 assert_eq!(records[4].level(), Level::Warn);
381
382 assert_eq!(records[5].payload(), "No school");
383 assert_eq!(records[5].level(), Level::Warn);
384
385 assert_eq!(records[6].payload(), "No works");
386 assert_eq!(records[6].level(), Level::Warn);
387
388 assert_eq!(records[7].payload(), "Just meow meow");
389 assert_eq!(records[7].level(), Level::Info);
390
391 assert_eq!(records[8].payload(), "Meow~ Meow~");
392 assert_eq!(records[8].level(), Level::Info);
393
394 assert_eq!(records[9].payload(), "(skipped 3 duplicates)");
395 assert_eq!(records[9].level(), Level::Info);
396
397 assert_eq!(records[10].payload(), "Meow~ Meow~");
398 assert_eq!(records[10].level(), Level::Info);
399
400 assert_eq!(records[11].payload(), "(skipped 2 duplicates)");
401 assert_eq!(records[11].level(), Level::Info);
402
403 assert_eq!(records[12].payload(), "Meow~ Meow...");
404 assert_eq!(records[12].level(), Level::Info);
405 }
406
407 #[test]
408 fn dedup_on_drop() {
409 {
410 let records = {
411 let test_sink = Arc::new(TestSink::new());
412 {
413 let dedup_sink = DedupSink::builder()
414 .skip_duration(Duration::from_secs(1))
415 .sink(test_sink.clone())
416 .build_arc()
417 .unwrap();
418 let test = build_test_logger(|b| b.sink(dedup_sink));
419
420 info!(logger: test, "I wish I was a cat");
421 info!(logger: test, "I wish I was a cat");
422 }
423 test_sink.records()
424 };
425
426 assert_eq!(records.len(), 2);
427
428 assert_eq!(records[0].payload(), "I wish I was a cat");
429 assert_eq!(records[0].level(), Level::Info);
430
431 assert_eq!(records[1].payload(), "I wish I was a cat");
432 assert_eq!(records[1].level(), Level::Info);
433 }
434
435 {
436 let records = {
437 let test_sink = Arc::new(TestSink::new());
438 {
439 let dedup_sink = DedupSink::builder()
440 .skip_duration(Duration::from_secs(1))
441 .sink(test_sink.clone())
442 .build_arc()
443 .unwrap();
444 let test = build_test_logger(|b| b.sink(dedup_sink));
445
446 info!(logger: test, "I wish I was a cat");
447 info!(logger: test, "I wish I was a cat");
448 info!(logger: test, "I wish I was a cat");
449 }
450 test_sink.records()
451 };
452
453 assert_eq!(records.len(), 2);
454
455 assert_eq!(records[0].payload(), "I wish I was a cat");
456 assert_eq!(records[0].level(), Level::Info);
457
458 assert_eq!(records[1].payload(), "(skipped 2 duplicates)");
459 assert_eq!(records[1].level(), Level::Info);
460 }
461 }
462}