use std::fs::File;
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crate::storage::v4::io::{open_page_file, IoMode};
use crate::{Error, Result};
/// Capacity of the `BufWriter` placed in front of the WAL file (64 KiB).
const WAL_BUFFER_BYTES: usize = 1 << 16;
/// Decides when appended WAL records are made durable (fsynced).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FlushPolicy {
    /// `append` never fsyncs; durability comes from explicit `flush` /
    /// `wait_for_seq` calls.
    Manual,
    /// Every `append` waits for its own record to be fsynced before returning.
    OnEachWrite,
    /// A background flusher thread fsyncs appended records, using `max_wait`
    /// as its wait bound.
    Group {
        max_wait: Duration,
    },
}
impl FlushPolicy {
    /// Returns the flusher's wait bound when this policy needs a background
    /// thread, or `None` when appends are synced by the caller.
    fn background(self) -> Option<Duration> {
        match self {
            Self::Manual | Self::OnEachWrite => None,
            Self::Group { max_wait } => Some(max_wait),
        }
    }
}
/// Mutable WAL state. It lives inside `WalShared::inner`, so every access
/// happens while holding that mutex.
#[derive(Debug)]
struct WalInner {
/// Buffered handle to the WAL file; the cursor is kept at the end of the file
/// between operations so writes append.
writer: BufWriter<File>,
/// Bytes appended since the last successful fsync (reset by fsync and truncate).
pending_bytes: u64,
/// Sequence number the next `append` will hand out.
next_seq: u64,
/// Exclusive durability bound: every sequence number below it has been fsynced.
last_synced_seq: u64,
/// Reason the WAL stopped accepting work (e.g. after a failed fsync); while
/// set, `append` and `wait_for_seq` fail with this reason.
poisoned: Option<&'static str>,
}
/// State shared between the `Wal` handle and its background flusher thread.
#[derive(Debug)]
struct WalShared {
/// Mutable log state; serializes appends, fsyncs, truncation and reads.
inner: Mutex<WalInner>,
/// Signalled after each append and when the `Wal` is dropped, to wake the
/// flusher thread.
notify_writer: Condvar,
/// Lock-free mirror of `WalInner::last_synced_seq`, used by the
/// `wait_for_seq` fast path.
last_synced_seq: AtomicU64,
/// Lock-free mirror of `WalInner::next_seq`, read by `flush` and `next_seq`.
next_seq: AtomicU64,
/// Cleared when the `Wal` is dropped to tell the flusher thread to exit.
background_alive: AtomicBool,
/// Location of the WAL file on disk.
path: PathBuf,
/// Durability policy chosen at open time.
policy: FlushPolicy,
}
/// Append-only write-ahead log handle.
///
/// `append` assigns increasing sequence numbers to records. The `FlushPolicy`
/// given at open time decides when they are fsynced. Dropping the handle stops
/// the background flusher thread, if one was started.
#[derive(Debug)]
pub(crate) struct Wal {
shared: Arc<WalShared>,
/// Background flusher thread; only `FlushPolicy::Group` starts one.
background: Option<JoinHandle<()>>,
}
impl Wal {
pub(crate) fn open(path: impl Into<PathBuf>, policy: FlushPolicy) -> Result<Self> {
Self::open_with_mode(path, policy, IoMode::Buffered)
}
pub(crate) fn open_with_mode(
path: impl Into<PathBuf>,
policy: FlushPolicy,
mode: IoMode,
) -> Result<Self> {
let path = path.into();
let mut file = open_page_file(&path, mode)?;
let len = file.metadata()?.len();
let _seek = file.seek(SeekFrom::End(0))?;
let writer = BufWriter::with_capacity(WAL_BUFFER_BYTES, file);
let shared = Arc::new(WalShared {
inner: Mutex::new(WalInner {
writer,
pending_bytes: 0,
next_seq: 0,
last_synced_seq: 0,
poisoned: None,
}),
notify_writer: Condvar::new(),
last_synced_seq: AtomicU64::new(0),
next_seq: AtomicU64::new(0),
background_alive: AtomicBool::new(true),
path,
policy,
});
let _ = len;
let background = policy
.background()
.map(|max_wait| spawn_flusher(Arc::clone(&shared), max_wait))
.transpose()?;
Ok(Self { shared, background })
}
pub(crate) fn path_for(store_path: &Path) -> PathBuf {
let mut wal_name = store_path
.file_name()
.and_then(|name| name.to_str())
.map_or_else(
|| String::from("emdb.v4.wal"),
|name| format!("{name}.v4.wal"),
);
if wal_name.is_empty() {
wal_name = String::from("emdb.v4.wal");
}
let mut out = store_path.to_path_buf();
out.set_file_name(wal_name);
out
}
pub(crate) fn append(&self, bytes: &[u8]) -> Result<u64> {
let seq = {
let mut inner = self
.shared
.inner
.lock()
.map_err(|_poisoned| Error::LockPoisoned)?;
if let Some(reason) = inner.poisoned {
return Err(Error::TransactionAborted(reason));
}
inner.writer.write_all(bytes)?;
inner.pending_bytes = inner.pending_bytes.saturating_add(bytes.len() as u64);
let assigned = inner.next_seq;
inner.next_seq = inner
.next_seq
.checked_add(1)
.ok_or(Error::TransactionAborted("wal sequence number overflow"))?;
self.shared
.next_seq
.store(inner.next_seq, Ordering::Release);
assigned
};
self.shared.notify_writer.notify_one();
if matches!(self.shared.policy, FlushPolicy::OnEachWrite) {
self.wait_for_seq(seq)?;
}
Ok(seq)
}
pub(crate) fn wait_for_seq(&self, seq: u64) -> Result<()> {
if self.shared.last_synced_seq.load(Ordering::Acquire) > seq {
return Ok(());
}
let mut inner = self
.shared
.inner
.lock()
.map_err(|_poisoned| Error::LockPoisoned)?;
if let Some(reason) = inner.poisoned {
return Err(Error::TransactionAborted(reason));
}
if inner.last_synced_seq > seq {
return Ok(());
}
let target = inner.next_seq;
match perform_fsync(&mut inner) {
Ok(()) => {
inner.last_synced_seq = target;
self.shared.last_synced_seq.store(target, Ordering::Release);
Ok(())
}
Err(err) => {
inner.poisoned = Some("wal fsync failed");
Err(err)
}
}
}
pub(crate) fn flush(&self) -> Result<()> {
let target_minus_one = self.shared.next_seq.load(Ordering::Acquire);
if target_minus_one == 0 {
return Ok(());
}
self.wait_for_seq(target_minus_one - 1)
}
pub(crate) fn truncate(&self) -> Result<()> {
let mut inner = self
.shared
.inner
.lock()
.map_err(|_poisoned| Error::LockPoisoned)?;
inner.writer.flush()?;
let file = inner.writer.get_mut();
file.set_len(0)?;
let _seek = file.seek(SeekFrom::Start(0))?;
file.sync_data()?;
inner.pending_bytes = 0;
inner.next_seq = 0;
inner.last_synced_seq = 0;
self.shared.next_seq.store(0, Ordering::Release);
self.shared.last_synced_seq.store(0, Ordering::Release);
Ok(())
}
pub(crate) fn read_all(&self, out: &mut Vec<u8>) -> Result<()> {
let mut inner = self
.shared
.inner
.lock()
.map_err(|_poisoned| Error::LockPoisoned)?;
inner.writer.flush()?;
let file = inner.writer.get_mut();
let _seek = file.seek(SeekFrom::Start(0))?;
out.clear();
let _read = file.read_to_end(out)?;
let _seek = file.seek(SeekFrom::End(0))?;
Ok(())
}
#[must_use]
pub(crate) fn path(&self) -> &Path {
&self.shared.path
}
pub(crate) fn pending_bytes(&self) -> Result<u64> {
let inner = self
.shared
.inner
.lock()
.map_err(|_poisoned| Error::LockPoisoned)?;
Ok(inner.pending_bytes)
}
#[must_use]
pub(crate) fn next_seq(&self) -> u64 {
self.shared.next_seq.load(Ordering::Acquire)
}
}
impl Drop for Wal {
    /// Stops the background flusher thread (if any) and waits for it to exit.
    fn drop(&mut self) {
        {
            // Clear the flag while holding the state mutex. The flusher reads the
            // flag only under that mutex, right before it waits, so the wake-up
            // below cannot fall between its check and its wait.
            let _guard = match self.shared.inner.lock() {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };
            self.shared.background_alive.store(false, Ordering::Release);
        }
        self.shared.notify_writer.notify_all();
        if let Some(handle) = self.background.take() {
            let _joined = handle.join();
        }
    }
}

