use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender};
use parking_lot::RwLock;
use crate::level::LogLevel;
use crate::record::Record;
use crate::handler::{Handler, HandlerRef, HandlerError};
use crate::formatters::Formatter;
use crate::handler::HandlerFilter;
/// Default number of records a worker accumulates before handing the batch to the handlers.
const DEFAULT_BATCH_SIZE: usize = 32;
/// Default time a worker waits on the channel before processing a partially filled batch.
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
/// Commands sent from an `AsyncLoggerHandle` to the worker threads over the channel.
#[derive(Debug)]
pub enum AsyncCommand {
/// A record to append to the receiving worker's current batch.
Log(Record),
/// Process the receiving worker's pending batch and flush every enabled handler.
Flush,
/// Process the pending batch, flush the handlers, and stop the receiving worker.
Shutdown,
}
/// Cloneable front end used to submit commands to the worker threads.
///
/// Every clone shares the same channel, `running` flag, and queue counter.
#[derive(Clone, Debug)]
pub struct AsyncLoggerHandle {
/// Sending half of the bounded command channel consumed by the workers.
sender: Sender<AsyncCommand>,
/// Shared flag: set to `false` by `shutdown`, and used by the workers as their loop condition.
running: Arc<AtomicBool>,
/// Counter raised when `log` enqueues a record and lowered when a worker receives it.
queued_records: Arc<AtomicUsize>,
}
impl AsyncLoggerHandle {
    /// Queues `record` for asynchronous processing without blocking.
    ///
    /// Returns `true` when the record was accepted. Returns `false` (and
    /// drops the record) when shutdown has already been requested or when
    /// the channel rejects the command because it is full or disconnected.
    pub fn log(&self, record: Record) -> bool {
        if !self.running.load(Ordering::Relaxed) {
            return false;
        }
        // Count the record *before* it becomes visible to a worker. If the
        // increment happened after a successful send, a fast worker could
        // run its decrement first and wrap the unsigned counter around.
        self.queued_records.fetch_add(1, Ordering::Relaxed);
        if self.sender.try_send(AsyncCommand::Log(record)).is_ok() {
            true
        } else {
            // The record never entered the queue, so undo the optimistic count.
            self.queued_records.fetch_sub(1, Ordering::Relaxed);
            false
        }
    }

    /// Asks a worker to process its pending batch and flush the handlers.
    ///
    /// The request is sent without blocking. Returns `false` when shutdown
    /// has already been requested or the channel is full or disconnected.
    /// Each command is delivered to a single receiver, so with several
    /// workers only the one that receives the command flushes.
    pub fn flush(&self) -> bool {
        if !self.running.load(Ordering::Relaxed) {
            return false;
        }
        self.sender.try_send(AsyncCommand::Flush).is_ok()
    }

    /// Stops the logger: clears the shared `running` flag and sends a
    /// `Shutdown` command so that a worker blocked on the channel wakes up.
    ///
    /// Calling this more than once (from any clone) is a no-op after the
    /// first call. The send blocks while the channel is full.
    pub fn shutdown(&self) {
        // `swap` performs the check and the update as one atomic step, so
        // exactly one caller wins even when several clones race here.
        if !self.running.swap(false, Ordering::Relaxed) {
            return;
        }
        // A send error only means every worker is already gone.
        let _ = self.sender.send(AsyncCommand::Shutdown);
    }

