use std::fs;
use std::path::Path;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
#[cfg(not(target_arch = "wasm32"))]
use std::time::Duration;
use lora_store::MutationEvent;
#[cfg(not(target_arch = "wasm32"))]
use super::group_flusher::{spawn_group_flusher, GroupFlusherHandle};
use crate::config::SyncMode;
use crate::dir::{SegmentDir, SegmentId};
use crate::errors::WalError;
use crate::lock::DirLock;
use crate::lsn::Lsn;
use crate::record::WalRecord;
use crate::recorder::WroteCommit;
use crate::replay::{replay_segments, ReplayOutcome};
use crate::segment::SegmentWriter;
/// Mutable bookkeeping of the log, kept behind the `Wal::state` mutex.
struct WalState {
    /// LSN that the next appended record will be assigned.
    next_lsn: Lsn,
    /// Highest LSN covered by the last forced sync (`FlushKind::ForceFsync`), or the
    /// maximum LSN recovered by replay when the log was opened.
    durable_lsn: Lsn,
    /// Id of the segment currently receiving appends.
    active_segment_id: SegmentId,
    /// Writer for the active segment.
    active_writer: SegmentWriter,
    /// Lowest segment id believed to still exist on disk; advanced by `truncate_up_to`.
    oldest_segment_id: SegmentId,
}
/// Slot holding a background failure message. While it is `Some`, `Wal::check_healthy`
/// rejects foreground operations with `WalError::Poisoned`. Presumably written by the group
/// flusher through `Wal::bg_failure_slot` — NOTE(review): confirm against `group_flusher`.
type BgFailure = Mutex<Option<String>>;
/// How far `Wal::flush_inner` pushes buffered records toward disk.
#[derive(Debug, Clone, Copy)]
pub(super) enum FlushKind {
    /// Only flush the segment writer's buffer (`flush_buffer`); `durable_lsn` is unchanged.
    PerConfiguredMode,
    /// Flush and fsync (`flush_and_sync`), then advance `durable_lsn` to the last written LSN.
    ForceFsync,
}
/// Segmented write-ahead log rooted at a single directory.
///
/// Field order matters: Rust drops fields in declaration order, so `_dir_lock` (declared
/// last) is released only after every other field — including the flusher handle — has
/// been dropped.
pub struct Wal {
    /// Directory holding the segment files.
    segments: SegmentDir,
    /// Sync mode the log was opened with.
    sync_mode: SyncMode,
    /// Once the active segment has written at least this many bytes, the next
    /// transaction start rotates to a fresh segment (see `maybe_rotate`).
    segment_target_bytes: u64,
    /// LSN counters and the active segment writer.
    state: Mutex<WalState>,
    /// Background-failure slot, shared behind an `Arc`.
    bg_failure: Arc<BgFailure>,
    /// Handle of the background group flusher; filled in by `open` on non-wasm targets.
    #[cfg(not(target_arch = "wasm32"))]
    flusher: Mutex<Option<GroupFlusherHandle>>,
    /// Directory lock acquired in `open` and held for the lifetime of the `Wal`.
    _dir_lock: DirLock,
}
impl Wal {
/// Opens (or initialises) the write-ahead log rooted at `dir` and recovers its contents.
///
/// The directory is created if missing, and a [`DirLock`] is acquired on it and held for
/// the lifetime of the returned `Wal`. An empty directory gets a brand-new first segment.
/// Otherwise every existing segment is replayed against `checkpoint_lsn`, and the newest
/// one is reopened for appending.
///
/// On non-wasm targets a group flusher is spawned. It holds only a weak reference to the
/// log and runs at the configured interval, clamped to at least 1 ms.
///
/// Returns the shared log handle plus the committed mutation events produced by replay
/// (empty for a fresh directory).
///
/// # Errors
///
/// - I/O errors from creating or listing the directory, or from acquiring the lock.
/// - Any recovery error from creating or replaying segments.
/// - [`WalError::Malformed`] if the recovered maximum LSN has no successor.
/// - [`WalError::Poisoned`] if the flusher slot's mutex is poisoned (non-wasm only).
pub fn open(
    dir: impl Into<std::path::PathBuf>,
    sync_mode: SyncMode,
    segment_target_bytes: u64,
    checkpoint_lsn: Lsn,
) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
    let segments = SegmentDir::new(dir);
    fs::create_dir_all(segments.root())?;
    let dir_lock = DirLock::acquire(segments.root())?;

