use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use super::wal::{FlushKind, Wal};
/// Owning handle for the background group-flusher thread created by
/// `spawn_group_flusher`.
///
/// Dropping the handle sets the shutdown flag and then joins the worker
/// thread; see the `Drop` impl for details.
pub(super) struct GroupFlusherHandle {
/// Stop flag shared with the worker thread; the worker polls it and exits
/// once it reads `true`.
shutdown: Arc<AtomicBool>,
/// Join handle of the worker thread. Wrapped in `Option` so `Drop` can
/// `take()` it out of `&mut self` and join it by value.
handle: Option<JoinHandle<()>>,
}
impl Drop for GroupFlusherHandle {
    /// Requests shutdown of the flusher thread and waits for it to exit.
    ///
    /// The shutdown flag is stored with `Release` ordering so it pairs with the
    /// worker's `Acquire` loads. The join is skipped when this destructor runs
    /// on the flusher thread itself: `JoinHandle::join` is documented as
    /// possibly panicking (or deadlocking) when a thread joins itself.
    ///
    /// NOTE(review): a self-drop is possible if the worker ends up releasing
    /// the last strong reference to a value that owns this handle (the worker
    /// briefly upgrades its `Weak<Wal>` while flushing) — confirm against the
    /// `Wal` definition.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Release);

        if let Some(worker) = self.handle.take() {
            if worker.thread().id() == thread::current().id() {
                // Running on the worker itself: joining would be a self-join.
                // Dropping the join handle detaches the thread; it observes the
                // flag set above and leaves its loop on its own.
                return;
            }
            // Best effort: a panic inside the worker is deliberately not
            // propagated out of a destructor.
            let _ = worker.join();
        }
    }
}
/// Spawns a background thread that calls
/// `Wal::flush_inner(FlushKind::ForceFsync)` once per `interval`.
///
/// The thread only holds a `Weak<Wal>`; it upgrades it for the duration of a
/// single flush and releases the strong reference before sleeping again, so it
/// does not keep the `Wal` alive between flushes.
///
/// The worker exits when any of the following happens:
/// * the returned handle's shutdown flag is set (done by its `Drop` impl),
/// * the `Wal` has been dropped (`weak.upgrade()` returns `None`),
/// * a flush fails — the message `"bg fsync failed: {err}"` is stored in the
///   slot returned by `Wal::bg_failure_slot()` unless a failure is already
///   recorded there.
///
/// # Parameters
/// * `weak` — weak reference to the WAL to flush.
/// * `interval` — time to wait between flushes. The wait is split into naps of
///   at most 50 ms so that a shutdown request is noticed promptly. With a zero
///   `interval` no sleeping happens between flushes.
///
/// # Returns
/// A [`GroupFlusherHandle`] owning the shutdown flag and the join handle.
pub(super) fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = Arc::clone(&shutdown);

    let handle = thread::spawn(move || {
        // Longest single sleep; bounds the shutdown latency of the worker.
        let poll_slice = Duration::from_millis(50);
        let stop_requested = || shutdown_clone.load(Ordering::Acquire);

        loop {
            // Wait out one interval in short naps. Each nap is capped at the
            // time still remaining, so intervals that are not a multiple of the
            // poll slice are not overshot by a whole extra slice.
            let mut elapsed = Duration::ZERO;
            while elapsed < interval && !stop_requested() {
                let nap = (interval - elapsed).min(poll_slice);
                thread::sleep(nap);
                elapsed += nap;
            }
            if stop_requested() {
                break;
            }

            // The WAL is gone: nothing left to flush.
            let wal = match weak.upgrade() {
                Some(wal) => wal,
                None => break,
            };

            if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
                // Recover the guard from a poisoned mutex instead of panicking:
                // a panic here would lose the fsync failure being reported.
                let mut slot = wal
                    .bg_failure_slot()
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                // Keep the first recorded failure; later ones do not overwrite it.
                if slot.is_none() {
                    *slot = Some(format!("bg fsync failed: {err}"));
                }
                break;
            }
            // `wal` (the strong reference) is dropped here, before the next wait.
        }
    });

    GroupFlusherHandle {
        shutdown,
        handle: Some(handle),
    }
}