    /// Returns the number of records enqueued but not yet received by a
    /// worker. The value is a snapshot and may be stale immediately.
    pub fn queued_records(&self) -> usize {
        self.queued_records.load(Ordering::Relaxed)
    }
}
impl Drop for AsyncLoggerHandle {
fn drop(&mut self) {
self.shutdown();
}
}
/// Builder that collects the configuration of an asynchronous logger and spawns its workers.
pub struct AsyncLoggerBuilder {
/// Capacity of the bounded command channel.
queue_size: usize,
/// Handlers cloned into every worker.
handlers: Vec<HandlerRef>,
/// Level handed to each worker, which compares it against each handler's level.
level: LogLevel,
/// Number of worker threads to spawn.
workers: usize,
/// Batch length at which a worker processes its accumulated records.
batch_size: usize,
/// Timeout a worker waits on the channel before processing a partial batch.
flush_interval: Duration,
}
impl AsyncLoggerBuilder {
    /// Creates a builder with the default configuration: a queue of 10 000
    /// commands, no handlers, `LogLevel::Info`, one worker, and the default
    /// batch size and flush interval.
    pub fn new() -> Self {
        Self {
            queue_size: 10_000,
            handlers: Vec::new(),
            level: LogLevel::Info,
            workers: 1,
            batch_size: DEFAULT_BATCH_SIZE,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
        }
    }

    /// Sets the capacity of the bounded command channel.
    pub fn with_queue_size(mut self, queue_size: usize) -> Self {
        self.queue_size = queue_size;
        self
    }

    /// Replaces the list of handlers that receive processed batches.
    pub fn with_handlers(mut self, handlers: Vec<HandlerRef>) -> Self {
        self.handlers = handlers;
        self
    }

    /// Sets the level passed to every worker.
    pub fn with_level(mut self, level: LogLevel) -> Self {
        self.level = level;
        self
    }

    /// Sets the number of worker threads. `build` spawns at least one
    /// worker, so a value of `0` is treated as `1`.
    pub fn with_workers(mut self, workers: usize) -> Self {
        self.workers = workers;
        self
    }

    /// Sets how many records a worker accumulates before processing them.
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets how long a worker waits on the channel before processing a
    /// partially filled batch.
    pub fn with_flush_interval(mut self, flush_interval: Duration) -> Self {
        self.flush_interval = flush_interval;
        self
    }

    /// Creates the channel and shared state, spawns the worker threads, and
    /// returns the handle used to submit records.
    pub fn build(self) -> AsyncLoggerHandle {
        let (sender, receiver) = bounded(self.queue_size);
        let running = Arc::new(AtomicBool::new(true));
        let queued_records = Arc::new(AtomicUsize::new(0));

        // With zero workers nobody would hold a receiver: the channel would
        // be disconnected from the start and every record silently rejected.
        let worker_count = self.workers.max(1);
        for _ in 0..worker_count {
            let worker = AsyncWorker::new(
                receiver.clone(),
                self.handlers.clone(),
                self.level,
                Arc::clone(&running),
                Arc::clone(&queued_records),
                self.batch_size,
                self.flush_interval,
            );
            // The threads run detached; they stop via the `running` flag,
            // a `Shutdown` command, or channel disconnection.
            drop(worker.spawn());
        }

        AsyncLoggerHandle {
            sender,
            running,
            queued_records,
        }
    }
}
/// State owned by one background worker thread.
struct AsyncWorker {
/// Receiving half of the command channel, shared with the other workers.
receiver: Receiver<AsyncCommand>,
/// Handlers that receive each processed batch.
handlers: Vec<HandlerRef>,
/// Level compared against each handler's level before a batch is delivered.
level: LogLevel,
/// Shared flag used as the loop condition; `false` stops the worker.
running: Arc<AtomicBool>,
/// Shared counter decremented whenever this worker receives a `Log` command.
queued_records: Arc<AtomicUsize>,
/// Batch length at which accumulated records are processed.
batch_size: usize,
/// Timeout used when waiting for the next command.
flush_interval: Duration,
}
impl AsyncWorker {
    /// Bundles the channel, handlers, and settings one worker thread needs.
    fn new(
        receiver: Receiver<AsyncCommand>,
        handlers: Vec<HandlerRef>,
        level: LogLevel,
        running: Arc<AtomicBool>,
        queued_records: Arc<AtomicUsize>,
        batch_size: usize,
        flush_interval: Duration,
    ) -> Self {
        Self {
            receiver,
            handlers,
            level,
            running,
            queued_records,
            batch_size,
            flush_interval,
        }
    }

