use std::fs;
#[cfg(test)]
use std::fs::OpenOptions;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use lora_store::MutationEvent;
use crate::config::SyncMode;
use crate::dir::{SegmentDir, SegmentId};
use crate::error::WalError;
use crate::lock::DirLock;
use crate::lsn::Lsn;
use crate::record::WalRecord;
use crate::replay::{replay_segments, ReplayOutcome};
use crate::segment::SegmentWriter;
/// Mutable WAL state; guarded by the `Wal::state` mutex.
struct WalState {
    /// LSN that will be assigned to the next appended record.
    next_lsn: Lsn,
    /// Highest LSN currently considered durable (advanced by `flush_inner`).
    durable_lsn: Lsn,
    /// Id of the segment currently receiving appends.
    active_segment_id: SegmentId,
    /// Writer for the active segment.
    active_writer: SegmentWriter,
    /// Smallest segment id still present on disk (advanced by `truncate_up_to`).
    oldest_segment_id: SegmentId,
}
/// Slot holding the first background-flush failure message, if any.
/// Once `Some(_)`, `check_healthy` makes further appends/flushes fail with `WalError::Poisoned`.
type BgFailure = Mutex<Option<String>>;
/// How `flush_inner` should treat the configured sync mode.
#[derive(Debug, Clone, Copy)]
enum FlushKind {
    /// Honor the WAL's `SyncMode` (fsync only for `PerCommit`).
    PerConfiguredMode,
    /// Unconditionally fsync and advance `durable_lsn`.
    ForceFsync,
}
/// A segmented write-ahead log with transactional records and configurable
/// durability (`SyncMode`).
pub struct Wal {
    /// Segment directory layout helper (paths, listing, dir fsync).
    segments: SegmentDir,
    /// Durability policy chosen at `open` time.
    sync_mode: SyncMode,
    /// Size threshold at which `begin` rotates to a new segment.
    segment_target_bytes: u64,
    /// All mutable log state, behind one lock.
    state: Mutex<WalState>,
    /// Shared with the background flusher; records its first failure.
    bg_failure: Arc<BgFailure>,
    /// Handle for the group-mode background flusher, if one was spawned.
    _flusher: Mutex<Option<GroupFlusherHandle>>,
    /// Exclusive directory lock; held for the lifetime of this handle.
    _dir_lock: DirLock,
}
impl Wal {
/// Opens (or creates) the write-ahead log rooted at `dir` and replays it.
///
/// Acquires an exclusive directory lock for the lifetime of the returned
/// handle, replays any existing segments, and returns the handle together
/// with the committed mutation events replay surfaced (events already
/// covered by `checkpoint_lsn` are skipped during replay).
///
/// # Errors
///
/// Returns `WalError::AlreadyOpen` when another live handle holds the
/// directory lock, and `WalError::Malformed` when replay finds a corrupt
/// log (e.g. a torn tail inside a sealed segment).
pub fn open(
    dir: impl Into<std::path::PathBuf>,
    sync_mode: SyncMode,
    segment_target_bytes: u64,
    checkpoint_lsn: Lsn,
) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
    let segments = SegmentDir::new(dir);
    fs::create_dir_all(segments.root())?;
    // Exclusive ownership of the directory; released when the Wal drops.
    let dir_lock = DirLock::acquire(segments.root())?;
    let entries = segments.list()?;
    let (active_id, active_writer, replay) = if entries.is_empty() {
        Self::open_fresh(&segments)?
    } else {
        Self::open_existing(&segments, &entries, checkpoint_lsn)?
    };
    // A fresh/empty log hands out LSN 1 first; otherwise continue right
    // after the highest LSN seen during replay.
    let next_lsn = if replay.max_lsn.is_zero() {
        Lsn::new(1)
    } else {
        replay.max_lsn.next()
    };
    // Everything replayed was read back from disk, so it counts as durable.
    let durable_lsn = replay.max_lsn;
    // On a fresh open `entries` is empty; fall back to the segment just created.
    let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);
    let state = WalState {
        next_lsn,
        durable_lsn,
        active_segment_id: active_id,
        active_writer,
        oldest_segment_id,
    };
    let wal = Arc::new(Self {
        segments,
        sync_mode,
        segment_target_bytes,
        state: Mutex::new(state),
        bg_failure: Arc::new(Mutex::new(None)),
        _flusher: Mutex::new(None),
        _dir_lock: dir_lock,
    });
    if let SyncMode::Group { interval_ms } = sync_mode {
        // A zero interval would busy-spin; clamp to at least 1 ms.
        let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
        // The flusher holds only a Weak, so it cannot keep the Wal alive.
        let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
        *wal._flusher.lock().unwrap() = Some(handle);
    }
    Ok((wal, replay.committed_events))
}
/// Creates the very first segment of a brand-new WAL directory.
///
/// The directory is fsynced after the segment file is created so the new
/// file name itself is durable; an empty `ReplayOutcome` is returned.
fn open_fresh(
    segments: &SegmentDir,
) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
    let id = SegmentId::FIRST;
    let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
    segments.sync_dir()?;
    // Nothing to replay: a max_lsn of ZERO makes `open` start at LSN 1.
    let replay = ReplayOutcome {
        committed_events: Vec::new(),
        max_lsn: Lsn::ZERO,
        torn_tail: None,
        checkpoint_lsn_observed: None,
    };
    Ok((id, writer, replay))
}
/// Replays the existing segments and reopens the last one for appending.
///
/// A torn (partially written) tail is tolerated only in the active — i.e.
/// last — segment, where it is truncated away; a torn tail inside any
/// sealed segment means the log is corrupt and the open fails.
fn open_existing(
    segments: &SegmentDir,
    entries: &[crate::dir::SegmentEntry],
    checkpoint_lsn: Lsn,
) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
    let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
    let replay = replay_segments(&paths, checkpoint_lsn)?;
    let active = entries.last().expect("entries non-empty in open_existing");
    let (mut writer, _torn_from_writer) =
        SegmentWriter::open_for_append(segments.path_for(active.id))?;
    if let Some(t) = &replay.torn_tail {
        if t.segment_path == active.path {
            // Drop the torn bytes so new appends start at a clean offset.
            writer.truncate_to(t.last_good_offset)?;
        } else {
            return Err(WalError::Malformed(format!(
                "torn tail found in sealed segment {}",
                t.segment_path.display()
            )));
        }
    }
    Ok((active.id, writer, replay))
}
/// Root directory containing the WAL's segment files.
pub fn dir(&self) -> &Path {
    self.segments.root()
}

