mod test_utils;
use flexi_logger::{
filter::{LogLineFilter, LogLineWriter},
Cleanup, Criterion, DeferredNow, Duplicate, FileSpec, Logger, Naming, WriteMode,
TS_DASHES_BLANK_COLONS_DOT_BLANK,
};
use glob::glob;
use log::*;
use std::cmp::Ordering;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::num::NonZeroUsize;
use std::ops::Add;
use std::sync::Mutex;
use std::thread::JoinHandle;
// Number of worker threads spawned by the test.
const NO_OF_THREADS: usize = 5;
// Number of identical `debug!` records each worker thread emits.
const NO_OF_LOGLINES_PER_THREAD: usize = 20_000;
// Threshold handed to `Criterion::Size` for log file rotation
// (presumably bytes, per the flexi_logger documentation -- confirm).
const ROTATE_OVER_SIZE: u64 = 800_000;
/// Floods a rotating file logger from several threads through a [`DedupWriter`] filter and
/// then checks the log files that were produced.
///
/// The logger accepts `debug` and above, writes buffered output to files named `test_mtn`
/// in the test directory, mirrors `Info` and above to stderr, and rotates by size with
/// numbered files that are never cleaned up.
#[test]
fn multi_threaded() {
    test_utils::wait_for_start_of_second();
    let directory = test_utils::dir();

    {
        // Declared (but not yet initialized) ahead of the stopwatch on purpose: locals are
        // dropped in reverse declaration order, so the stopwatch goes out of scope while the
        // logger handle is still alive.
        // NOTE(review): presumably `Stopwatch` reports on drop -- confirm in `test_utils`.
        let _logger;
        let _stopwatch = test_utils::Stopwatch::default();

        let log_file = FileSpec::default()
            .basename("test_mtn")
            .directory(&directory);
        // Up to 22 consecutive repetitions of a record pass through before skipping starts.
        let dedup_filter = DedupWriter::with_leeway(NonZeroUsize::new(22).unwrap());

        _logger = Logger::try_with_str("debug")
            .unwrap()
            .log_to_file(log_file)
            .write_mode(WriteMode::BufferAndFlush)
            .format(test_format)
            .duplicate_to_stderr(Duplicate::Info)
            .rotate(
                Criterion::Size(ROTATE_OVER_SIZE),
                Naming::Numbers,
                Cleanup::Never,
            )
            .filter(Box::new(dedup_filter))
            .start()
            .unwrap_or_else(|e| panic!("Logger initialization failed with {e}"));

        info!("create a huge number of log lines, but deduplicate them");

        let workers = start_worker_threads(NO_OF_THREADS);
        wait_for_workers_to_close(workers);
    }

    // The logger has been dropped at this point; inspect what ended up on disk.
    let directory_name = directory.display().to_string();
    verify_logs(&directory_name);
}
/// Spawns `no_of_workers` threads that each run [`do_work`] and returns their join handles.
///
/// Every thread is named after its zero-based index and yields `0` when it finishes.
///
/// # Panics
///
/// Panics if a thread cannot be spawned.
fn start_worker_threads(no_of_workers: usize) -> Vec<JoinHandle<u8>> {
    trace!("Starting {no_of_workers} worker threads");

    let worker_handles: Vec<JoinHandle<u8>> = (0..no_of_workers)
        .map(|thread_number| {
            trace!("Starting thread {thread_number}");
            let builder = std::thread::Builder::new().name(thread_number.to_string());
            builder
                .spawn(move || {
                    do_work(thread_number);
                    0
                })
                .unwrap()
        })
        .collect();

    trace!("All {} worker threads started.", worker_handles.len());
    worker_handles
}
/// Workload of a single worker thread.
///
/// Emits `NO_OF_LOGLINES_PER_THREAD` identical `debug!` records from one call site and then
/// sleeps for 500 ms before returning.
fn do_work(thread_number: usize) {
    trace!("({thread_number}) Thread started working");
    // The logger in this file is started with the `debug` spec, so trace-level records such
    // as this one are presumably never written.
    trace!("ERROR_IF_PRINTED");

    (0..NO_OF_LOGLINES_PER_THREAD).for_each(|_| {
        debug!("bliblablub");
    });

    let pause = std::time::Duration::from_millis(500);
    std::thread::sleep(pause);
}
/// Joins every worker thread, in the order the handles are given.
///
/// # Panics
///
/// Panics if joining a worker fails, i.e. if that worker panicked.
fn wait_for_workers_to_close(worker_handles: Vec<JoinHandle<u8>>) {
    worker_handles.into_iter().for_each(|worker_handle| {
        // The worker's return value is not needed; only a failed join matters.
        if let Err(e) = worker_handle.join() {
            panic!("Joining worker thread failed: {e:?}");
        }
    });
    trace!("All worker threads joined.");
}
/// Log line format used by this test.
///
/// Writes a line starting with the marker `XXXXX`, followed by the timestamp, the
/// debug-formatted name of the current thread (`<unnamed>` if it has none), the level, the
/// source location (`<unnamed>` / `0` if unknown) and the message.
///
/// # Errors
///
/// Returns any error reported while writing to `w`.
pub fn test_format(
    w: &mut dyn std::io::Write,
    now: &mut DeferredNow,
    record: &Record,
) -> std::io::Result<()> {
    let timestamp = now.format(TS_DASHES_BLANK_COLONS_DOT_BLANK);
    let current_thread = std::thread::current();
    let thread_name = current_thread.name().unwrap_or("<unnamed>");
    let source_file = record.file().unwrap_or("<unnamed>");
    let source_line = record.line().unwrap_or(0);

    write!(
        w,
        "XXXXX [{}] T[{:?}] {} [{}:{}] {}",
        timestamp,
        thread_name,
        record.level(),
        source_file,
        source_line,
        record.args()
    )
}
/// Log line filter that suppresses long runs of identical consecutive records.
///
/// The deduplication state sits behind a [`Mutex`] because the filter is invoked through
/// a shared reference.
pub struct DedupWriter {
    /// Bookkeeping about the most recent record and how often it has been repeated.
    deduper: Mutex<Deduper>,
}
impl DedupWriter {
    /// Creates a filter that lets `leeway` consecutive repetitions of a record pass before
    /// it starts skipping further repetitions.
    pub fn with_leeway(leeway: NonZeroUsize) -> Self {
        let deduper = Mutex::new(Deduper::with_leeway(leeway));
        Self { deduper }
    }
}
impl LogLineFilter for DedupWriter {
    /// Forwards `record` to `log_line_writer` unless it is a repetition beyond the leeway.
    ///
    /// Depending on the decision of the deduper:
    /// - the record is written as is,
    /// - the record is written followed by a warning that further repetitions will be
    ///   skipped (the warning states how often the record has been repeated),
    /// - an info line with the number of skipped repetitions is written, followed by the
    ///   record, or
    /// - nothing is written.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `log_line_writer`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn write(
        &self,
        now: &mut DeferredNow,
        record: &Record,
        log_line_writer: &dyn LogLineWriter,
    ) -> std::io::Result<()> {
        // The guard is held until the function returns, so the dedup decision and the
        // resulting writes happen as one unit with respect to other logging threads.
        let mut deduper = self.deduper.lock().unwrap();

        match deduper.dedup(record) {
            DedupAction::Allow => log_line_writer.write(now, record),
            DedupAction::AllowLastOfLeeway(repeats) => {
                // Write the last tolerated repetition itself ...
                log_line_writer.write(now, record)?;
                // ... and announce that subsequent repetitions will be dropped.
                log_line_writer.write(
                    now,
                    &log::Record::builder()
                        .level(log::Level::Warn)
                        .file_static(Some(file!()))
                        .line(Some(line!()))
                        .module_path_static(Some("flexi_logger"))
                        .target("flexi_logger")
                        .args(format_args!(
                            "last record has been repeated {repeats} consecutive times, \
                             following duplicates will be skipped...",
                        ))
                        .build(),
                )
            }
            DedupAction::AllowAfterSkipped(skipped) => {
                // Report how many repetitions were dropped before writing the record.
                log_line_writer.write(
                    now,
                    &log::Record::builder()
                        .level(log::Level::Info)
                        .file_static(Some(file!()))
                        .line(Some(line!()))
                        .module_path_static(Some("flexi_logger"))
                        .target("flexi_logger")
                        .args(format_args!("last record was skipped {skipped} times"))
                        .build(),
                )?;
                log_line_writer.write(now, record)
            }
            DedupAction::Skip => Ok(()),
        }
    }
}
/// State machine that decides, record by record, whether a log line is written or skipped.
struct Deduper {
    /// Number of consecutive repetitions that are still written.
    leeway: NonZeroUsize,
    /// Location and message of the most recently seen distinct record.
    last_record: LastRecord,
    /// How often `last_record` has been repeated consecutively so far.
    duplicates: usize,
}
/// Decision taken by the deduper for a single record.
#[derive(Debug, PartialEq, Eq)]
enum DedupAction {
    /// Write the record as is.
    Allow,
    /// Write the record; it is the last repetition within the leeway.
    /// Carries the leeway, i.e. the number of repetitions seen so far.
    AllowLastOfLeeway(usize),
    /// Write the record after a run of skipped repetitions.
    /// Carries the number of repetitions that were skipped.
    AllowAfterSkipped(usize),
    /// Do not write the record.
    Skip,
}
impl Deduper {
    /// Creates a deduper with no remembered record and the given leeway.
    pub fn with_leeway(leeway: NonZeroUsize) -> Self {
        let last_record = LastRecord {
            file: None,
            line: None,
            msg: String::new(),
        };
        Self {
            leeway,
            last_record,
            duplicates: 0,
        }
    }