    /// Moves the worker onto a new thread and starts its receive loop.
    fn spawn(self) -> JoinHandle<()> {
        thread::spawn(move || self.run())
    }

    /// Receive loop: batches `Log` commands and processes a batch when it
    /// is full, when a `Flush` arrives, or when the channel stays idle for
    /// `flush_interval`.
    ///
    /// The loop ends when the `running` flag is cleared, a `Shutdown`
    /// command is received, or every sender is gone. On every exit path the
    /// worker then drains the commands still queued, delivers the remaining
    /// records, and flushes the handlers, so nothing accepted before
    /// shutdown is lost.
    fn run(self) {
        let mut batch: Vec<Record> = Vec::with_capacity(self.batch_size);
        let mut last_flush = std::time::Instant::now();

        while self.running.load(Ordering::Relaxed) {
            match self.receiver.recv_timeout(self.flush_interval) {
                Ok(AsyncCommand::Log(record)) => {
                    batch.push(record);
                    self.queued_records.fetch_sub(1, Ordering::Relaxed);
                    if batch.len() >= self.batch_size {
                        self.process_batch(&mut batch);
                        last_flush = std::time::Instant::now();
                    }
                }
                Ok(AsyncCommand::Flush) => {
                    self.process_batch(&mut batch);
                    self.flush_handlers();
                    last_flush = std::time::Instant::now();
                }
                // Final processing happens once, after the loop.
                Ok(AsyncCommand::Shutdown) => break,
                // All senders are gone: no further command can arrive, and
                // `recv_timeout` would now return immediately forever.
                Err(err) if err.is_disconnected() => break,
                // Plain timeout: deliver a partial batch that has waited
                // long enough.
                Err(_) => {
                    if !batch.is_empty() && last_flush.elapsed() >= self.flush_interval {
                        self.process_batch(&mut batch);
                        last_flush = std::time::Instant::now();
                    }
                }
            }
        }

        // The flag may have been cleared while commands were still queued,
        // and with several workers only one of them receives `Shutdown`.
        // Pick up whatever is left without blocking.
        while let Ok(command) = self.receiver.try_recv() {
            if let AsyncCommand::Log(record) = command {
                batch.push(record);
                self.queued_records.fetch_sub(1, Ordering::Relaxed);
                if batch.len() >= self.batch_size {
                    self.process_batch(&mut batch);
                }
            }
        }
        self.process_batch(&mut batch);
        self.flush_handlers();
    }

    /// Hands `batch` to every enabled handler whose level passes the check,
    /// then clears the batch. Does nothing when the batch is empty.
    fn process_batch(&self, batch: &mut Vec<Record>) {
        if batch.is_empty() {
            return;
        }
        for handler in &self.handlers {
            let guard = handler.read();
            // NOTE(review): this compares the logger's level with the
            // handler's level, not each record's level. Confirm that
            // per-record filtering happens inside the handler.
            if guard.is_enabled() && self.level >= guard.level() {
                // Best effort: a failing handler must not stop the others.
                let _ = guard.handle_batch(batch);
            }
        }
        batch.clear();
    }

