use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FlushPolicy {
#[default]
Never,
Manual,
Always,
EveryBytes(usize),
EveryWrites(usize),
EveryMillis(u64),
}
pub(crate) struct TimeBasedFlusher {
running: Arc<AtomicBool>,
thread: Option<thread::JoinHandle<()>>,
}
const SHUTDOWN_POLL_MS: u64 = 50;
impl TimeBasedFlusher {
pub(crate) fn new<F>(interval_ms: u64, flush_callback: F) -> Option<Self>
where
F: Fn() -> bool + Send + 'static,
{
if interval_ms == 0 {
return None;
}
let interval = Duration::from_millis(interval_ms);
let running = Arc::new(AtomicBool::new(true));
let running_clone = Arc::clone(&running);
let shutdown_poll = Duration::from_millis(SHUTDOWN_POLL_MS.min(interval_ms));
let handle = thread::spawn(move || {
let mut elapsed = Duration::ZERO;
while running_clone.load(Ordering::Acquire) {
let remaining = interval.saturating_sub(elapsed);
let slice = shutdown_poll.min(remaining);
thread::sleep(slice);
elapsed += slice;
if !running_clone.load(Ordering::Acquire) {
break;
}
if elapsed >= interval {
let _ = flush_callback();
elapsed = Duration::ZERO;
}
}
});
Some(Self {
running,
thread: Some(handle),
})
}
}
impl Drop for TimeBasedFlusher {
fn drop(&mut self) {
self.running.store(false, Ordering::Release);
if let Some(handle) = self.thread.take() {
let _ = thread::spawn(move || {
let _ = handle.join();
});
}
}
}