/// The sync mode this WAL was opened with.
pub fn sync_mode(&self) -> SyncMode {
    self.sync_mode
}

/// Highest LSN currently considered durable.
pub fn durable_lsn(&self) -> Lsn {
    self.state.lock().unwrap().durable_lsn
}

/// The first background-flush failure message, if one has occurred.
pub fn bg_failure(&self) -> Option<String> {
    self.bg_failure.lock().unwrap().clone()
}

/// Fails with `WalError::Poisoned` once a background flush has failed.
/// Called by the public mutating/flushing operations before taking the
/// state lock.
fn check_healthy(&self) -> Result<(), WalError> {
    if self.bg_failure.lock().unwrap().is_some() {
        return Err(WalError::Poisoned);
    }
    Ok(())
}

/// LSN that the next appended record will receive.
pub fn next_lsn(&self) -> Lsn {
    self.state.lock().unwrap().next_lsn
}

/// Raw id of the oldest segment still on disk.
pub fn oldest_segment_id(&self) -> u64 {
    self.state.lock().unwrap().oldest_segment_id.raw()
}

/// Raw id of the segment currently receiving appends.
pub fn active_segment_id(&self) -> u64 {
    self.state.lock().unwrap().active_segment_id.raw()
}
/// Starts a transaction: appends a `TxBegin` record and returns its LSN,
/// which callers pass to `append`/`commit`/`abort` as `tx_begin_lsn`.
///
/// Segment rotation is only checked here, at a transaction boundary.
/// Note that none of these append methods flush; durability comes from
/// `flush`/`force_fsync` (or the background flusher in `Group` mode).
pub fn begin(&self) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut state = self.state.lock().unwrap();
    self.maybe_rotate(&mut state)?;
    Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
}