    /// Calls `flush` on every enabled handler, ignoring individual errors.
    fn flush_handlers(&self) {
        for handler in &self.handlers {
            let guard = handler.read();
            if guard.is_enabled() {
                let _ = guard.flush();
            }
        }
    }
}
// Unit tests for the asynchronous logging pipeline.
// NOTE(review): these tests call `AsyncLogger::new`, which is not defined in the
// visible part of this file. Confirm it is provided elsewhere, or whether the
// tests should be ported to `AsyncLoggerBuilder`.
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use crate::handler::Handler;
use crate::formatters::Formatter;
use crate::handler::HandlerFilter;
// Handler double that stores every record it receives so tests can inspect them.
#[derive(Debug)]
struct TestHandler {
// Records captured by `handle` and `handle_batch`.
records: RwLock<Vec<Record>>,
enabled: bool,
level: LogLevel,
formatter: Formatter,
}
impl Handler for TestHandler {
// Stores a clone of the single record.
fn handle(&self, record: &Record) -> Result<(), HandlerError> {
self.records.write().push(record.clone());
Ok(())
}
// Stores clones of all records in the batch under one write lock.
fn handle_batch(&self, records: &[Record]) -> Result<(), HandlerError> {
let mut guard = self.records.write();
guard.extend_from_slice(records);
Ok(())
}
fn is_enabled(&self) -> bool {
self.enabled
}
fn level(&self) -> LogLevel {
self.level
}
fn set_level(&mut self, level: LogLevel) {
self.level = level;
}
fn set_enabled(&mut self, enabled: bool) {
self.enabled = enabled;
}
fn formatter(&self) -> &Formatter {
&self.formatter
}
fn set_formatter(&mut self, formatter: Formatter) {
self.formatter = formatter;
}
// Filters are not supported by this test double: the setter ignores its argument.
fn set_filter(&mut self, _filter: Option<HandlerFilter>) {
}
fn filter(&self) -> Option<&HandlerFilter> {
None
}
}
// Logs ten Info records and expects all of them to reach the handler.
#[test]
fn test_async_logger_basic() {
let handler = Arc::new(TestHandler {
records: RwLock::new(Vec::new()),
enabled: true,
level: LogLevel::Info,
formatter: Formatter::default(),
});
// NOTE(review): the trailing arguments (100, 2) are presumably the queue size
// and the worker count; confirm against `AsyncLogger::new`.
let logger = AsyncLogger::new(
LogLevel::Info,
vec![Arc::new(RwLock::new(handler.clone()))],
100,
2
);
for i in 0..10 {
let record = Record::new(
LogLevel::Info,
format!("test message {}", i),
None,
None,
None,
);
assert!(logger.log(record));
}
// Presumably gives the background workers time to deliver the records.
thread::sleep(Duration::from_millis(200));
let records = handler.records.read();
assert_eq!(records.len(), 10);
}
// Expects a Debug record to be rejected and an Info record to be delivered.
// NOTE(review): `AsyncLoggerHandle::log` performs no level check, so this
// expectation relies on `AsyncLogger::log` filtering by level; confirm.
#[test]
fn test_async_logger_level_filtering() {
let handler = Arc::new(TestHandler {
records: RwLock::new(Vec::new()),
enabled: true,
level: LogLevel::Info,
formatter: Formatter::default(),
});
let logger = AsyncLogger::new(
LogLevel::Info,
vec![Arc::new(RwLock::new(handler.clone()))],
100,
2
);
let debug_record = Record::new(
LogLevel::Debug,
"debug message",
None,
None,
None,
);
let info_record = Record::new(
LogLevel::Info,
"info message",
None,
None,
None,
);
assert!(!logger.log(debug_record)); assert!(logger.log(info_record));
thread::sleep(Duration::from_millis(200));
let records = handler.records.read();
assert_eq!(records.len(), 1);
assert_eq!(records[0].level(), LogLevel::Info);
}
// Expects every record logged before `shutdown` to still reach the handler.
#[test]
fn test_async_logger_shutdown() {
let handler = Arc::new(TestHandler {
records: RwLock::new(Vec::new()),
enabled: true,
level: LogLevel::Info,
formatter: Formatter::default(),
});
let logger = AsyncLogger::new(
LogLevel::Info,
vec![Arc::new(RwLock::new(handler.clone()))],
100,
2
);
for i in 0..5 {
let record = Record::new(
LogLevel::Info,
format!("test message {}", i),
None,
None,
None,
);
logger.log(record);
}
logger.shutdown();
thread::sleep(Duration::from_millis(200));
let records = handler.records.read();
assert_eq!(records.len(), 5);
}
}