use std::{cmp::Ordering, convert::Infallible, sync::Arc, time::Duration};
use crate::{
formatter::Formatter,
sink::{Sink, SinkProp, SinkPropAccess, Sinks},
sync::*,
Error, ErrorHandler, LevelFilter, Record, RecordOwned, Result,
};
/// Mutable bookkeeping of a `DedupSink`, kept behind its `state` mutex.
struct DedupSinkState {
/// The most recent record that was successfully forwarded to the wrapped
/// sinks; `None` until the first record has been forwarded.
last_record: Option<RecordOwned>,
/// Number of records skipped as duplicates since `last_record` was stored.
skipped_count: usize,
}
/// A sink that wraps other sinks and suppresses consecutive duplicate records.
///
/// A record counts as a duplicate when it has the same payload and level as
/// the last forwarded record and falls within `skip_duration` of it.
/// Duplicates are only counted. When a non-duplicate record arrives, or when
/// the sink is dropped, the pending count is reported to the wrapped sinks:
/// a single skipped record is re-emitted as-is, more than one is reported as
/// a record whose payload is `(skipped N duplicates)`.
pub struct DedupSink {
/// Common sink properties (level filter, error handler, ...).
prop: SinkProp,
/// Sinks that non-duplicate records and skip reports are forwarded to.
sinks: Sinks,
/// Time window, measured from the last forwarded record, within which an
/// identical record is skipped.
skip_duration: Duration,
/// Last forwarded record and the number of duplicates skipped since then.
state: Mutex<DedupSinkState>,
}
impl DedupSink {
    /// Creates a [`DedupSinkBuilder`] with default sink properties, no wrapped
    /// sinks and `skip_duration` not yet set.
    ///
    /// The returned builder is typed `DedupSinkBuilder<()>`; calling
    /// `skip_duration` on it yields the `DedupSinkBuilder<Duration>` that
    /// provides the usable `build` / `build_arc`.
    #[must_use]
    pub fn builder() -> DedupSinkBuilder<()> {
        DedupSinkBuilder {
            prop: SinkProp::default(),
            sinks: vec![],
            skip_duration: (),
        }
    }

    /// Returns the wrapped sinks that records are forwarded to.
    #[must_use]
    pub fn sinks(&self) -> &[Arc<dyn Sink>] {
        &self.sinks
    }

    /// Tells whether `other` is a duplicate of `last_record`.
    ///
    /// Returns `false` when there is no previous record. Otherwise the two
    /// records must have equal payload and level, and their timestamps must
    /// be less than `skip_duration` apart.
    #[must_use]
    fn is_dup_record(&self, last_record: Option<Record>, other: &Record) -> bool {
        let last_record = match last_record {
            Some(last_record) => last_record,
            None => return false,
        };
        if last_record.payload() != other.payload() || last_record.level() != other.level() {
            return false;
        }
        // `duration_since` fails when `other` carries an earlier timestamp
        // than the stored record, e.g. two threads racing for the state lock
        // or the system clock stepping backwards. Logging must not panic in
        // that case, so fall back to the absolute distance between the two
        // timestamps, which the error value carries.
        let gap = other
            .time()
            .duration_since(last_record.time())
            .unwrap_or_else(|err| err.duration());
        gap < self.skip_duration
    }

    /// Reports the records skipped so far to the wrapped sinks.
    ///
    /// Does nothing when no record has been skipped. If exactly one record
    /// was skipped, the stored last record is logged again unchanged; if
    /// more were skipped, a copy of it with the payload
    /// `(skipped N duplicates)` is logged instead.
    ///
    /// This method does not reset `skipped_count`; the caller is responsible
    /// for that.
    fn log_skipping_message(&self, state: &mut DedupSinkState) -> Result<()> {
        if state.skipped_count == 0 {
            return Ok(());
        }
        let last_record = state
            .last_record
            .as_ref()
            // `skipped_count` is only incremented after `is_dup_record`
            // matched against a stored record.
            .expect("a record can only be skipped after a previous record was stored")
            .as_ref();
        match state.skipped_count.cmp(&1) {
            Ordering::Equal => self.log_record(&last_record)?,
            Ordering::Greater => self.log_record(
                &last_record
                    .replace_payload(format!("(skipped {} duplicates)", state.skipped_count)),
            )?,
            // Ruled out by the early return above.
            Ordering::Less => unreachable!(),
        }
        Ok(())
    }

