use std::sync::{Arc, Condvar, Mutex, PoisonError, Weak};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use super::wal::{FlushKind, Wal};
pub(super) struct GroupFlusherHandle {
shutdown: Arc<(Mutex<bool>, Condvar)>,
handle: Option<JoinHandle<()>>,
}
impl Drop for GroupFlusherHandle {
fn drop(&mut self) {
let (lock, cv) = &*self.shutdown;
if let Ok(mut shutdown) = lock.lock() {
*shutdown = true;
cv.notify_one();
}
if let Some(h) = self.handle.take() {
if h.thread().id() == thread::current().id() {
return;
}
let _ = h.join();
}
}
}
pub(super) fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
let shutdown_clone = Arc::clone(&shutdown);
let handle = thread::spawn(move || {
loop {
let (lock, cv) = &*shutdown_clone;
let shutdown = match lock.lock() {
Ok(guard) => guard,
Err(_) => break,
};
let (shutdown, _) = match cv.wait_timeout_while(shutdown, interval, |s| !*s) {
Ok(pair) => pair,
Err(_) => break,
};
if *shutdown {
break;
}
drop(shutdown);
match weak.upgrade() {
Some(wal) => {
if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
let mut slot = wal
.bg_failure_slot()
.lock()
.unwrap_or_else(PoisonError::into_inner);
if slot.is_none() {
*slot = Some(format!("bg fsync failed: {err}"));
}
break;
}
}
None => break,
}
}
});
GroupFlusherHandle {
shutdown,
handle: Some(handle),
}
}