/// Appends a mutation belonging to the transaction started at `tx_begin_lsn`.
pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut state = self.state.lock().unwrap();
    Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
        lsn,
        tx_begin_lsn,
        event: event.clone(),
    })
}

/// Appends a `TxCommit` record for the transaction started at `tx_begin_lsn`.
pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut state = self.state.lock().unwrap();
    Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
}

/// Appends a `TxAbort` record; replay drops the transaction's mutations.
pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut state = self.state.lock().unwrap();
    Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
}

/// Appends a `Checkpoint` marker recording `snapshot_lsn`; replay surfaces
/// it via `checkpoint_lsn_observed`.
pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut state = self.state.lock().unwrap();
    Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
        lsn,
        snapshot_lsn,
    })
}
/// Allocates the next LSN, builds the record with it, and appends it to
/// the active segment. Returns the allocated LSN.
///
/// NOTE(review): `next_lsn` is advanced before the append, so a failed
/// append leaves a gap in the LSN sequence — confirm replay tolerates
/// non-contiguous LSNs.
#[inline]
fn alloc_and_append(
    state: &mut WalState,
    build: impl FnOnce(Lsn) -> WalRecord,
) -> Result<Lsn, WalError> {
    let lsn = state.next_lsn;
    state.next_lsn = lsn.next();
    state.active_writer.append(&build(lsn))?;
    Ok(lsn)
}
/// Flushes per the configured sync mode: an fsync under `PerCommit`, a
/// buffer flush only under `Group`/`None` (see `flush_inner` for when
/// `durable_lsn` advances).
pub fn flush(&self) -> Result<(), WalError> {
    self.check_healthy()?;
    self.flush_inner(FlushKind::PerConfiguredMode)
}