/// Writes out the user-space buffer and fsyncs the file's data.
///
/// Resets `pending_bytes` only on success. Callers advance the durability
/// marker and decide how to react to failure.
fn perform_fsync(inner: &mut WalInner) -> Result<()> {
    inner.writer.flush()?;
    inner.writer.get_mut().sync_data()?;
    inner.pending_bytes = 0;
    Ok(())
}

/// Starts the named flusher thread that runs [`flusher_loop`].
fn spawn_flusher(shared: Arc<WalShared>, max_wait: Duration) -> Result<JoinHandle<()>> {
    thread::Builder::new()
        .name("emdb-wal-flusher".to_string())
        .spawn(move || flusher_loop(shared, max_wait))
        .map_err(Error::from)
}

/// Fsyncs every record appended so far, on behalf of the flusher thread.
///
/// On success the durability marker advances to the current `next_seq`. On
/// failure the WAL is poisoned, so later appends and waits are refused.
fn background_sync(shared: &WalShared, inner: &mut WalInner) {
    let target = inner.next_seq;
    match perform_fsync(inner) {
        Ok(()) => {
            inner.last_synced_seq = target;
            shared.last_synced_seq.store(target, Ordering::Release);
        }
        Err(_err) => {
            inner.poisoned = Some("wal fsync failed in background");
        }
    }
}