    let entries = segments.list()?;
    let (active_id, active_writer, replay) = if !entries.is_empty() {
        Self::open_existing(&segments, &entries, checkpoint_lsn)?
    } else {
        Self::open_fresh(&segments)?
    };

    let recovered_max = replay.max_lsn;
    // A zero maximum means replay found no records, so numbering starts at 1.
    let next_lsn = if !recovered_max.is_zero() {
        recovered_max
            .checked_next()
            .ok_or_else(|| WalError::Malformed("WAL LSN space is exhausted".into()))?
    } else {
        Lsn::new(1)
    };
    let oldest_segment_id = match entries.first() {
        Some(first) => first.id,
        None => active_id,
    };

    let wal = Arc::new(Self {
        segments,
        sync_mode,
        segment_target_bytes,
        state: Mutex::new(WalState {
            next_lsn,
            // Everything replay recovered from disk is treated as already durable.
            durable_lsn: recovered_max,
            active_segment_id: active_id,
            active_writer,
            oldest_segment_id,
        }),
        bg_failure: Arc::new(Mutex::new(None)),
        #[cfg(not(target_arch = "wasm32"))]
        flusher: Mutex::new(None),
        _dir_lock: dir_lock,
    });

    #[cfg(not(target_arch = "wasm32"))]
    {
        let SyncMode::GroupSync { interval_ms } = sync_mode;
        // Clamp so a zero interval is never handed to the flusher.
        let period = Duration::from_millis(u64::from(interval_ms.max(1)));
        let flusher = spawn_group_flusher(Arc::downgrade(&wal), period);
        *wal.flusher.lock().map_err(|_| WalError::Poisoned)? = Some(flusher);
    }