    /// Forwards `record` to every wrapped sink.
    ///
    /// A plain `fold` (not `try_fold`) is used on purpose: every sink is
    /// visited even if an earlier one returned an error, and the individual
    /// results are merged through `Error::push_result`.
    fn log_record(&self, record: &Record) -> Result<()> {
        #[allow(clippy::manual_try_fold)]
        self.sinks.iter().fold(Ok(()), |result, sink| {
            Error::push_result(result, sink.log(record))
        })
    }

    /// Applies `with` to every wrapped sink and merges the results through
    /// `Error::push_result`; like `log_record`, it does not stop at the first
    /// error.
    fn flush_with(&self, with: fn(&dyn Sink) -> Result<()>) -> Result<()> {
        #[allow(clippy::manual_try_fold)]
        self.sinks.iter().fold(Ok(()), |result, sink| {
            Error::push_result(result, with(sink.as_ref()))
        })
    }

    /// Calls `flush` on every wrapped sink.
    fn flush_sinks(&self) -> Result<()> {
        self.flush_with(|sink| sink.flush())
    }

    /// Calls `flush_on_exit` on every wrapped sink.
    fn flush_sinks_on_exit(&self) -> Result<()> {
        self.flush_with(|sink| sink.flush_on_exit())
    }
}
impl SinkPropAccess for DedupSink {
    /// Reads the level filter stored in this sink's own properties.
    fn level_filter(&self) -> LevelFilter {
        self.prop.level_filter()
    }

    /// Stores `level_filter` in this sink's own properties.
    fn set_level_filter(&self, level_filter: LevelFilter) {
        self.prop.set_level_filter(level_filter);
    }

    /// Hands a clone of `formatter` to every wrapped sink.
    ///
    /// Nothing is stored on this sink's own properties.
    fn set_formatter(&self, formatter: Box<dyn Formatter>) {
        self.sinks
            .iter()
            .for_each(|sink| sink.set_formatter(formatter.clone()));
    }

    /// Stores `handler` in this sink's own properties.
    fn set_error_handler(&self, handler: ErrorHandler) {
        self.prop.set_error_handler(handler);
    }
}
impl Sink for DedupSink {
    /// Forwards `record` to the wrapped sinks unless it duplicates the last
    /// forwarded record.
    ///
    /// A duplicate only increments the skip counter. For any other record
    /// the pending skip report (if any) is emitted first, then the record
    /// itself; only after both succeeded is the counter reset and the record
    /// remembered as the new last record.
    fn log(&self, record: &Record) -> Result<()> {
        let mut guard = self.state.lock_expect();

        let duplicated = {
            let previous = guard.last_record.as_ref().map(|owned| owned.as_ref());
            self.is_dup_record(previous, record)
        };

        if duplicated {
            guard.skipped_count += 1;
        } else {
            self.log_skipping_message(&mut guard)?;
            self.log_record(record)?;

            guard.skipped_count = 0;
            guard.last_record = Some(record.to_owned());
        }
        Ok(())
    }

    /// Flushes every wrapped sink. A pending skip report is not emitted here.
    fn flush(&self) -> Result<()> {
        self.flush_sinks()
    }

    /// Calls `flush_on_exit` on every wrapped sink.
    fn flush_on_exit(&self) -> Result<()> {
        self.flush_sinks_on_exit()
    }
}
impl Drop for DedupSink {
fn drop(&mut self) {
if let Err(err) = self.log_skipping_message(&mut self.state.lock_expect()) {
self.prop.call_error_handler_internal("DedupSink", err);
}
if let Err(err) = self.flush_sinks() {
self.prop.call_error_handler_internal("DedupSink", err);
}
}
}
/// Builder for `DedupSink`, obtained from `DedupSink::builder`.
///
/// The type parameter `ArgS` tracks whether the required `skip_duration` has
/// been supplied: it is `()` on a fresh builder and becomes `Duration` once
/// `skip_duration` has been called.
///
#[doc = include_str!("../include/doc/generic-builder-note.md")]
pub struct DedupSinkBuilder<ArgS> {
/// Sink properties that are moved into the built `DedupSink`.
prop: SinkProp,
/// Sinks collected so far that the built `DedupSink` will wrap.
sinks: Sinks,
/// `()` while unset, the configured `Duration` afterwards.
skip_duration: ArgS,
}
impl<ArgS> DedupSinkBuilder<ArgS> {
    /// Adds one sink to the list of sinks the built `DedupSink` will wrap.
    #[must_use]
    pub fn sink(mut self, sink: Arc<dyn Sink>) -> Self {
        self.sinks.push(sink);
        self
    }

