use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use super::ring::{self, Consumer, Producer};
/// How long the drainer thread sleeps after a pass over the queue that
/// produced no events, before polling again.
pub const DRAINER_POLL_INTERVAL: Duration = Duration::from_millis(1);
/// Sending side of a background logging pipeline.
///
/// Events of type `T` are pushed onto a ring queue and handed, one at a
/// time, to a callback running on a dedicated drainer thread. Dropping the
/// sink asks that thread to stop and waits for it to exit.
pub struct LogSink<T>
where
    T: Send + 'static,
{
    /// Producing end of the ring queue that `log` pushes onto.
    tx: Producer<T>,
    /// Stop flag shared with the drainer thread; set to `true` on drop.
    shutdown: Arc<AtomicBool>,
    /// Handle of the drainer thread; taken (leaving `None`) when it is joined.
    drainer: Option<JoinHandle<()>>,
}
impl<T: Send + 'static> LogSink<T> {
    /// Builds a sink and starts its drainer thread.
    ///
    /// A ring channel is created with the requested `capacity`; its consuming
    /// end and `drain_one` are moved onto a thread named
    /// `tympan-aspl-log-drainer`, which calls `drain_one` once for every
    /// event it pops.
    ///
    /// # Panics
    ///
    /// Panics if the operating system refuses to spawn the drainer thread.
    pub fn new<F>(capacity: usize, mut drain_one: F) -> Self
    where
        F: FnMut(T) + Send + 'static,
    {
        let (producer, consumer) = ring::channel::<T>(capacity);
        let stop_flag = Arc::new(AtomicBool::new(false));

        // The thread body owns the consumer, its own handle on the stop flag,
        // and the user callback.
        let worker_body = {
            let stop_flag = Arc::clone(&stop_flag);
            move || drainer_loop(&consumer, &stop_flag, &mut drain_one)
        };

        let worker = thread::Builder::new()
            .name(String::from("tympan-aspl-log-drainer"))
            .spawn(worker_body)
            .expect("spawning the LogSink drainer thread failed");

        Self {
            tx: producer,
            shutdown: stop_flag,
            drainer: Some(worker),
        }
    }

    /// Offers `event` to the queue through the producer's `try_push`.
    ///
    /// Returns `true` when the push succeeded and `false` when it was
    /// rejected (for example because the queue is full); a rejected event is
    /// not enqueued.
    pub fn log(&self, event: T) -> bool {
        let outcome = self.tx.try_push(event);
        outcome.is_ok()
    }

    /// Returns the capacity reported by the queue's producing end.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.tx.capacity()
    }
}
impl<T: Send + 'static> Drop for LogSink<T> {
    /// Signals the drainer thread to stop and blocks until it has exited.
    ///
    /// The result of the join is discarded, so a panic that happened on the
    /// drainer thread is not propagated to the thread dropping the sink.
    fn drop(&mut self) {
        // Release store; the drainer reads the flag with an Acquire load.
        self.shutdown.store(true, Ordering::Release);

        // `take` leaves `None` behind, so the handle is joined at most once.
        let pending_worker = self.drainer.take();
        if let Some(worker) = pending_worker {
            let _ = worker.join();
        }
    }
}
/// Body of the drainer thread: repeatedly empties `rx` into `drain_one` until
/// `shutdown` is observed to be `true`.
///
/// Each iteration pops every currently available event. If the stop flag is
/// then set, the queue is emptied one more time and the function returns.
/// Otherwise the thread sleeps for [`DRAINER_POLL_INTERVAL`], but only when
/// the pass handled no events; after a productive pass it polls again
/// immediately.
#[allow(clippy::disallowed_methods)]
fn drainer_loop<T, F>(rx: &Consumer<T>, shutdown: &AtomicBool, drain_one: &mut F)
where
    T: Send + 'static,
    F: FnMut(T),
{
    // Pops until the queue reports empty, handing each event to `drain_one`.
    // Evaluates to `true` when at least one event was handled.
    let mut flush_pending = || {
        let mut handled_any = false;
        while let Some(event) = rx.try_pop() {
            drain_one(event);
            handled_any = true;
        }
        handled_any
    };

    loop {
        let made_progress = flush_pending();

        if shutdown.load(Ordering::Acquire) {
            // Events may have been pushed between the pass above and the
            // moment the flag was observed; pick those up before exiting.
            let _ = flush_pending();
            return;
        }

        if !made_progress {
            thread::sleep(DRAINER_POLL_INTERVAL);
        }
    }
}
#[cfg(test)]
mod tests {
    #![allow(clippy::disallowed_methods, clippy::disallowed_types)]
    use super::*;
    use std::sync::Mutex;

