use std::sync::Arc;
use std::sync::Barrier;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use logforth_append_async::AsyncBuilder;
use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
use logforth_core::Trap;
use logforth_core::record::Record;
/// Test appender whose `flush` blocks on a shared barrier.
///
/// It lets a test observe that a flush has begun (via `started`) and then
/// decide when that flush is allowed to complete (via `barrier`).
#[derive(Debug)]
struct BarrierAppend {
// Set to `true` by `flush` right before it blocks on the barrier.
started: Arc<AtomicBool>,
// Rendezvous point shared with the test; `flush` waits on it before returning.
barrier: Arc<Barrier>,
}
impl Append for BarrierAppend {
fn append(&self, _: &Record, _: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
Ok(())
}
fn flush(&self) -> Result<(), Error> {
self.started.store(true, Ordering::SeqCst);
self.barrier.wait();
Ok(())
}
}
/// Test appender whose `append` always succeeds and whose `flush` always
/// returns an error.
#[derive(Debug)]
struct FailingFlush;
impl Append for FailingFlush {
fn append(&self, _: &Record, _: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
Ok(())
}
fn flush(&self) -> Result<(), Error> {
Err(Error::new("flush failed"))
}
}
/// Test trap that forwards each trapped error, rendered as a string, over a
/// channel so the test can inspect it.
#[derive(Debug)]
struct CapturedTrap {
// Sending half of the channel on which trapped error messages are delivered.
errors: mpsc::Sender<String>,
}
impl Trap for CapturedTrap {
    /// Renders the trapped error as text and sends it to the receiving side.
    ///
    /// Panics if the receiver has already been dropped.
    fn trap(&self, err: &Error) {
        let message = err.to_string();
        self.errors.send(message).unwrap();
    }
}
/// Checks that flushing the async appender does not return until the inner
/// appender's `flush` has completed.
///
/// The inner appender blocks on a two-party barrier inside `flush`; the test
/// verifies the outer flush is still pending while the barrier is held, then
/// releases it and expects the outer flush to finish successfully.
#[test]
fn flush_waits_for_worker_completion() {
    let started = Arc::new(AtomicBool::new(false));
    let barrier = Arc::new(Barrier::new(2));
    let append = BarrierAppend {
        started: started.clone(),
        barrier: barrier.clone(),
    };
    let async_append = AsyncBuilder::new("async-flush-wait").append(append).build();

    let flush_handle = std::thread::spawn(move || async_append.flush());

    // Wait for the inner flush to begin, but with a deadline: without one, a
    // flush that never reaches the inner appender would hang the test forever
    // instead of failing it.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
    while !started.load(Ordering::SeqCst) {
        assert!(
            std::time::Instant::now() < deadline,
            "inner flush was not reached within the timeout"
        );
        std::thread::yield_now();
    }

    // The inner flush is parked on the barrier, so the outer flush must not
    // have returned yet.
    assert!(!flush_handle.is_finished());

    // Release the inner flush and let the outer flush complete.
    barrier.wait();
    flush_handle
        .join()
        .expect("flush thread panicked")
        .expect("flush should succeed");
}
/// Checks that an error returned by the inner appender's `flush` is delivered
/// to the configured trap, while the async appender's own `flush` call is
/// expected to succeed.
#[test]
fn flush_handles_errors_in_worker_thread() {
    let (tx, rx) = mpsc::channel();
    let async_append = AsyncBuilder::new("async-flush-error")
        .trap(CapturedTrap { errors: tx })
        .append(FailingFlush)
        .build();

    async_append.flush().unwrap();

    // Use a bounded wait: the sender lives inside the appender, so a plain
    // `recv()` would block forever (rather than fail) if the trap never fired.
    let err = rx
        .recv_timeout(std::time::Duration::from_secs(10))
        .expect("trap should have captured the flush error");
    assert!(
        err.contains("flush failed"),
        "unexpected trapped error: {}",
        err
    );
}