    /// Classifies `record` relative to the previously seen record.
    ///
    /// A record counts as a repetition when its line, file and formatted message all equal
    /// those of the remembered record. Repetitions are allowed up to the leeway and skipped
    /// beyond it; a differing record resets the counter and, if repetitions were skipped
    /// before, reports how many.
    fn dedup(&mut self, record: &Record) -> DedupAction {
        let line = record.line();
        let file = record.file();
        let msg = record.args().to_string();

        let is_repetition = line == self.last_record.line
            && file == self.last_record.file.as_deref()
            && msg == self.last_record.msg;

        if !is_repetition {
            // A different record: remember it and restart counting.
            self.last_record = LastRecord {
                file: file.map(ToOwned::to_owned),
                line,
                msg,
            };
            let previous_dups = std::mem::replace(&mut self.duplicates, 0);
            return if previous_dups > self.leeway() {
                DedupAction::AllowAfterSkipped(previous_dups - self.leeway())
            } else {
                DedupAction::Allow
            };
        }

        match self.duplicates.checked_add(1) {
            Some(incremented) => self.duplicates = incremented,
            None => {
                // The counter would overflow: report what was skipped so far and start over.
                let skipped = self.duplicates - self.leeway();
                self.duplicates = 0;
                return DedupAction::AllowAfterSkipped(skipped);
            }
        }

        let leeway = self.leeway();
        match self.duplicates.cmp(&leeway) {
            Ordering::Less => DedupAction::Allow,
            Ordering::Equal => DedupAction::AllowLastOfLeeway(leeway),
            Ordering::Greater => DedupAction::Skip,
        }
    }