    Ok((wal, replay.committed_events))
}
/// Creates the first segment of an empty WAL directory.
///
/// The segment is created with base LSN 1 and the directory is then synced. The returned
/// [`ReplayOutcome`] is synthetic: no events, a zero `max_lsn`, no torn tail, no observed
/// checkpoint, and `last_good_offset` pointing just past the segment header.
fn open_fresh(
    segments: &SegmentDir,
) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
    let first = SegmentId::FIRST;
    let writer = SegmentWriter::create(segments.path_for(first), Lsn::new(1))?;
    segments.sync_dir()?;

    let empty_replay = ReplayOutcome {
        last_good_offset: crate::segment::SEGMENT_HEADER_LEN as u64,
        checkpoint_lsn_observed: None,
        torn_tail: None,
        max_lsn: Lsn::ZERO,
        committed_events: Vec::new(),
    };
    Ok((first, writer, empty_replay))
}
/// Replays the segments in `entries` and reopens the last one for appending.
///
/// The last entry is treated as the active segment. NOTE(review): this assumes `entries`
/// is ordered by ascending segment id, as produced by `SegmentDir::list` — confirm.
/// The writer is positioned at the replay's `last_good_offset`. A torn tail is tolerated
/// only in that active segment, where it is truncated away. A torn tail in an earlier,
/// sealed segment is reported as corruption.
///
/// The torn-tail location is validated *before* the active segment is reopened. On the
/// corruption path, `open_for_append_at` / `truncate_to` are therefore never called on the
/// active segment with an offset that belongs to a different file.
///
/// # Errors
///
/// - Any error from `replay_segments`, `open_for_append_at`, or `truncate_to`.
/// - [`WalError::Malformed`] if `entries` is empty, or if a torn tail is found in a sealed
///   segment.
fn open_existing(
    segments: &SegmentDir,
    entries: &[crate::dir::SegmentEntry],
    checkpoint_lsn: Lsn,
) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
    let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
    let replay = replay_segments(&paths, checkpoint_lsn)?;
    let active = entries
        .last()
        .ok_or_else(|| WalError::Malformed("WAL directory has no segments".into()))?;

    // Decide what to do about a torn tail before touching the active segment on disk.
    let torn_offset_in_active = match &replay.torn_tail {
        Some(t) if t.segment_path != active.path => {
            return Err(WalError::Malformed(format!(
                "torn tail found in sealed segment {}",
                t.segment_path.display()
            )));
        }
        Some(t) => Some(t.last_good_offset),
        None => None,
    };

    let mut writer = SegmentWriter::open_for_append_at(
        segments.path_for(active.id),
        replay.last_good_offset,
    )?;
    if let Some(offset) = torn_offset_in_active {
        writer.truncate_to(offset)?;
    }
    Ok((active.id, writer, replay))
}
/// Returns the root directory holding this log's segment files.
pub fn dir(&self) -> &Path {
    let segment_dir = &self.segments;
    segment_dir.root()
}
pub fn sync_mode(&self) -> SyncMode {
self.sync_mode
}
pub fn durable_lsn(&self) -> Lsn {
self.state
.lock()
.unwrap_or_else(PoisonError::into_inner)
.durable_lsn
}
pub fn bg_failure(&self) -> Option<String> {
self.bg_failure
.lock()
.unwrap_or_else(PoisonError::into_inner)
.clone()
}
/// Exposes the background-failure slot to sibling modules (non-wasm only).
#[cfg(not(target_arch = "wasm32"))]
pub(super) fn bg_failure_slot(&self) -> &BgFailure {
    &*self.bg_failure
}
/// Fails with [`WalError::Poisoned`] if a background failure has been recorded, or if the
/// failure slot's mutex is itself poisoned.
fn check_healthy(&self) -> Result<(), WalError> {
    let slot = self.bg_failure.lock().map_err(|_| WalError::Poisoned)?;
    match *slot {
        Some(_) => Err(WalError::Poisoned),
        None => Ok(()),
    }
}
pub fn next_lsn(&self) -> Lsn {
self.state
.lock()
.unwrap_or_else(PoisonError::into_inner)
.next_lsn
}
pub fn oldest_segment_id(&self) -> u64 {
self.state
.lock()
.unwrap_or_else(PoisonError::into_inner)
.oldest_segment_id
.raw()
}
pub fn active_segment_id(&self) -> u64 {
self.state
.lock()
.unwrap_or_else(PoisonError::into_inner)
.active_segment_id
.raw()
}
/// Starts a transaction by appending a `TxBegin` record.
///
/// This is the point where the active segment may be rotated (see `maybe_rotate`). The
/// returned LSN identifies the `TxBegin` record; the other transaction methods take it as
/// their `tx_begin_lsn` argument.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned state lock,
/// [`WalError::Malformed`] on LSN/segment-id exhaustion, plus any writer error.
pub fn begin(&self) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut guard = self.lock_state()?;
    let state = &mut *guard;
    self.maybe_rotate(state)?;
    let record_for = |lsn: Lsn| WalRecord::TxBegin { lsn };
    Self::alloc_and_append(state, record_for)
}
/// Appends a single `Mutation` record (cloning `event`) to the transaction started at
/// `tx_begin_lsn`, and returns the LSN assigned to it.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned state lock,
/// [`WalError::Malformed`] on LSN exhaustion, plus any writer error.
pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut guard = self.lock_state()?;
    let record_for = |lsn: Lsn| WalRecord::Mutation {
        lsn,
        tx_begin_lsn,
        event: event.clone(),
    };
    Self::alloc_and_append(&mut guard, record_for)
}
/// Appends one `MutationBatch` record carrying all of `events` to the transaction started
/// at `tx_begin_lsn`, and returns the LSN assigned to it.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned state lock.
/// [`WalError::Encode`] if `events` is empty (checked after the health check).
/// [`WalError::Malformed`] on LSN exhaustion, plus any writer error.
pub fn append_batch(
    &self,
    tx_begin_lsn: Lsn,
    events: Vec<MutationEvent>,
) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    if !events.is_empty() {
        let mut guard = self.lock_state()?;
        Self::alloc_and_append(&mut guard, move |lsn| WalRecord::MutationBatch {
            lsn,
            tx_begin_lsn,
            events,
        })
    } else {
        Err(WalError::Encode(
            "mutation batch must contain at least one event".into(),
        ))
    }
}
/// Appends a `TxCommit` record for the transaction started at `tx_begin_lsn`, and returns
/// its LSN. No flush is performed here.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned state lock,
/// [`WalError::Malformed`] on LSN exhaustion, plus any writer error.
pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut guard = self.lock_state()?;
    let record_for = |lsn: Lsn| WalRecord::TxCommit { lsn, tx_begin_lsn };
    Self::alloc_and_append(&mut guard, record_for)
}
/// Appends a `TxAbort` record for the transaction started at `tx_begin_lsn`, and returns
/// its LSN.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned state lock,
/// [`WalError::Malformed`] on LSN exhaustion, plus any writer error.
pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut guard = self.lock_state()?;
    let record_for = |lsn: Lsn| WalRecord::TxAbort { lsn, tx_begin_lsn };
    Self::alloc_and_append(&mut guard, record_for)
}
/// Writes a complete transaction in one call: `TxBegin`, a single `MutationBatch` holding
/// all of `events`, and `TxCommit`, using three consecutive LSNs.
///
/// An empty `events` writes nothing and returns `WroteCommit::No`. Otherwise the records
/// are appended under the state lock, the lock is released, and the writer buffer is
/// flushed via `FlushKind::PerConfiguredMode` (no fsync here).
///
/// All three LSNs are reserved (by advancing `next_lsn`) before any record is written. A
/// failed append therefore leaves a gap in the numbering rather than allowing an LSN to be
/// reused.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned state lock,
/// [`WalError::Malformed`] on LSN/segment-id exhaustion, plus any writer or flush error.
pub fn commit_tx(&self, events: Vec<MutationEvent>) -> Result<WroteCommit, WalError> {
    self.check_healthy()?;
    if events.is_empty() {
        return Ok(WroteCommit::No);
    }

    {
        let mut guard = self.lock_state()?;
        let state = &mut *guard;
        self.maybe_rotate(state)?;

        // Non-capturing closure, hence `Copy`: reusable for each `ok_or_else` below.
        let exhausted = || WalError::Malformed("WAL LSN space is exhausted".into());
        let begin_lsn = state.next_lsn;
        let batch_lsn = begin_lsn.checked_next().ok_or_else(exhausted)?;
        let commit_lsn = batch_lsn.checked_next().ok_or_else(exhausted)?;
        state.next_lsn = commit_lsn.checked_next().ok_or_else(exhausted)?;

        let writer = &mut state.active_writer;
        writer.append(&WalRecord::TxBegin { lsn: begin_lsn })?;
        writer.append(&WalRecord::MutationBatch {
            lsn: batch_lsn,
            tx_begin_lsn: begin_lsn,
            events,
        })?;
        writer.append(&WalRecord::TxCommit {
            lsn: commit_lsn,
            tx_begin_lsn: begin_lsn,
        })?;
    }

    self.flush_inner(FlushKind::PerConfiguredMode)
        .map(|()| WroteCommit::Yes)
}
/// Appends a `Checkpoint` record referencing `snapshot_lsn`, and returns the LSN assigned
/// to the marker itself.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned state lock,
/// [`WalError::Malformed`] on LSN exhaustion, plus any writer error.
pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
    self.check_healthy()?;
    let mut guard = self.lock_state()?;
    let record_for = |lsn: Lsn| WalRecord::Checkpoint { lsn, snapshot_lsn };
    Self::alloc_and_append(&mut guard, record_for)
}
/// Assigns the next LSN to the record produced by `build` and appends it to the active
/// segment.
///
/// `next_lsn` is advanced only after the append succeeds, so a failed write does not
/// consume an LSN.
///
/// # Errors
///
/// [`WalError::Malformed`] if the current LSN has no successor (nothing is written in
/// that case), plus any writer error.
#[inline]
fn alloc_and_append(
    state: &mut WalState,
    build: impl FnOnce(Lsn) -> WalRecord,
) -> Result<Lsn, WalError> {
    let assigned = state.next_lsn;
    let following = match assigned.checked_next() {
        Some(lsn) => lsn,
        None => return Err(WalError::Malformed("WAL LSN space is exhausted".into())),
    };
    let record = build(assigned);
    state.active_writer.append(&record)?;
    state.next_lsn = following;
    Ok(assigned)
}
/// Flushes the active writer's buffer (`FlushKind::PerConfiguredMode`) after the health
/// check.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned lock, plus any
/// writer error.
pub fn flush(&self) -> Result<(), WalError> {
    self.check_healthy()
        .and_then(|()| self.flush_inner(FlushKind::PerConfiguredMode))
}
/// Flushes and fsyncs the active segment (`FlushKind::ForceFsync`) after the health
/// check, advancing `durable_lsn` on success.
///
/// # Errors
///
/// [`WalError::Poisoned`] after a background failure or on a poisoned lock, plus any
/// writer error.
pub fn force_fsync(&self) -> Result<(), WalError> {
    self.check_healthy()
        .and_then(|()| self.flush_inner(FlushKind::ForceFsync))
}
/// Pushes buffered records toward disk without a health check.
///
/// - `ForceFsync` flushes and syncs the active writer. On success it sets `durable_lsn` to
///   the last written LSN (`next_lsn - 1`, saturating at zero).
/// - `PerConfiguredMode` only flushes the writer buffer and leaves `durable_lsn` unchanged.
///
/// # Errors
///
/// [`WalError::Poisoned`] on a poisoned state lock, plus any writer error.
pub(super) fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
    let mut guard = self.lock_state()?;
    let state = &mut *guard;
    let last_written = Lsn::new(state.next_lsn.raw().saturating_sub(1));
    match kind {
        FlushKind::ForceFsync => {
            state.active_writer.flush_and_sync()?;
            state.durable_lsn = last_written;
        }
        FlushKind::PerConfiguredMode => {
            state.active_writer.flush_buffer()?;
        }
    }
    Ok(())
}
/// Deletes old segments whose records all have an LSN at or below `fence_lsn`.
///
/// A segment's last LSN is taken to be its successor's base LSN minus one, so the last
/// listed segment is never a candidate. Segments whose id is at or above
/// `active_segment_id.saturating_prev()` are always kept. That means both the active
/// segment and the one just before it survive.
///
/// After deleting, `oldest_segment_id` is advanced past each removed segment. The
/// directory is synced if that id no longer matches the oldest listed entry.
///
/// # Errors
///
/// [`WalError::Poisoned`] on a poisoned state lock; I/O errors from listing, reading
/// segment headers, removing files, or syncing; [`WalError::Malformed`] on segment-id
/// exhaustion.
pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
    let mut state = self.lock_state()?;
    let active_id = state.active_segment_id;
    let entries = self.segments.list()?;
    let listed_oldest = entries.first().map_or(active_id, |e| e.id);
    let retain_from = active_id.saturating_prev();

    let mut doomed: Vec<crate::dir::SegmentEntry> = Vec::new();
    for pair in entries.windows(2) {
        let (candidate, successor) = (&pair[0], &pair[1]);
        if candidate.id >= retain_from {
            break;
        }
        let candidate_last_lsn = SegmentDir::base_lsn(&successor.path)?
            .raw()
            .saturating_sub(1);
        if candidate_last_lsn <= fence_lsn.raw() {
            doomed.push(candidate.clone());
        }
    }

    for victim in &doomed {
        fs::remove_file(&victim.path)?;
        if victim.id >= state.oldest_segment_id {
            state.oldest_segment_id = victim.id.checked_next().ok_or_else(|| {
                WalError::Malformed("WAL segment id space is exhausted".into())
            })?;
        }
    }

    if state.oldest_segment_id != listed_oldest {
        self.segments.sync_dir()?;
    }
    Ok(())
}
/// Rotates to a new segment once the active one has written at least
/// `segment_target_bytes`.
///
/// Rotation seals the current writer, creates the next segment id with base LSN equal to
/// `next_lsn`, and syncs the directory. Only then is the new writer installed as active.
///
/// # Errors
///
/// [`WalError::Malformed`] on segment-id exhaustion, plus any seal/create/sync error.
fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
    if state.active_writer.bytes_written() >= self.segment_target_bytes {
        state.active_writer.seal()?;
        let successor_id = state.active_segment_id.checked_next().ok_or_else(|| {
            WalError::Malformed("WAL segment id space is exhausted".into())
        })?;
        let successor_path = self.segments.path_for(successor_id);
        let successor = SegmentWriter::create(successor_path, state.next_lsn)?;
        self.segments.sync_dir()?;
        state.active_writer = successor;
        state.active_segment_id = successor_id;
    }
    Ok(())
}
/// Locks the WAL state, mapping a poisoned mutex to [`WalError::Poisoned`].
fn lock_state(&self) -> Result<MutexGuard<'_, WalState>, WalError> {
    match self.state.lock() {
        Ok(guard) => Ok(guard),
        Err(_) => Err(WalError::Poisoned),
    }
}
}
impl Drop for Wal {
    /// Performs a best-effort final fsync, then releases the group-flusher handle on
    /// non-wasm targets.
    fn drop(&mut self) {
        // `drop` cannot report failures, so the flush result is intentionally discarded.
        let _best_effort = self.flush_inner(FlushKind::ForceFsync);

        #[cfg(not(target_arch = "wasm32"))]
        {
            // Only clear the slot when the mutex is not poisoned. Otherwise the handle is
            // dropped later, along with the field itself.
            if let Ok(slot) = self.flusher.get_mut() {
                *slot = None;
            }
        }
    }
}