    /// Adds every sink yielded by `sinks`, in iteration order, after the
    /// sinks already collected.
    #[must_use]
    pub fn sinks<I>(mut self, sinks: I) -> Self
    where
        I: IntoIterator<Item = Arc<dyn Sink>>,
    {
        self.sinks.extend(sinks);
        self
    }

    /// Sets the required skip duration.
    ///
    /// Returns a `DedupSinkBuilder<Duration>`, the only builder state on
    /// which the real `build` / `build_arc` exist. Calling it again simply
    /// replaces the previously set duration.
    #[must_use]
    pub fn skip_duration(self, duration: Duration) -> DedupSinkBuilder<Duration> {
        let Self { prop, sinks, .. } = self;
        DedupSinkBuilder {
            prop,
            sinks,
            skip_duration: duration,
        }
    }

    /// Sets the level filter stored in the builder's sink properties.
    #[must_use]
    pub fn level_filter(self, level_filter: LevelFilter) -> Self {
        self.prop.set_level_filter(level_filter);
        self
    }

    /// Sets the formatter stored in the builder's sink properties.
    ///
    /// NOTE(review): nothing visible in this file reads that formatter back
    /// or forwards it to the wrapped sinks; confirm whether this setter has
    /// any effect on the output.
    #[must_use]
    pub fn formatter<F>(self, formatter: F) -> Self
    where
        F: Formatter + 'static,
    {
        self.prop.set_formatter(formatter);
        self
    }

    /// Sets the error handler stored in the builder's sink properties.
    #[must_use]
    pub fn error_handler<F: Into<ErrorHandler>>(self, handler: F) -> Self {
        self.prop.set_error_handler(handler);
        self
    }
}
// `build` / `build_arc` for a builder on which `skip_duration` has not been
// set yet.
//
// Both take an `Infallible` argument, a type that has no values, so neither
// can be called successfully. They appear to exist only so that an attempted
// call carries the `deprecated` note below, which names the missing required
// parameter.
impl DedupSinkBuilder<()> {
// Placeholder `build`; hidden from the documentation.
#[doc(hidden)]
#[deprecated(note = "\n\n\
builder compile-time error:\n\
- missing required parameter `skip_duration`\n\n\
")]
pub fn build(self, _: Infallible) {}
// Placeholder `build_arc`; hidden from the documentation.
#[doc(hidden)]
#[deprecated(note = "\n\n\
builder compile-time error:\n\
- missing required parameter `skip_duration`\n\n\
")]
pub fn build_arc(self, _: Infallible) {}
}
impl DedupSinkBuilder<Duration> {
    /// Builds a `DedupSink` from the collected properties, sinks and skip
    /// duration.
    ///
    /// The sink starts with no remembered record and a skip count of zero.
    /// As written, this function always returns `Ok`.
    pub fn build(self) -> Result<DedupSink> {
        let Self {
            prop,
            sinks,
            skip_duration,
        } = self;
        let state = Mutex::new(DedupSinkState {
            last_record: None,
            skipped_count: 0,
        });
        Ok(DedupSink {
            prop,
            sinks,
            skip_duration,
            state,
        })
    }