    /// The configured leeway as a plain `usize`.
    fn leeway(&self) -> usize {
        self.leeway.get()
    }
}
/// The parts of a log record that are compared to detect a repetition.
struct LastRecord {
    /// Source file of the record, if known.
    file: Option<String>,
    /// Source line of the record, if known.
    line: Option<u32>,
    /// The formatted log message.
    msg: String,
}
/// Reads every file in `directory` and checks the log lines found there.
///
/// Each line must start with the marker `XXXXX`, and the total number of lines across all
/// files must be exactly 27.
/// NOTE(review): the expected count is hard-coded; it presumably follows from the leeway
/// used when the filter is configured -- re-derive it if that setup changes.
///
/// # Panics
///
/// Panics if the directory cannot be listed, a file cannot be opened or read, a line lacks
/// the marker, or the total line count differs from the expectation.
fn verify_logs(directory: &str) {
    let pattern = directory.to_owned().add("/*");
    let globresults = glob(&pattern).unwrap_or_else(|e| {
        panic!("Is this ({pattern}) really a directory? Listing failed with {e}")
    });

    let mut no_of_log_files = 0;
    let mut line_count = 0_usize;
    // One buffer serves all files; it is emptied before every read.
    let mut buffer = String::new();

    for globresult in globresults {
        let pathbuf = match globresult {
            Ok(path) => path,
            Err(e) => panic!("Ups - error occured: {e}"),
        };
        let file = match File::open(&pathbuf) {
            Ok(file) => file,
            Err(e) => panic!("Cannot open file {pathbuf:?} due to {e}"),
        };
        no_of_log_files += 1;

        let mut reader = BufReader::new(file);
        loop {
            buffer.clear();
            // A read of zero bytes signals the end of the file.
            if reader.read_line(&mut buffer).unwrap() == 0 {
                break;
            }
            if !buffer.starts_with("XXXXX") {
                panic!("irregular line in log file {pathbuf:?}: \"{buffer}\"");
            }
            line_count += 1;
        }
    }

    assert_eq!(line_count, 27);
    println!(
        "Found {line_count} log lines from {NO_OF_THREADS} threads in {no_of_log_files} files"
    );
}