use std::fs::File;
use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
use crate::{Error, Result};
/// Durability policy selected for flush requests.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to keep a
/// wildcard arm when matching on it.
///
/// NOTE(review): the code that interprets these variants is not part of this
/// file; the per-variant descriptions below are inferred from the names and
/// from the matching parameters of `GroupCoord::new` — confirm against the
/// call site.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default)]
pub enum FlushPolicy {
    /// The default policy. Presumably every flush performs its own sync.
    #[default]
    OnEachFlush,
    /// Batching policy. Presumably concurrent flushes are coalesced into a
    /// shared sync.
    Group {
        /// Time bound carried by this policy (presumably the longest a batch
        /// is held open before syncing).
        max_wait: Duration,
        /// Size bound carried by this policy (presumably the number of
        /// waiters that closes a batch early).
        max_batch: usize,
    },
}
/// Coordinator that lets concurrent callers share one sync call.
///
/// One caller at a time acts as leader and runs the sync; callers that arrive
/// while a leader is active wait on `cv` for the outcome.
pub(crate) struct GroupCoord {
    /// Leader flag, waiter count, cycle counter and last sync error.
    state: Mutex<GroupState>,
    /// Paired with `state`; notified when a caller registers and when a
    /// leader finishes its sync attempt.
    cv: Condvar,
    /// Offset published after a successful sync. Callers whose target is at
    /// or below this value return without syncing.
    durable_tail: AtomicU64,
    /// Upper bound on how long a leader waits for more callers before syncing.
    max_wait: Duration,
    /// Waiter count at which a leader stops waiting and syncs immediately.
    max_batch: usize,
}
/// Mutable coordination state guarded by the `GroupCoord` mutex.
struct GroupState {
    /// `true` from the moment a caller takes leadership until it has
    /// recorded the outcome of its sync call.
    leader_active: bool,
    /// Number of callers that have registered and not yet returned.
    pending: usize,
    /// Bumped (wrapping) each time a leader finishes a sync attempt, whether
    /// it succeeded or failed; waiters compare it with the value they saw on
    /// entry to detect that a cycle has completed.
    cycle_seq: u64,
    /// Kind and message of the most recent failed sync, kept so waiters can
    /// rebuild an equivalent `io::Error`. Reset to `None` whenever a new
    /// leader starts.
    last_error: Option<(io::ErrorKind, String)>,
}
impl GroupCoord {
pub(crate) fn new(max_wait: Duration, max_batch: usize) -> Self {
let max_batch = max_batch.max(1);
Self {
state: Mutex::new(GroupState {
leader_active: false,
pending: 0,
cycle_seq: 0,
last_error: None,
}),
cv: Condvar::new(),
durable_tail: AtomicU64::new(0),
max_wait,
max_batch,
}
}
pub(crate) fn run<F>(&self, target_tail: u64, sync_call: F) -> Result<()>
where
F: FnOnce() -> std::result::Result<u64, io::Error>,
{
if self.durable_tail.load(Ordering::Acquire) >= target_tail {
return Ok(());
}
let mut state = self.state.lock().map_err(|_| Error::LockPoisoned)?;
state.pending += 1;
self.cv.notify_all();
if self.durable_tail.load(Ordering::Acquire) >= target_tail {
state.pending -= 1;
return Ok(());
}
if !state.leader_active {
state.leader_active = true;
state.last_error = None;
let deadline = Instant::now() + self.max_wait;
while state.pending < self.max_batch {
let now = Instant::now();
if now >= deadline {
break;
}
let remaining = deadline - now;
let (st, timeout) = self
.cv
.wait_timeout(state, remaining)
.map_err(|_| Error::LockPoisoned)?;
state = st;
if timeout.timed_out() {
break;
}
}
drop(state);
let sync_result = sync_call();
let mut state = self.state.lock().map_err(|_| Error::LockPoisoned)?;
state.leader_active = false;
state.cycle_seq = state.cycle_seq.wrapping_add(1);
let result = match sync_result {
Ok(synced_through) => {
self.durable_tail.store(synced_through, Ordering::Release);
Ok(())
}
Err(err) => {
state.last_error = Some((err.kind(), err.to_string()));
Err(Error::from(err))
}
};
state.pending = state.pending.saturating_sub(1);
self.cv.notify_all();
return result;
}
let entered_cycle = state.cycle_seq;
loop {
if self.durable_tail.load(Ordering::Acquire) >= target_tail {
state.pending = state.pending.saturating_sub(1);
return Ok(());
}
if state.cycle_seq != entered_cycle {
if let Some((kind, msg)) = state.last_error.as_ref() {
let err = io::Error::new(*kind, msg.clone());
state.pending = state.pending.saturating_sub(1);
return Err(Error::from(err));
}
if !state.leader_active {
state.leader_active = true;
state.last_error = None;
let deadline = Instant::now() + self.max_wait;
while state.pending < self.max_batch {
let now = Instant::now();
if now >= deadline {
break;
}
let remaining = deadline - now;
let (st, timeout) = self
.cv
.wait_timeout(state, remaining)
.map_err(|_| Error::LockPoisoned)?;
state = st;
if timeout.timed_out() {
break;
}
}
drop(state);
let sync_result = sync_call();
let mut state = self.state.lock().map_err(|_| Error::LockPoisoned)?;
state.leader_active = false;
state.cycle_seq = state.cycle_seq.wrapping_add(1);
let result = match sync_result {
Ok(synced_through) => {
self.durable_tail.store(synced_through, Ordering::Release);
Ok(())
}
Err(err) => {
state.last_error = Some((err.kind(), err.to_string()));
Err(Error::from(err))
}
};
state.pending = state.pending.saturating_sub(1);
self.cv.notify_all();
return result;
}
}
let (st, _timeout) = self
.cv
.wait_timeout(state, Duration::from_millis(50))
.map_err(|_| Error::LockPoisoned)?;
state = st;
}
}
}
/// Syncs `file`'s data to stable storage and reports the offset it covers.
///
/// Calls [`File::sync_data`] and, on success, hands back `tail_before_sync`
/// unchanged; the value is not validated here. Presumably the caller captured
/// it before invoking the sync so that everything up to it is covered.
///
/// # Errors
///
/// Returns the `io::Error` from `sync_data` as-is.
pub(crate) fn group_sync(
    file: &File,
    tail_before_sync: u64,
) -> std::result::Result<u64, io::Error> {
    file.sync_data().map(|()| tail_before_sync)
}