    /// Returns a shared vector together with a drain callback that appends
    /// every event it receives to that vector.
    fn collector() -> (Arc<Mutex<Vec<u32>>>, impl FnMut(u32) + Send + 'static) {
        let store = Arc::new(Mutex::new(Vec::new()));
        let store_for_drainer = Arc::clone(&store);
        let drain = move |v: u32| {
            store_for_drainer.lock().unwrap().push(v);
        };
        (store, drain)
    }

    /// Everything logged before the sink is dropped reaches the callback, in
    /// push order.
    #[test]
    fn events_pushed_before_drop_are_all_drained() {
        let (store, drain) = collector();
        let sink = LogSink::<u32>::new(64, drain);
        for i in 0..20 {
            assert!(sink.log(i), "queue should not be full at item {i}");
        }
        // Dropping joins the drainer, so the store is complete afterwards.
        drop(sink);
        let collected = store.lock().unwrap();
        assert_eq!(*collected, (0..20).collect::<Vec<_>>());
    }

    /// `capacity` reports the value passed to the constructor.
    #[test]
    fn capacity_matches_constructor_argument() {
        let sink = LogSink::<u32>::new(256, |_| {});
        assert_eq!(sink.capacity(), 256);
    }

    /// With the drainer stalled inside its callback, `log` eventually reports
    /// a full queue by returning `false`.
    #[test]
    fn log_returns_false_when_queue_is_full() {
        let release = Arc::new(AtomicBool::new(false));
        let release_for_drainer = Arc::clone(&release);
        // Stall the drainer inside the callback until the test releases it.
        let drain = move |_v: u32| {
            while !release_for_drainer.load(Ordering::Acquire) {
                thread::sleep(Duration::from_micros(100));
            }
        };
        let sink = LogSink::<u32>::new(4, drain);
        let mut successes = 0;
        let mut saw_full = false;
        for i in 0..1_000 {
            if sink.log(i) {
                successes += 1;
            } else {
                saw_full = true;
                break;
            }
        }
        // Release the drainer BEFORE asserting. If the assertion fails, the
        // unwinding drops `sink`, whose `Drop` joins the drainer; with the
        // callback still stalled that join would never return and the test
        // would hang instead of reporting the failure.
        release.store(true, Ordering::Release);
        assert!(
            saw_full,
            "queue never reported full after {successes} pushes"
        );
        drop(sink);
    }

    /// Dropping the sink right after a burst of pushes still delivers every
    /// event, in order.
    #[test]
    fn drop_flushes_events_pushed_after_the_last_poll() {
        let (store, drain) = collector();
        let sink = LogSink::<u32>::new(1024, drain);
        for i in 0..500 {
            assert!(sink.log(i));
        }
        drop(sink);
        let collected = store.lock().unwrap();
        assert_eq!(collected.len(), 500, "every pushed event must be drained");
        // Pair each element with a typed counter rather than casting the
        // `usize` index down to `u32`.
        for (expected, &actual) in (0_u32..).zip(collected.iter()) {
            assert_eq!(actual, expected);
        }
    }

    /// A panic inside the drain callback must not escape from `drop`.
    #[test]
    fn drainer_panic_is_swallowed_by_drop() {
        let sink = LogSink::<u32>::new(16, |_v| panic!("drainer suicide"));
        let _ = sink.log(0);
        // Presumably gives the drainer time to pop the event and panic before
        // the sink is dropped.
        thread::sleep(Duration::from_millis(20));
        drop(sink);
    }
}