/// Body of the group-commit flusher thread for [`FlushPolicy::Group`].
///
/// When unsynced records appear, the flusher lets further appends join the
/// same batch for up to `max_wait`, then issues one fsync for all of them.
///
/// The shutdown flag and the pending-work check are read only while holding
/// the state mutex. The mutex is released only inside `wait_timeout`, so
/// neither an append's notification nor the shutdown notification can be
/// missed.
///
/// On shutdown, records not yet synced get a final best-effort fsync. The
/// thread also exits if the state mutex is poisoned.
fn flusher_loop(shared: Arc<WalShared>, max_wait: Duration) {
    // Shortest idle sleep. Real work is always signalled through
    // `notify_writer`, so this only stops a zero `max_wait` from turning the
    // idle loop into a busy spin on the mutex.
    const MIN_IDLE_WAIT: Duration = Duration::from_millis(10);

    let mut inner = match shared.inner.lock() {
        Ok(guard) => guard,
        Err(_poisoned) => return,
    };
    // When the current batch must be synced; `None` while nothing is pending.
    let mut batch_deadline: Option<Instant> = None;
    loop {
        let has_unsynced = inner.poisoned.is_none() && inner.next_seq != inner.last_synced_seq;
        if !shared.background_alive.load(Ordering::Acquire) {
            if has_unsynced {
                background_sync(&shared, &mut inner);
            }
            return;
        }
        let timeout = if has_unsynced {
            let now = Instant::now();
            // An unrepresentable deadline degrades to "sync now".
            let deadline =
                *batch_deadline.get_or_insert_with(|| now.checked_add(max_wait).unwrap_or(now));
            let remaining = deadline.saturating_duration_since(now);
            if remaining.is_zero() {
                batch_deadline = None;
                background_sync(&shared, &mut inner);
                continue;
            }
            remaining
        } else {
            batch_deadline = None;
            max_wait.max(MIN_IDLE_WAIT)
        };
        inner = match shared.notify_writer.wait_timeout(inner, timeout) {
            Ok((guard, _timed_out)) => guard,
            Err(_poisoned) => return,
        };
    }
}
#[cfg(test)]
mod tests {
    use super::{FlushPolicy, Wal};
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Builds a per-test temp path; the nanosecond suffix keeps reruns apart.
    fn tmp_path(name: &str) -> std::path::PathBuf {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0_u128, |d| d.as_nanos());
        p.push(format!("emdb-v4-wal-{name}-{nanos}.wal"));
        p
    }

    /// Opens a WAL at a fresh temp path, panicking if opening fails.
    fn open(name: &str, policy: FlushPolicy) -> (Wal, std::path::PathBuf) {
        let path = tmp_path(name);
        let wal = match Wal::open(path.clone(), policy) {
            Ok(wal) => wal,
            Err(err) => panic!("open should succeed: {err}"),
        };
        (wal, path)
    }

    #[test]
    fn fresh_wal_starts_empty() {
        let (wal, path) = open("fresh", FlushPolicy::Manual);
        let pending = wal.pending_bytes();
        assert!(matches!(pending, Ok(0)));
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn append_returns_monotonic_sequence_numbers() {
        let (wal, path) = open("monotonic", FlushPolicy::Manual);
        let s0 = wal.append(b"alpha");
        let s1 = wal.append(b"beta");
        let s2 = wal.append(b"gamma");
        assert!(matches!(s0, Ok(0)));
        assert!(matches!(s1, Ok(1)));
        assert!(matches!(s2, Ok(2)));
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn manual_policy_does_not_fsync_until_wait() {
        let (wal, path) = open("manual-fsync", FlushPolicy::Manual);
        assert!(wal.append(b"hi").is_ok());
        assert!(matches!(wal.pending_bytes(), Ok(b) if b > 0));
        assert!(wal.flush().is_ok());
        assert!(matches!(wal.pending_bytes(), Ok(0)));
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn on_each_write_persists_synchronously() {
        let (wal, path) = open("on-each-write", FlushPolicy::OnEachWrite);
        assert!(wal.append(b"hi").is_ok());
        assert!(matches!(wal.pending_bytes(), Ok(0)));
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn read_all_returns_appended_bytes() {
        let (wal, path) = open("read-all", FlushPolicy::Manual);
        assert!(wal.append(b"foo").is_ok());
        assert!(wal.append(b"bar").is_ok());
        let mut buf = Vec::new();
        assert!(wal.read_all(&mut buf).is_ok());
        assert_eq!(&buf, b"foobar");
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn read_all_leaves_append_cursor_at_end() {
        let (wal, path) = open("read-cursor", FlushPolicy::Manual);
        assert!(wal.append(b"foo").is_ok());
        let mut buf = Vec::new();
        assert!(wal.read_all(&mut buf).is_ok());
        // A later append must land after the existing record, not over it.
        assert!(wal.append(b"bar").is_ok());
        assert!(wal.read_all(&mut buf).is_ok());
        assert_eq!(&buf, b"foobar");
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn truncate_resets_sequence_and_pending() {
        let (wal, path) = open("truncate", FlushPolicy::Manual);
        assert!(wal.append(b"hi").is_ok());
        assert!(wal.append(b"there").is_ok());
        assert!(wal.truncate().is_ok());
        assert!(matches!(wal.pending_bytes(), Ok(0)));
        let mut buf = Vec::new();
        assert!(wal.read_all(&mut buf).is_ok());
        assert!(buf.is_empty());
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn append_after_truncate_restarts_at_zero() {
        let (wal, path) = open("truncate-append", FlushPolicy::Manual);
        assert!(wal.append(b"old").is_ok());
        assert!(wal.truncate().is_ok());
        assert!(matches!(wal.append(b"new"), Ok(0)));
        assert_eq!(wal.next_seq(), 1);
        let mut buf = Vec::new();
        assert!(wal.read_all(&mut buf).is_ok());
        assert_eq!(&buf, b"new");
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn wait_for_seq_advances_durability_marker() {
        let (wal, path) = open("wait-seq", FlushPolicy::Manual);
        let s0 = match wal.append(b"a") {
            Ok(s) => s,
            Err(err) => panic!("append should succeed: {err}"),
        };
        assert!(wal.wait_for_seq(s0).is_ok());
        assert!(matches!(wal.pending_bytes(), Ok(0)));
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn group_policy_fsyncs_in_background_within_deadline() {
        let max_wait = Duration::from_millis(50);
        let (wal, path) = open("group-bg", FlushPolicy::Group { max_wait });
        assert!(wal.append(b"async-durability").is_ok());
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            match wal.pending_bytes() {
                Ok(0) => break,
                Ok(_pending) => {
                    if Instant::now() >= deadline {
                        panic!(
                            "background flusher did not drain pending bytes within 5s (max_wait={:?})",
                            max_wait
                        );
                    }
                    thread::sleep(Duration::from_millis(10));
                }
                Err(err) => panic!("pending_bytes failed: {err}"),
            }
        }
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn dropping_group_wal_does_not_wait_for_max_wait() {
        let (wal, path) = open(
            "group-drop",
            FlushPolicy::Group {
                max_wait: Duration::from_secs(60),
            },
        );
        assert!(wal.append(b"pending").is_ok());
        let started = Instant::now();
        drop(wal);
        assert!(started.elapsed() < Duration::from_secs(10));
        assert_eq!(std::fs::read(&path).ok().as_deref(), Some(&b"pending"[..]));
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn concurrent_appends_are_all_durable_after_waiting() {
        let (wal, path) = open("concurrent", FlushPolicy::Manual);
        let wal = Arc::new(wal);
        let mut handles = Vec::new();
        for i in 0..16_u32 {
            let wal = Arc::clone(&wal);
            handles.push(thread::spawn(move || {
                let payload = format!("record-{i}").into_bytes();
                let seq = match wal.append(&payload) {
                    Ok(seq) => seq,
                    Err(err) => panic!("append should succeed: {err}"),
                };
                assert!(wal.wait_for_seq(seq).is_ok());
                seq
            }));
        }
        let mut seqs: Vec<u64> = handles
            .into_iter()
            .map(|handle| match handle.join() {
                Ok(seq) => seq,
                Err(_panic) => panic!("worker thread panicked"),
            })
            .collect();
        seqs.sort_unstable();
        // Every thread got a distinct sequence number from 0..16.
        assert_eq!(seqs, (0..16_u64).collect::<Vec<_>>());
        assert!(matches!(wal.pending_bytes(), Ok(0)));
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn group_commit_meets_latency_target_under_burst() {
        let (wal, path) = open(
            "group-burst",
            FlushPolicy::Group {
                max_wait: Duration::from_millis(20),
            },
        );
        let wal = Arc::new(wal);
        let started = Instant::now();
        let mut handles = Vec::new();
        for i in 0..32_u32 {
            let wal = Arc::clone(&wal);
            handles.push(thread::spawn(move || {
                let payload = format!("burst-{i}").into_bytes();
                assert!(wal.append(&payload).is_ok());
            }));
        }
        for handle in handles {
            assert!(handle.join().is_ok());
        }
        thread::sleep(Duration::from_millis(80));
        assert!(matches!(wal.pending_bytes(), Ok(0)));
        assert!(started.elapsed() < Duration::from_secs(2));
        drop(wal);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn path_for_appends_v4_wal_suffix() {
        let derived = Wal::path_for(std::path::Path::new("/tmp/foo.emdb"));
        assert!(
            derived.to_string_lossy().ends_with("foo.emdb.v4.wal"),
            "unexpected derived path: {}",
            derived.display()
        );
    }
}