spdlog/sink/
dedup_sink.rs

1use std::{cmp::Ordering, convert::Infallible, sync::Arc, time::Duration};
2
3use crate::{
4    formatter::Formatter,
5    sink::{GetSinkProp, Sink, SinkProp, Sinks},
6    sync::*,
7    Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result,
8};
9
10struct DedupSinkState {
11    last_record: Option<RecordOwned>,
12    skipped_count: usize,
13}
14
15/// A [combined sink], skip consecutive repeated records.
16///
17/// More than 2 consecutive repeated records, the records after the first one
18/// will be replaced with a single record `"(skipped {count} duplicates)"`.
19///
20/// The skip will stop if the incoming record compares to the last skipped
21/// records:
22/// - content changed, or
23/// - logging level changed, or
24/// - interval exceeded the skip duration
25///
26/// # Example
27///
28/// ```
29/// use std::time::Duration;
30///
31/// use spdlog::{prelude::*, sink::DedupSink};
32/// # use spdlog::{
33/// #     formatter::{pattern, PatternFormatter},
34/// #     sink::WriteSink,
35/// # };
36/// #
37/// # fn main() -> Result<(), spdlog::Error> {
38/// # let underlying_sink = WriteSink::builder()
39/// #     .formatter(PatternFormatter::new(pattern!("{payload}\n")))
40/// #     .target(Vec::new())
41/// #     .build_arc()?;
42///
43/// # let sink = {
44/// #     let underlying_sink = underlying_sink.clone();
45/// let sink = DedupSink::builder()
46///     .sink(underlying_sink)
47///     .skip_duration(Duration::from_secs(1))
48///     .build_arc()?;
49/// #     sink
50/// # };
51/// # let doctest = Logger::builder().sink(sink).build()?;
52///
53/// // ... Add the `sink` to a logger
54///
55/// info!(logger: doctest, "I wish I was a cat");
56/// info!(logger: doctest, "I wish I was a cat");
57/// info!(logger: doctest, "I wish I was a cat");
58/// // The skip will stop since the content changed.
59/// info!(logger: doctest, "No school");
60/// info!(logger: doctest, "No works");
61/// info!(logger: doctest, "Just meow meow");
62///
63/// # assert_eq!(
64/// #     String::from_utf8(underlying_sink.clone_target()).unwrap(),
65/// /* Output of `underlying_sink` */
66/// r#"I wish I was a cat
67/// (skipped 2 duplicates)
68/// No school
69/// No works
70/// Just meow meow
71/// "#
72/// # );
73/// # Ok(()) }
74/// ```
75///
76/// [combined sink]: index.html#combined-sink
77pub struct DedupSink {
78    prop: SinkProp,
79    sinks: Sinks,
80    skip_duration: Duration,
81    state: Mutex<DedupSinkState>,
82}
83
84impl DedupSink {
85    /// Gets a builder of `DedupSink` with default parameters:
86    ///
87    /// | Parameter       | Default Value               |
88    /// |-----------------|-----------------------------|
89    /// | [level_filter]  | [`LevelFilter::All`]        |
90    /// | [formatter]     | [`FullFormatter`]           |
91    /// | [error_handler] | [`ErrorHandler::default()`] |
92    /// |                 |                             |
93    /// | [sinks]         | `[]`                        |
94    /// | [skip_duration] | *must be specified*         |
95    ///
96    /// [level_filter]: DedupSinkBuilder::level_filter
97    /// [formatter]: DedupSinkBuilder::formatter
98    /// [`FullFormatter`]: crate::formatter::FullFormatter
99    /// [error_handler]: DedupSinkBuilder::error_handler
100    /// [sinks]: DedupSinkBuilder::sink
101    /// [skip_duration]: DedupSinkBuilder::skip_duration
102    #[must_use]
103    pub fn builder() -> DedupSinkBuilder<()> {
104        DedupSinkBuilder {
105            prop: SinkProp::default(),
106            sinks: vec![],
107            skip_duration: (),
108        }
109    }
110
111    /// Gets a reference to internal sinks in the combined sink.
112    #[must_use]
113    pub fn sinks(&self) -> &[Arc<dyn Sink>] {
114        &self.sinks
115    }
116
117    #[must_use]
118    fn is_dup_record(&self, last_record: Option<Record>, other: &Record) -> bool {
119        if let Some(last_record) = last_record {
120            last_record.payload() == other.payload()
121                && last_record.level() == other.level()
122                && other.time().duration_since(last_record.time()).unwrap() < self.skip_duration
123        } else {
124            false
125        }
126    }
127
128    fn log_skipping_message(&self, state: &mut DedupSinkState) -> Result<()> {
129        if state.skipped_count != 0 {
130            let last_record = state.last_record.as_ref().unwrap().as_ref();
131            match state.skipped_count.cmp(&1) {
132                Ordering::Equal => self.log_record(&last_record)?,
133                Ordering::Greater => self.log_record(
134                    &last_record
135                        .replace_payload(format!("(skipped {} duplicates)", state.skipped_count)),
136                )?,
137                Ordering::Less => unreachable!(), // We have checked if `state.skipped_count != 0`
138            }
139        }
140        Ok(())
141    }
142
143    fn log_record(&self, record: &Record) -> Result<()> {
144        #[allow(clippy::manual_try_fold)] // https://github.com/rust-lang/rust-clippy/issues/11554
145        self.sinks.iter().fold(Ok(()), |result, sink| {
146            Error::push_result(result, sink.log(record))
147        })
148    }
149
150    fn flush_with(&self, with: fn(&dyn Sink) -> Result<()>) -> Result<()> {
151        #[allow(clippy::manual_try_fold)] // https://github.com/rust-lang/rust-clippy/issues/11554
152        self.sinks.iter().fold(Ok(()), |result, sink| {
153            Error::push_result(result, with(sink.as_ref()))
154        })
155    }
156
157    fn flush_sinks(&self) -> Result<()> {
158        self.flush_with(|sink| sink.flush())
159    }
160
161    fn flush_sinks_on_exit(&self) -> Result<()> {
162        self.flush_with(|sink| sink.flush_on_exit())
163    }
164}
165
166impl GetSinkProp for DedupSink {
167    fn prop(&self) -> &SinkProp {
168        &self.prop
169    }
170}
171
172impl Sink for DedupSink {
173    fn log(&self, record: &Record) -> Result<()> {
174        let mut state = self.state.lock_expect();
175
176        if self.is_dup_record(state.last_record.as_ref().map(|r| r.as_ref()), record) {
177            state.skipped_count += 1;
178            return Ok(());
179        }
180        self.log_skipping_message(&mut state)?;
181
182        self.log_record(record)?;
183        state.skipped_count = 0;
184        state.last_record = Some(record.to_owned());
185
186        Ok(())
187    }
188
189    fn flush(&self) -> Result<()> {
190        self.flush_sinks()
191    }
192
193    fn flush_on_exit(&self) -> Result<()> {
194        self.flush_sinks_on_exit()
195    }
196}
197
198impl Drop for DedupSink {
199    fn drop(&mut self) {
200        if let Err(err) = self.log_skipping_message(&mut self.state.lock_expect()) {
201            self.prop.call_error_handler_internal("DedupSink", err);
202        }
203        if let Err(err) = self.flush_sinks() {
204            self.prop.call_error_handler_internal("DedupSink", err);
205        }
206    }
207}
208
209/// #
210#[doc = include_str!("../include/doc/generic-builder-note.md")]
211pub struct DedupSinkBuilder<ArgS> {
212    prop: SinkProp,
213    sinks: Sinks,
214    skip_duration: ArgS,
215}
216
217impl<ArgS> DedupSinkBuilder<ArgS> {
218    /// Add a [`Sink`].
219    #[must_use]
220    pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
221        self.sinks.push(sink);
222        self
223    }
224
225    /// Add multiple [`Sink`]s.
226    #[must_use]
227    pub fn sinks<I>(mut self, sinks: I) -> Self
228    where
229        I: IntoIterator<Item = Arc<dyn Sink>>,
230    {
231        self.sinks.append(&mut sinks.into_iter().collect());
232        self
233    }
234
235    /// Only consecutive repeated records within the given duration will be
236    /// skipped.
237    ///
238    /// This parameter is **required**.
239    #[must_use]
240    pub fn skip_duration(self, duration: Duration) -> DedupSinkBuilder<Duration> {
241        DedupSinkBuilder {
242            prop: self.prop,
243            sinks: self.sinks,
244            skip_duration: duration,
245        }
246    }
247
248    // Prop
249    //
250
251    /// Specifies a log level filter.
252    ///
253    /// This parameter is **optional**, and defaults to [`LevelFilter::All`].
254    #[must_use]
255    pub fn level_filter(self, level_filter: LevelFilter) -> Self {
256        self.prop.set_level_filter(level_filter);
257        self
258    }
259
260    /// Specifies a formatter.
261    ///
262    /// This parameter is **optional**, and defaults to [`FullFormatter`].
263    ///
264    /// [`FullFormatter`]: crate::formatter::FullFormatter
265    #[must_use]
266    pub fn formatter<F>(self, formatter: F) -> Self
267    where
268        F: Formatter + 'static,
269    {
270        self.prop.set_formatter(formatter);
271        self
272    }
273
274    /// Specifies an error handler.
275    ///
276    /// This parameter is **optional**, and defaults to
277    /// [`ErrorHandler::default()`].
278    #[must_use]
279    pub fn error_handler<F: Into<ErrorHandler>>(self, handler: F) -> Self {
280        self.prop.set_error_handler(handler);
281        self
282    }
283}
284
285impl DedupSinkBuilder<()> {
286    #[doc(hidden)]
287    #[deprecated(note = "\n\n\
288        builder compile-time error:\n\
289        - missing required parameter `skip_duration`\n\n\
290    ")]
291    pub fn build(self, _: Infallible) {}
292
293    #[doc(hidden)]
294    #[deprecated(note = "\n\n\
295        builder compile-time error:\n\
296        - missing required parameter `skip_duration`\n\n\
297    ")]
298    pub fn build_arc(self, _: Infallible) {}
299}
300
301impl DedupSinkBuilder<Duration> {
302    /// Builds a [`DedupSink`].
303    pub fn build(self) -> Result<DedupSink> {
304        Ok(DedupSink {
305            prop: self.prop,
306            sinks: self.sinks,
307            skip_duration: self.skip_duration,
308            state: Mutex::new(DedupSinkState {
309                last_record: None,
310                skipped_count: 0,
311            }),
312        })
313    }
314
315    /// Builds a `Arc<DedupSink>`.
316    ///
317    /// This is a shorthand method for `.build().map(Arc::new)`.
318    pub fn build_arc(self) -> Result<Arc<DedupSink>> {
319        self.build().map(Arc::new)
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use std::thread::sleep;
326
327    use super::*;
328    use crate::{prelude::*, test_utils::*};
329
330    #[test]
331    fn dedup() {
332        let test_sink = Arc::new(TestSink::new());
333        let dedup_sink = DedupSink::builder()
334            .skip_duration(Duration::from_secs(1))
335            .sink(test_sink.clone())
336            .build_arc()
337            .unwrap();
338        let test = build_test_logger(|b| b.sink(dedup_sink));
339
340        info!(logger: test, "I wish I was a cat");
341        info!(logger: test, "I wish I was a cat");
342        info!(logger: test, "I wish I was a cat");
343
344        warn!(logger: test, "I wish I was a cat");
345        warn!(logger: test, "I wish I was a cat");
346        sleep(Duration::from_millis(1250));
347        warn!(logger: test, "I wish I was a cat");
348
349        warn!(logger: test, "No school");
350        warn!(logger: test, "No works");
351        info!(logger: test, "Just meow meow");
352
353        info!(logger: test, "Meow~ Meow~");
354        info!(logger: test, "Meow~ Meow~");
355        info!(logger: test, "Meow~ Meow~");
356        info!(logger: test, "Meow~ Meow~");
357        sleep(Duration::from_millis(1250));
358        info!(logger: test, "Meow~ Meow~");
359        info!(logger: test, "Meow~ Meow~");
360        info!(logger: test, "Meow~ Meow~");
361        info!(logger: test, "Meow~ Meow...");
362
363        let records = test_sink.records();
364
365        assert_eq!(records.len(), 13);
366
367        assert_eq!(records[0].payload(), "I wish I was a cat");
368        assert_eq!(records[0].level(), Level::Info);
369
370        assert_eq!(records[1].payload(), "(skipped 2 duplicates)");
371        assert_eq!(records[1].level(), Level::Info);
372
373        assert_eq!(records[2].payload(), "I wish I was a cat");
374        assert_eq!(records[2].level(), Level::Warn);
375
376        assert_eq!(records[3].payload(), "I wish I was a cat");
377        assert_eq!(records[3].level(), Level::Warn);
378
379        assert_eq!(records[4].payload(), "I wish I was a cat");
380        assert_eq!(records[4].level(), Level::Warn);
381
382        assert_eq!(records[5].payload(), "No school");
383        assert_eq!(records[5].level(), Level::Warn);
384
385        assert_eq!(records[6].payload(), "No works");
386        assert_eq!(records[6].level(), Level::Warn);
387
388        assert_eq!(records[7].payload(), "Just meow meow");
389        assert_eq!(records[7].level(), Level::Info);
390
391        assert_eq!(records[8].payload(), "Meow~ Meow~");
392        assert_eq!(records[8].level(), Level::Info);
393
394        assert_eq!(records[9].payload(), "(skipped 3 duplicates)");
395        assert_eq!(records[9].level(), Level::Info);
396
397        assert_eq!(records[10].payload(), "Meow~ Meow~");
398        assert_eq!(records[10].level(), Level::Info);
399
400        assert_eq!(records[11].payload(), "(skipped 2 duplicates)");
401        assert_eq!(records[11].level(), Level::Info);
402
403        assert_eq!(records[12].payload(), "Meow~ Meow...");
404        assert_eq!(records[12].level(), Level::Info);
405    }
406
407    #[test]
408    fn dedup_on_drop() {
409        {
410            let records = {
411                let test_sink = Arc::new(TestSink::new());
412                {
413                    let dedup_sink = DedupSink::builder()
414                        .skip_duration(Duration::from_secs(1))
415                        .sink(test_sink.clone())
416                        .build_arc()
417                        .unwrap();
418                    let test = build_test_logger(|b| b.sink(dedup_sink));
419
420                    info!(logger: test, "I wish I was a cat");
421                    info!(logger: test, "I wish I was a cat");
422                }
423                test_sink.records()
424            };
425
426            assert_eq!(records.len(), 2);
427
428            assert_eq!(records[0].payload(), "I wish I was a cat");
429            assert_eq!(records[0].level(), Level::Info);
430
431            assert_eq!(records[1].payload(), "I wish I was a cat");
432            assert_eq!(records[1].level(), Level::Info);
433        }
434
435        {
436            let records = {
437                let test_sink = Arc::new(TestSink::new());
438                {
439                    let dedup_sink = DedupSink::builder()
440                        .skip_duration(Duration::from_secs(1))
441                        .sink(test_sink.clone())
442                        .build_arc()
443                        .unwrap();
444                    let test = build_test_logger(|b| b.sink(dedup_sink));
445
446                    info!(logger: test, "I wish I was a cat");
447                    info!(logger: test, "I wish I was a cat");
448                    info!(logger: test, "I wish I was a cat");
449                }
450                test_sink.records()
451            };
452
453            assert_eq!(records.len(), 2);
454
455            assert_eq!(records[0].payload(), "I wish I was a cat");
456            assert_eq!(records[0].level(), Level::Info);
457
458            assert_eq!(records[1].payload(), "(skipped 2 duplicates)");
459            assert_eq!(records[1].level(), Level::Info);
460        }
461    }
462}