/// Unconditionally fsyncs and advances `durable_lsn`, regardless of mode.
pub fn force_fsync(&self) -> Result<(), WalError> {
    self.check_healthy()?;
    self.flush_inner(FlushKind::ForceFsync)
}
/// Flushes buffered records according to `kind` and the configured sync
/// mode, advancing `durable_lsn` when the mode considers the data durable.
///
/// - `ForceFsync` always fsyncs and always advances `durable_lsn`.
/// - `PerCommit` fsyncs and advances on every flush.
/// - `None` only flushes the buffer but still advances (durability is
///   explicitly not guaranteed in this mode).
/// - `Group` only flushes the buffer and leaves `durable_lsn` to the
///   background flusher's forced fsyncs.
fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
    let mut state = self.state.lock().unwrap();
    // Highest LSN handed out so far; ZERO when nothing was ever appended.
    let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
    let forced = matches!(kind, FlushKind::ForceFsync);
    // Decide (fsync?, advance durable_lsn?) per mode; a forced flush
    // always does both.
    let (do_fsync, advance_durable) = match self.sync_mode {
        SyncMode::PerCommit => (true, true),
        SyncMode::None => (forced, true),
        _ => (forced, forced),
    };
    if do_fsync {
        state.active_writer.flush_and_sync()?;
    } else {
        state.active_writer.flush_buffer()?;
    }
    if advance_durable {
        state.durable_lsn = written_lsn;
    }
    Ok(())
}
/// Deletes sealed segments whose records are all at or below `fence_lsn`.
///
/// The active segment and the segment immediately preceding it are always
/// kept. The directory is fsynced only when at least one file was removed.
///
/// Fix: this public mutator now calls `check_healthy` like every other
/// public operation, so a WAL poisoned by a background flush failure can
/// no longer delete segments (which might still be needed for recovery).
pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
    self.check_healthy()?;
    let mut state = self.state.lock().unwrap();
    let active_id = state.active_segment_id;
    let entries = self.segments.list()?;
    let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
    for (i, entry) in entries.iter().enumerate() {
        // Never drop the active segment or its immediate predecessor.
        if entry.id >= active_id.saturating_prev() {
            break;
        }
        let next = match entries.get(i + 1) {
            Some(n) => n,
            None => break,
        };
        // A segment's records end just before the next segment's base LSN,
        // so it is droppable once that upper bound is covered by the fence.
        let next_base = SegmentDir::base_lsn(&next.path)?;
        if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
            to_drop.push(entry.clone());
        }
    }
    for entry in to_drop {
        fs::remove_file(&entry.path)?;
        if entry.id >= state.oldest_segment_id {
            state.oldest_segment_id = entry.id.next();
        }
    }
    // Only pay for a directory fsync when the set of segments changed.
    if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
        self.segments.sync_dir()?;
    }
    Ok(())
}
/// Rotates to a new segment once the active one has reached the size
/// target. Called with the state lock held (from `begin`).
///
/// Ordering matters for durability: the old segment is flushed + fsynced
/// and sealed, the new segment is created with the upcoming LSN as its
/// base, and the directory is fsynced before the in-memory state swaps
/// over to the new writer.
fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
    if state.active_writer.bytes_written() < self.segment_target_bytes {
        return Ok(());
    }
    state.active_writer.flush_and_sync()?;
    state.active_writer.seal()?;
    let next_id = state.active_segment_id.next();
    let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
    self.segments.sync_dir()?;
    state.active_writer = writer;
    state.active_segment_id = next_id;
    Ok(())
}
}
impl Drop for Wal {
    /// Joins the background flusher (its handle joins on drop) before the
    /// rest of the fields are torn down. If the flusher mutex is poisoned
    /// the handle is left in place; the subsequent field drop still
    /// releases it.
    fn drop(&mut self) {
        let flusher = self._flusher.get_mut().ok().and_then(|slot| slot.take());
        drop(flusher);
    }
}
/// Owns the background group-flush thread; dropping the handle signals
/// shutdown and joins the thread.
struct GroupFlusherHandle {
    /// Set to `true` to ask the flusher loop to exit.
    shutdown: Arc<AtomicBool>,
    /// Join handle; taken exactly once in `Drop`.
    handle: Option<JoinHandle<()>>,
}
impl Drop for GroupFlusherHandle {
    /// Signals shutdown, then blocks until the flusher thread exits.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        if let Some(h) = self.handle.take() {
            // Nothing useful can be done with a join error during drop.
            let _ = h.join();
        }
    }
}
/// Spawns the background thread that periodically force-fsyncs the WAL in
/// `SyncMode::Group`.
///
/// The thread waits `interval` between flushes, sleeping in slices of at
/// most 50 ms so a shutdown request is observed promptly. It holds only a
/// `Weak<Wal>` and exits on its own when the `Wal` has been dropped, when
/// shutdown is signalled, or after the first flush failure — which it
/// records in `bg_failure`, poisoning the WAL for subsequent operations.
fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = Arc::clone(&shutdown);
    let handle = thread::spawn(move || {
        while !shutdown_clone.load(Ordering::Acquire) {
            // Sliced sleep: wake at least every 50 ms to re-check shutdown.
            let slice = Duration::from_millis(50).min(interval);
            let mut elapsed = Duration::ZERO;
            while elapsed < interval && !shutdown_clone.load(Ordering::Acquire) {
                thread::sleep(slice);
                elapsed += slice;
            }
            if shutdown_clone.load(Ordering::Acquire) {
                break;
            }
            match weak.upgrade() {
                Some(wal) => {
                    if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
                        // Record only the first failure, then stop flushing.
                        let mut slot = wal.bg_failure.lock().unwrap();
                        if slot.is_none() {
                            *slot = Some(format!("bg fsync failed: {err}"));
                        }
                        break;
                    }
                }
                // The Wal has been dropped; nothing left to flush.
                None => break,
            }
        }
    });
    GroupFlusherHandle {
        shutdown,
        handle: Some(handle),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use lora_store::{MutationEvent, Properties, PropertyValue};
    use crate::testing::TmpDir;

    /// Builds a deterministic `CreateNode` mutation keyed by `id`.
    fn ev(id: u64) -> MutationEvent {
        let mut p = Properties::new();
        p.insert("v".into(), PropertyValue::Int(id as i64));
        MutationEvent::CreateNode {
            id,
            labels: vec!["N".into()],
            properties: p,
        }
    }

    /// Opens a WAL with per-commit fsync and an 8 MiB segment target.
    fn open_default(dir: &Path) -> (Arc<Wal>, Vec<MutationEvent>) {
        Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap()
    }

    #[test]
    fn fresh_open_creates_first_segment() {
        let dir = TmpDir::new("fresh");
        let (wal, replay) = open_default(&dir.path);
        assert!(replay.is_empty());
        assert_eq!(wal.next_lsn(), Lsn::new(1));
        assert_eq!(wal.active_segment_id(), 1);
        let entries: Vec<_> = std::fs::read_dir(&dir.path)
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        assert!(
            entries.iter().any(|n| n == ".lora-wal.lock"),
            "WAL dir should contain the live directory lock, found: {entries:?}"
        );
        assert!(
            entries
                .iter()
                .filter(|n| n.as_str() != ".lora-wal.lock")
                .all(|n| n.ends_with(".wal")),
            "WAL dir should contain only segment files plus the lock, found: {entries:?}"
        );
    }

    #[test]
    fn opening_same_directory_twice_fails_until_first_handle_drops() {
        let dir = TmpDir::new("exclusive");
        let (wal, _) = open_default(&dir.path);
        match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Err(WalError::AlreadyOpen { dir: locked_dir }) => {
                assert_eq!(locked_dir, dir.path);
            }
            Err(err) => panic!("expected AlreadyOpen, got {err:?}"),
            Ok(_) => panic!("second WAL open on same directory should fail"),
        }
        // The lock is released on drop, so reopening must now succeed.
        drop(wal);
        let (reopened, _) = open_default(&dir.path);
        drop(reopened);
    }

    #[test]
    fn begin_append_commit_round_trip_through_replay() {
        let dir = TmpDir::new("commit");
        {
            let (wal, _) = open_default(&dir.path);
            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(1)).unwrap();
            wal.append(begin, &ev(2)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();
            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(3)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();
        }
        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 3);
        assert_eq!(replay[0], ev(1));
        assert_eq!(replay[1], ev(2));
        assert_eq!(replay[2], ev(3));
        // 7 records (begin+2 mutations+commit, begin+mutation+commit) → next is 8.
        assert_eq!(wal.next_lsn(), Lsn::new(8));
    }

    #[test]
    fn aborted_transaction_is_dropped_on_replay() {
        let dir = TmpDir::new("abort");
        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.abort(b2).unwrap();
            wal.flush().unwrap();
        }
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn uncommitted_transaction_at_end_of_log_is_discarded() {
        let dir = TmpDir::new("uncommitted");
        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.flush().unwrap();
        }
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn segment_rotation_at_begin_boundary() {
        let dir = TmpDir::new("rotate");
        // Tiny segment target to force a rotation quickly.
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 256, Lsn::ZERO).unwrap();
        let b1 = wal.begin().unwrap();
        for i in 0..5 {
            wal.append(b1, &ev(i)).unwrap();
        }
        wal.commit(b1).unwrap();
        wal.flush().unwrap();
        // Rotation only happens in begin(), so we are still on segment 1.
        assert_eq!(wal.active_segment_id(), 1);
        let b2 = wal.begin().unwrap();
        wal.append(b2, &ev(100)).unwrap();
        wal.commit(b2).unwrap();
        wal.flush().unwrap();
        assert_eq!(
            wal.active_segment_id(),
            2,
            "begin() should have rotated to segment 2"
        );
        let segments = SegmentDir::new(&dir.path).list().unwrap();
        assert_eq!(segments.len(), 2);
        drop(wal);
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 6);
    }

    #[test]
    fn checkpoint_lsn_skips_already_checkpointed_events() {
        let dir = TmpDir::new("ckpt-skip");
        let (wal, _) = open_default(&dir.path);
        let a = wal.begin().unwrap();
        wal.append(a, &ev(1)).unwrap();
        wal.append(a, &ev(2)).unwrap();
        let commit_a = wal.commit(a).unwrap();
        wal.flush().unwrap();
        let b = wal.begin().unwrap();
        wal.append(b, &ev(3)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);
        // Reopen with the first commit as checkpoint: only tx B replays.
        let (_, replay) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, commit_a).unwrap();
        assert_eq!(replay, vec![ev(3)]);
    }

    #[test]
    fn replay_rejects_commit_without_begin() {
        let dir = TmpDir::new("commit-without-begin");
        {
            let (wal, _) = open_default(&dir.path);
            wal.commit(Lsn::new(99)).unwrap();
            wal.flush().unwrap();
        }
        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn replay_rejects_mutation_without_begin() {
        let dir = TmpDir::new("mutation-without-begin");
        {
            let (wal, _) = open_default(&dir.path);
            wal.append(Lsn::new(99), &ev(1)).unwrap();
            wal.flush().unwrap();
        }
        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn torn_tail_is_truncated_on_open() {
        let dir = TmpDir::new("torn");
        {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            wal.commit(b).unwrap();
            wal.flush().unwrap();
        }
        let segments = SegmentDir::new(&dir.path).list().unwrap();
        let active = &segments.last().unwrap().path;
        // Simulate a crash mid-write by appending garbage to the active segment.
        {
            use std::io::Write;
            let mut f = OpenOptions::new().append(true).open(active).unwrap();
            f.write_all(&[0xff; 32]).unwrap();
            f.sync_all().unwrap();
        }
        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
        // The log must be appendable again after truncating the torn tail.
        let b = wal.begin().unwrap();
        wal.append(b, &ev(2)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1), ev(2)]);
    }

    #[test]
    fn checkpoint_marker_is_recorded_and_observed() {
        let dir = TmpDir::new("ckpt-marker");
        let snapshot_lsn = {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            let commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
            wal.checkpoint_marker(commit).unwrap();
            wal.flush().unwrap();
            commit
        };
        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
        assert_eq!(
            outcome.checkpoint_lsn_observed,
            Some(snapshot_lsn),
            "checkpoint marker should be surfaced by replay"
        );
    }

    #[test]
    fn group_mode_durable_lsn_advances_via_bg_flusher() {
        let dir = TmpDir::new("group");
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group { interval_ms: 25 },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();
        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();
        assert_eq!(
            wal.durable_lsn(),
            Lsn::ZERO,
            "Group flush() must not advance durable_lsn"
        );
        // Poll until the background flusher has fsynced the log.
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
        loop {
            if wal.durable_lsn() > Lsn::ZERO {
                break;
            }
            if std::time::Instant::now() >= deadline {
                panic!(
                    "bg flusher did not advance durable_lsn within 500 ms (still at {})",
                    wal.durable_lsn()
                );
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
        drop(wal);
    }

    #[test]
    fn none_mode_advances_durable_lsn_on_flush() {
        let dir = TmpDir::new("none");
        let (wal, _) = Wal::open(&dir.path, SyncMode::None, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn force_fsync_always_advances_durable_lsn() {
        let dir = TmpDir::new("force-fsync");
        // Huge group interval so the bg flusher cannot interfere with the test.
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group {
                interval_ms: 60_000,
            },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();
        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();
        assert_eq!(wal.durable_lsn(), Lsn::ZERO);
        wal.force_fsync().unwrap();
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn truncate_up_to_drops_old_sealed_segments() {
        let dir = TmpDir::new("truncate");
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 64, Lsn::ZERO).unwrap();
        let mut last_commit = Lsn::ZERO;
        for i in 0..5 {
            let b = wal.begin().unwrap();
            wal.append(b, &ev(i)).unwrap();
            last_commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
        }
        assert!(
            wal.active_segment_id() >= 4,
            "expected several rotations, got {}",
            wal.active_segment_id()
        );
        let segments = SegmentDir::new(&dir.path);
        let before = segments.list().unwrap().len();
        wal.truncate_up_to(last_commit).unwrap();
        let after = segments.list().unwrap().len();
        assert!(
            after < before,
            "truncate_up_to should have dropped at least one segment ({} → {})",
            before,
            after
        );
        assert!(
            after >= 2,
            "active and the segment preceding it must be kept"
        );
        drop(wal);
        let (_, replay) = Wal::open(&dir.path, SyncMode::PerCommit, 64, last_commit).unwrap();
        assert!(replay.is_empty());
    }
}