    /// Builds a `DedupSink` as `build` does and wraps it in an `Arc`.
    pub fn build_arc(self) -> Result<Arc<DedupSink>> {
        let sink = self.build()?;
        Ok(Arc::new(sink))
    }
}
// Tests for `DedupSink`. They are timing-sensitive: the skip duration is one
// second and the tests sleep 1.25 seconds to step outside that window.
#[cfg(test)]
mod tests {
use std::thread::sleep;
use super::*;
use crate::{prelude::*, test_utils::*};
// Checks the sequence of records that reaches the wrapped sink for a mix of
// duplicates, level changes, payload changes and expired skip windows.
#[test]
fn dedup() {
let test_sink = Arc::new(TestSink::new());
let dedup_sink = DedupSink::builder()
.skip_duration(Duration::from_secs(1))
.sink(test_sink.clone())
.build_arc()
.unwrap();
let test = build_test_logger(|b| b.sink(dedup_sink));
// Three identical info records: the first is forwarded, two are skipped.
info!(logger: test, "I wish I was a cat");
info!(logger: test, "I wish I was a cat");
info!(logger: test, "I wish I was a cat");
// Same payload but a different level is not a duplicate: the skip report
// for the info records is emitted, then this record. The second warn is
// skipped.
warn!(logger: test, "I wish I was a cat");
warn!(logger: test, "I wish I was a cat");
// Step outside the skip window, so the next identical record is forwarded.
sleep(Duration::from_millis(1250));
// Exactly one record was skipped before this one, so the skipped record is
// re-emitted as-is instead of a "(skipped N duplicates)" report.
warn!(logger: test, "I wish I was a cat");
// Distinct payloads are all forwarded.
warn!(logger: test, "No school");
warn!(logger: test, "No works");
info!(logger: test, "Just meow meow");
// One forwarded, three skipped.
info!(logger: test, "Meow~ Meow~");
info!(logger: test, "Meow~ Meow~");
info!(logger: test, "Meow~ Meow~");
info!(logger: test, "Meow~ Meow~");
sleep(Duration::from_millis(1250));
// Window expired: report the three skipped, forward one, skip two more.
info!(logger: test, "Meow~ Meow~");
info!(logger: test, "Meow~ Meow~");
info!(logger: test, "Meow~ Meow~");
// A different payload triggers the report for the last two skipped records.
info!(logger: test, "Meow~ Meow...");
let records = test_sink.records();
assert_eq!(records.len(), 13);
assert_eq!(records[0].payload(), "I wish I was a cat");
assert_eq!(records[0].level(), Level::Info);
// The skip report carries the level of the record it refers to.
assert_eq!(records[1].payload(), "(skipped 2 duplicates)");
assert_eq!(records[1].level(), Level::Info);
assert_eq!(records[2].payload(), "I wish I was a cat");
assert_eq!(records[2].level(), Level::Warn);
// The single skipped warn record, re-emitted unchanged.
assert_eq!(records[3].payload(), "I wish I was a cat");
assert_eq!(records[3].level(), Level::Warn);
assert_eq!(records[4].payload(), "I wish I was a cat");
assert_eq!(records[4].level(), Level::Warn);
assert_eq!(records[5].payload(), "No school");
assert_eq!(records[5].level(), Level::Warn);
assert_eq!(records[6].payload(), "No works");
assert_eq!(records[6].level(), Level::Warn);
assert_eq!(records[7].payload(), "Just meow meow");
assert_eq!(records[7].level(), Level::Info);
assert_eq!(records[8].payload(), "Meow~ Meow~");
assert_eq!(records[8].level(), Level::Info);
assert_eq!(records[9].payload(), "(skipped 3 duplicates)");
assert_eq!(records[9].level(), Level::Info);
assert_eq!(records[10].payload(), "Meow~ Meow~");
assert_eq!(records[10].level(), Level::Info);
assert_eq!(records[11].payload(), "(skipped 2 duplicates)");
assert_eq!(records[11].level(), Level::Info);
assert_eq!(records[12].payload(), "Meow~ Meow...");
assert_eq!(records[12].level(), Level::Info);
}
// Checks that skipped records still pending when the sink goes away are
// reported by `Drop`.
#[test]
fn dedup_on_drop() {
// Case 1: one record skipped, so drop re-emits that record unchanged.
{
let records = {
let test_sink = Arc::new(TestSink::new());
{
let dedup_sink = DedupSink::builder()
.skip_duration(Duration::from_secs(1))
.sink(test_sink.clone())
.build_arc()
.unwrap();
let test = build_test_logger(|b| b.sink(dedup_sink));
info!(logger: test, "I wish I was a cat");
info!(logger: test, "I wish I was a cat");
// The logger goes out of scope here; presumably it holds the last
// reference to the `DedupSink`, whose `Drop` then runs.
}
test_sink.records()
};
assert_eq!(records.len(), 2);
assert_eq!(records[0].payload(), "I wish I was a cat");
assert_eq!(records[0].level(), Level::Info);
assert_eq!(records[1].payload(), "I wish I was a cat");
assert_eq!(records[1].level(), Level::Info);
}
// Case 2: two records skipped, so drop emits a "(skipped 2 duplicates)" report.
{
let records = {
let test_sink = Arc::new(TestSink::new());
{
let dedup_sink = DedupSink::builder()
.skip_duration(Duration::from_secs(1))
.sink(test_sink.clone())
.build_arc()
.unwrap();
let test = build_test_logger(|b| b.sink(dedup_sink));
info!(logger: test, "I wish I was a cat");
info!(logger: test, "I wish I was a cat");
info!(logger: test, "I wish I was a cat");
}
test_sink.records()
};
assert_eq!(records.len(), 2);
assert_eq!(records[0].payload(), "I wish I was a cat");
assert_eq!(records[0].level(), Level::Info);
assert_eq!(records[1].payload(), "(skipped 2 duplicates)");
assert_eq!(records[1].level(), Level::Info);
}
}
}