use std::path::Path;
use std::sync::{Arc, Mutex};
use lora_store::{MutationEvent, MutationRecorder};
use crate::error::WalError;
use crate::lsn::Lsn;
use crate::wal::Wal;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WroteCommit {
Yes,
No,
}
pub trait WalMirror: Send + Sync {
fn persist(&self, wal_dir: &Path) -> Result<(), WalError>;
fn persist_force(&self, wal_dir: &Path) -> Result<(), WalError> {
self.persist(wal_dir)
}
}
#[derive(Default)]
struct RecorderState {
armed: bool,
active_tx: Option<Lsn>,
buffer: Vec<MutationEvent>,
poisoned: Option<String>,
}
pub struct WalRecorder {
wal: Arc<Wal>,
mirror: Option<Arc<dyn WalMirror>>,
state: Mutex<RecorderState>,
}
impl WalRecorder {
pub fn new(wal: Arc<Wal>) -> Self {
Self::new_with_mirror(wal, None)
}
pub fn new_with_mirror(wal: Arc<Wal>, mirror: Option<Arc<dyn WalMirror>>) -> Self {
Self {
wal,
mirror,
state: Mutex::new(RecorderState::default()),
}
}
pub fn wal(&self) -> &Arc<Wal> {
&self.wal
}
pub fn arm(&self) -> Result<(), WalError> {
let mut state = self.state.lock().unwrap();
if state.poisoned.is_some() {
return Err(WalError::Poisoned);
}
if state.armed {
state.poisoned = Some("WalRecorder::arm called while already armed".into());
return Err(WalError::Poisoned);
}
state.armed = true;
state.buffer.clear();
Ok(())
}
pub fn commit(&self) -> Result<WroteCommit, WalError> {
let mut state = self.state.lock().unwrap();
if state.poisoned.is_some() {
return Err(WalError::Poisoned);
}
if !state.armed {
state.poisoned = Some("WalRecorder::commit called without an armed query".into());
return Err(WalError::Poisoned);
}
state.armed = false;
if state.buffer.is_empty() && state.active_tx.is_none() {
return Ok(WroteCommit::No);
}
let events = std::mem::take(&mut state.buffer);
let tx = match state.active_tx {
Some(tx) => tx,
None => self.wal.begin().inspect_err(|e| {
state.poisoned = Some(e.to_string());
})?,
};
state.active_tx = Some(tx);
self.wal.append_batch(tx, events).inspect_err(|e| {
state.poisoned = Some(e.to_string());
})?;
self.wal.commit(tx).inspect_err(|e| {
state.poisoned = Some(e.to_string());
})?;
state.active_tx = None;
Ok(WroteCommit::Yes)
}
pub fn abort(&self) -> Result<bool, WalError> {
let mut state = self.state.lock().unwrap();
if state.poisoned.is_some() {
return Err(WalError::Poisoned);
}
state.armed = false;
let had_buffered_events = !state.buffer.is_empty();
state.buffer.clear();
match state.active_tx.take() {
Some(tx) => {
self.wal.abort(tx).inspect_err(|e| {
state.poisoned = Some(e.to_string());
})?;
Ok(true)
}
None => Ok(had_buffered_events),
}
}
pub fn flush(&self) -> Result<(), WalError> {
let mut state = self.state.lock().unwrap();
if state.poisoned.is_some() {
return Err(WalError::Poisoned);
}
self.wal.flush().inspect_err(|e| {
state.poisoned = Some(e.to_string());
})?;
if let Some(mirror) = &self.mirror {
mirror.persist(self.wal.dir()).inspect_err(|e| {
state.poisoned = Some(e.to_string());
})?;
}
Ok(())
}
pub fn force_fsync(&self) -> Result<(), WalError> {
let mut state = self.state.lock().unwrap();
if state.poisoned.is_some() {
return Err(WalError::Poisoned);
}
self.wal.force_fsync().inspect_err(|e| {
state.poisoned = Some(e.to_string());
})?;
if let Some(mirror) = &self.mirror {
mirror.persist_force(self.wal.dir()).inspect_err(|e| {
state.poisoned = Some(e.to_string());
})?;
}
Ok(())
}
pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
let mut state = self.state.lock().unwrap();
if state.poisoned.is_some() {
return Err(WalError::Poisoned);
}
self.wal.checkpoint_marker(snapshot_lsn).inspect_err(|e| {
state.poisoned = Some(e.to_string());
})
}
pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
if let Some(mirror) = &self.mirror {
mirror.persist_force(self.wal.dir())?;
return Ok(());
}
self.wal.truncate_up_to(fence_lsn)?;
Ok(())
}
pub fn is_poisoned(&self) -> bool {
if self.state.lock().unwrap().poisoned.is_some() {
return true;
}
self.wal.bg_failure().is_some()
}
pub fn poison(&self, reason: impl Into<String>) {
let mut state = self.state.lock().unwrap();
state.poisoned.get_or_insert_with(|| reason.into());
state.active_tx = None;
state.armed = false;
state.buffer.clear();
}
#[doc(hidden)]
pub fn clear_poisoned_for_tests(&self) {
let mut state = self.state.lock().unwrap();
state.poisoned = None;
state.active_tx = None;
state.armed = false;
state.buffer.clear();
}
}
impl MutationRecorder for WalRecorder {
fn record(&self, event: &MutationEvent) {
let mut state = self.state.lock().unwrap();
if state.poisoned.is_some() {
return;
}
if !state.armed {
state.poisoned.get_or_insert_with(|| {
"MutationRecorder::record fired outside an armed query".into()
});
return;
}
state.buffer.push(event.clone());
}
fn poisoned(&self) -> Option<String> {
let state = self.state.lock().unwrap();
if let Some(msg) = state.poisoned.clone() {
return Some(msg);
}
self.wal.bg_failure()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use lora_store::{GraphStorageMut, InMemoryGraph, MutationEvent, Properties, PropertyValue};
use crate::config::SyncMode;
use crate::testing::TmpDir;
use crate::Wal;
fn open_wal(dir: &std::path::Path) -> Arc<Wal> {
let (wal, replay) =
Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
assert!(replay.is_empty());
wal
}
#[test]
fn record_outside_arm_poisons() {
let dir = TmpDir::new("no-arm");
let recorder = WalRecorder::new(open_wal(&dir.path));
recorder.record(&MutationEvent::Clear);
assert!(recorder.is_poisoned());
let msg = recorder.poisoned().unwrap();
assert!(msg.contains("outside an armed query"));
}
#[test]
fn arm_record_commit_round_trip_via_in_memory_graph() {
let dir = TmpDir::new("happy");
let recorder: Arc<WalRecorder> = Arc::new(WalRecorder::new(open_wal(&dir.path)));
let mut g = InMemoryGraph::new();
g.set_mutation_recorder(Some(recorder.clone()));
recorder.arm().unwrap();
let mut props = Properties::new();
props.insert("v".into(), PropertyValue::Int(1));
g.create_node(vec!["N".into()], props);
let mut props2 = Properties::new();
props2.insert("v".into(), PropertyValue::Int(2));
g.create_node(vec!["N".into()], props2);
let outcome = recorder.commit().unwrap();
assert_eq!(outcome, WroteCommit::Yes);
recorder.flush().unwrap();
assert!(!recorder.is_poisoned());
g.set_mutation_recorder(None);
drop(recorder);
let (_wal, events) =
Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
assert_eq!(events.len(), 2);
assert!(matches!(events[0], MutationEvent::CreateNode { id: 0, .. }));
assert!(matches!(events[1], MutationEvent::CreateNode { id: 1, .. }));
}
#[test]
fn arm_then_commit_with_no_mutations_writes_nothing() {
let dir = TmpDir::new("ro");
let recorder = WalRecorder::new(open_wal(&dir.path));
let next_before = recorder.wal().next_lsn();
recorder.arm().unwrap();
let outcome = recorder.commit().unwrap();
assert_eq!(outcome, WroteCommit::No);
let next_after = recorder.wal().next_lsn();
assert_eq!(
next_before, next_after,
"read-only commit must not allocate any LSNs"
);
}
#[test]
fn abort_drops_in_flight_events_on_replay() {
let dir = TmpDir::new("abort");
let recorder: Arc<WalRecorder> = Arc::new(WalRecorder::new(open_wal(&dir.path)));
let mut g = InMemoryGraph::new();
g.set_mutation_recorder(Some(recorder.clone()));
recorder.arm().unwrap();
g.create_node(vec!["A".into()], Properties::new());
let _ = recorder.commit().unwrap();
recorder.flush().unwrap();
recorder.arm().unwrap();
g.create_node(vec!["B".into()], Properties::new());
let aborted = recorder.abort().unwrap();
assert!(aborted, "abort after buffered mutations should quarantine");
recorder.flush().unwrap();
g.set_mutation_recorder(None);
drop(recorder);
let (_wal, events) =
Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
assert_eq!(events.len(), 1);
if let MutationEvent::CreateNode { labels, .. } = &events[0] {
assert_eq!(labels, &vec!["A".to_string()]);
} else {
panic!("expected CreateNode for label A, got {:?}", events[0]);
}
}
#[test]
fn arm_while_armed_poisons() {
let dir = TmpDir::new("double-arm");
let recorder = WalRecorder::new(open_wal(&dir.path));
recorder.arm().unwrap();
let err = recorder.arm().unwrap_err();
assert!(matches!(err, WalError::Poisoned));
assert!(recorder.is_poisoned());
}
#[test]
fn poisoned_recorder_swallows_subsequent_records() {
let dir = TmpDir::new("swallow");
let recorder = WalRecorder::new(open_wal(&dir.path));
recorder.record(&MutationEvent::Clear);
assert!(recorder.is_poisoned());
for _ in 0..10 {
recorder.record(&MutationEvent::Clear);
}
assert!(recorder.is_poisoned());
}
#[test]
fn checkpoint_marker_through_recorder() {
let dir = TmpDir::new("ckpt");
let recorder = WalRecorder::new(open_wal(&dir.path));
recorder.arm().unwrap();
recorder.record(&MutationEvent::Clear);
assert_eq!(recorder.commit().unwrap(), WroteCommit::Yes);
recorder.force_fsync().unwrap();
let snapshot_lsn = recorder.wal().durable_lsn();
let marker_lsn = recorder.checkpoint_marker(snapshot_lsn).unwrap();
recorder.force_fsync().unwrap();
assert!(marker_lsn >= Lsn::new(1));
let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
assert_eq!(outcome.checkpoint_lsn_observed, Some(snapshot_lsn));
}
}