use gorust::{go, channel};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::writer::LogWriter;
pub struct AsyncLogBackend {
tx: channel::Sender<String>,
running: Arc<AtomicBool>,
}
impl AsyncLogBackend {
pub fn new(writer: Box<dyn LogWriter>, buffer_size: usize) -> Self {
let (tx, rx) = channel::new_with_capacity(buffer_size);
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
go(move || {
Self::log_writer_loop(rx, writer, running_clone);
});
Self { tx, running }
}
fn log_writer_loop(
rx: channel::Receiver<String>,
writer: Box<dyn LogWriter>,
running: Arc<AtomicBool>,
) {
while running.load(Ordering::Relaxed) {
if !gorust::scheduler::Scheduler::is_running() {
running.store(false, Ordering::Relaxed);
break;
}
match rx.try_recv() {
Ok(msg) => {
writer.write(&msg);
}
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
loop {
match rx.try_recv() {
Ok(msg) => writer.write(&msg),
Err(_) => break,
}
}
writer.flush();
}
pub fn send(&self, msg: String) {
let _ = self.tx.try_send(msg);
}
pub fn shutdown(&self) {
self.running.store(false, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
struct MockWriter {
messages: Arc<Mutex<Vec<String>>>,
}
impl MockWriter {
fn new() -> (Self, Arc<Mutex<Vec<String>>>) {
let messages = Arc::new(Mutex::new(Vec::new()));
(
Self {
messages: messages.clone(),
},
messages,
)
}
}
impl LogWriter for MockWriter {
fn write(&self, msg: &str) {
if let Ok(mut msgs) = self.messages.lock() {
msgs.push(msg.to_string());
}
}
fn flush(&self) {}
}
#[test]
fn test_backend_send_and_shutdown() {
let (writer, messages) = MockWriter::new();
let backend = AsyncLogBackend::new(Box::new(writer), 1024);
backend.send("test message 1".to_string());
backend.send("test message 2".to_string());
std::thread::sleep(std::time::Duration::from_millis(200));
backend.shutdown();
std::thread::sleep(std::time::Duration::from_millis(100));
let msgs = messages.lock().unwrap();
let _ = msgs.len();
}
#[test]
fn test_backend_try_send_non_blocking() {
let (writer, _) = MockWriter::new();
let backend = AsyncLogBackend::new(Box::new(writer), 1);
for i in 0..10 {
backend.send(format!("message {}", i));
}
backend.shutdown();
}
}