use std::fs::{File, OpenOptions};
use std::io::{BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use crate::error::WalError;
use crate::frame::{self, DecodeOutcome};
use crate::segment;
/// Log sequence number identifying a single WAL entry.
///
/// A thin newtype over the raw `u64`; equality, ordering and hashing all
/// follow the wrapped value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Lsn(pub u64);
/// Condition of the active (last) segment's tail as observed while the WAL
/// was being opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TailState {
/// The last segment decoded cleanly all the way to end-of-log.
Clean,
/// Decoding the last segment hit a corrupt frame. The payload is the byte
/// offset (end of the last good frame) at which the file was cut.
TruncatedAt(u64),
}
/// Tunable settings for a WAL instance.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Size threshold, in bytes, for the active segment. An append that would
    /// push a non-empty segment past this size triggers a rotation first.
    /// `None` disables size-based rotation.
    pub max_segment_size: Option<u64>,
}

impl Default for WalConfig {
    /// Returns a configuration with a 64 MiB segment size limit.
    fn default() -> Self {
        const DEFAULT_MAX_SEGMENT_BYTES: u64 = 64 * 1024 * 1024;
        WalConfig {
            max_segment_size: Some(DEFAULT_MAX_SEGMENT_BYTES),
        }
    }
}
/// Bookkeeping for a completed (no longer active) segment file.
#[derive(Debug, Clone)]
struct SegmentMeta {
/// Sequence number from which the segment's file path is derived.
seq: u32,
/// LSN of the last entry stored in the segment. `checkpoint` compares this
/// against the applied LSN to decide whether the file can be deleted.
last_lsn: u64,
}
/// Handle to a segmented write-ahead log stored in a directory.
///
/// Holds the open active segment plus the in-memory bookkeeping needed to
/// append, rotate, checkpoint and iterate.
#[derive(Debug)]
pub struct Wal {
// Open read/write handle to the active segment, positioned at `write_offset`.
file: File,
// Directory that contains all segment files.
dir: PathBuf,
// Sequence number of the active segment.
active_seq: u32,
// LSN that the next successful `append` will assign.
next_lsn: u64,
// Byte offset in the active segment just past the last good frame.
write_offset: u64,
// Tail condition recorded when the WAL was opened.
tail_state: TailState,
// Completed segments, oldest first; the active segment is not included.
segments: Vec<SegmentMeta>,
// LSN of the first entry in the active segment, or `None` while it is empty.
active_first_lsn: Option<u64>,
// Settings supplied at open time.
config: WalConfig,
}
impl Wal {
/// Opens (or creates) the WAL in `dir` using [`WalConfig::default`].
///
/// # Errors
///
/// Returns whatever [`Wal::open_with_config`] returns for the same directory.
pub fn open(dir: &Path) -> Result<Self, WalError> {
    let default_config = WalConfig::default();
    Self::open_with_config(dir, default_config)
}
/// Opens (or creates) the WAL stored in `dir` with an explicit configuration.
///
/// The steps, in order:
/// 1. Create `dir` if needed (fsyncing its parent when it was just created).
/// 2. Migrate a legacy single-file log (`segment::V1_FILENAME`) to segment 1,
///    or create an empty segment 1 if the directory holds no segments.
/// 3. Verify the segment sequence numbers are contiguous.
/// 4. Scan every segment frame by frame to recover `next_lsn`, per-segment
///    LSN ranges and the write offset of the last segment.
/// 5. Truncate a corrupt tail of the last segment, then open it for appending.
///
/// # Errors
///
/// Returns `WalError::Corrupt` when both the legacy file and segment files
/// exist, when segment numbers have a gap, when a non-last segment contains a
/// corrupt frame, or when LSNs are not continuous across segments. I/O
/// failures are propagated through `?`.
pub fn open_with_config(dir: &Path, config: WalConfig) -> Result<Self, WalError> {
// Remember whether the directory is new so its parent entry can be made
// durable. NOTE(review): `Path::parent` yields an empty path for a
// single-component relative `dir`; confirm `fsync_dir` copes with that.
let dir_created = !dir.exists();
std::fs::create_dir_all(dir)?;
if dir_created && let Some(parent) = dir.parent() {
fsync_dir(parent)?;
}
let v1_path = dir.join(segment::V1_FILENAME);
let v1_exists = v1_path.exists();
// NOTE(review): the loops below rely on `scan_segments` returning sequence
// numbers in ascending order — confirm against its implementation.
let mut seqs = segment::scan_segments(dir)?;
// A legacy file next to segment files is ambiguous; refuse to guess.
if v1_exists && !seqs.is_empty() {
return Err(WalError::Corrupt {
reason: format!("both wal.log and segment files exist in {}", dir.display()),
});
}
// Legacy layout only: rename the single file into place as segment 1.
if v1_exists {
let v2_path = segment::segment_path(dir, 1);
std::fs::rename(&v1_path, &v2_path)?;
fsync_dir(dir)?;
seqs = vec![1];
}
// Fresh directory: start with an empty segment 1.
if seqs.is_empty() {
let path = segment::segment_path(dir, 1);
File::create(&path)?;
fsync_dir(dir)?;
seqs = vec![1];
}
// Every segment must directly follow its predecessor.
for window in seqs.windows(2) {
if window[1] != window[0] + 1 {
return Err(WalError::Corrupt {
reason: format!(
"gap in segment sequence: wal-{:06}.log followed by wal-{:06}.log",
window[0], window[1]
),
});
}
}
// Recovery state accumulated while scanning the segments.
let mut completed_segments: Vec<SegmentMeta> = Vec::new();
let mut next_lsn = 1u64;
let mut tail_state = TailState::Clean;
let mut write_offset = 0u64;
let mut active_first_lsn: Option<u64> = None;
let mut prev_last_lsn: Option<u64> = None;
// The highest-numbered segment is the one that will receive new appends.
let active_seq = *seqs
.last()
.expect("seqs is non-empty — step 3 ensures at least one segment exists");
for (idx, &seq) in seqs.iter().enumerate() {
let is_last = idx == seqs.len() - 1;
let seg_path = segment::segment_path(dir, seq);
let seg_file = File::open(&seg_path)?;
let mut reader = BufReader::new(seg_file);
let mut seg_first_lsn: Option<u64> = None;
let mut seg_last_lsn: Option<u64> = None;
// Byte offset just past the last successfully decoded frame.
let mut seg_offset = 0u64;
loop {
match frame::decode(&mut reader)? {
DecodeOutcome::Entry {
lsn,
bytes_consumed,
..
} => {
if seg_first_lsn.is_none() {
seg_first_lsn = Some(lsn);
}
seg_last_lsn = Some(lsn);
next_lsn = lsn + 1;
seg_offset += bytes_consumed;
}
DecodeOutcome::EndOfLog => {
if is_last {
tail_state = TailState::Clean;
write_offset = seg_offset;
}
break;
}
DecodeOutcome::Corrupt => {
// A bad frame is tolerated only at the tail of the last segment,
// where it is cut off below; anywhere else it is a hard error.
if is_last {
tail_state = TailState::TruncatedAt(seg_offset);
write_offset = seg_offset;
} else {
return Err(WalError::Corrupt {
reason: format!(
"corruption in completed segment wal-{:06}.log at offset {}",
seq, seg_offset
),
});
}
break;
}
}
}
// Segments that contain entries must continue exactly where the previous
// non-empty segment stopped.
if let Some(first) = seg_first_lsn {
if let Some(prev) = prev_last_lsn
&& first != prev + 1
{
return Err(WalError::Corrupt {
reason: format!(
"LSN discontinuity: segment wal-{:06}.log starts at LSN {}, \
expected {} (previous segment ends at LSN {})",
seq,
first,
prev + 1,
prev
),
});
}
prev_last_lsn = seg_last_lsn;
}
// The last segment becomes the active one; earlier non-empty segments are
// recorded as completed. Empty earlier segments are not recorded.
if is_last {
active_first_lsn = seg_first_lsn;
} else if let Some(last) = seg_last_lsn {
completed_segments.push(SegmentMeta {
seq,
last_lsn: last,
});
}
}
let active_path = segment::segment_path(dir, active_seq);
// Cut off the corrupt tail and sync the shortened file before reopening it.
if let TailState::TruncatedAt(off) = tail_state {
let trunc_file = OpenOptions::new().write(true).open(&active_path)?;
trunc_file.set_len(off)?;
trunc_file.sync_all()?;
fsync_dir(dir)?;
}
// Reopen the active segment for appending, positioned after the last good
// frame.
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(&active_path)?;
file.seek(SeekFrom::Start(write_offset))?;
Ok(Wal {
file,
dir: dir.to_path_buf(),
active_seq,
next_lsn,
write_offset,
tail_state,
segments: completed_segments,
active_first_lsn,
config,
})
}
/// Appends `data` as a new entry and returns the LSN assigned to it.
///
/// The entry is encoded into a frame, written to the active segment and
/// flushed with `sync_data` before the call returns. If a size limit is
/// configured and the frame would push a non-empty active segment past it,
/// the segment is rotated first.
///
/// # Errors
///
/// Returns an error if encoding, rotation, the write or the sync fails. On a
/// failed write or sync the in-memory state is left untouched and the file is
/// rolled back (best effort) to the last committed offset, so a later
/// `append` reuses the same LSN at the same position.
#[must_use = "check Ok/Err and use the returned Lsn"]
pub fn append(&mut self, data: &[u8]) -> Result<Lsn, WalError> {
    let lsn = self.next_lsn;
    let frame_bytes = frame::encode(lsn, data)?;
    let frame_len = frame_bytes.len() as u64;

    // Never rotate an empty segment: an oversized frame still has to go
    // somewhere.
    if let Some(max_size) = self.config.max_segment_size
        && self.write_offset > 0
        && self.write_offset + frame_len > max_size
    {
        self.rotate()?;
    }

    let committed_offset = self.write_offset;
    let persisted = self
        .file
        .write_all(&frame_bytes)
        .and_then(|()| self.file.sync_data());
    if let Err(e) = persisted {
        // The write may have landed partly or fully (a failed sync leaves the
        // whole frame in the file and the cursor past it). Drop those bytes
        // and rewind so the file and cursor match `write_offset` again;
        // otherwise the next append would land after an orphaned frame that
        // carries the same LSN. Both steps are best effort: the original
        // error is what the caller needs to see.
        let _ = self.file.set_len(committed_offset);
        let _ = self.file.seek(SeekFrom::Start(committed_offset));
        return Err(WalError::Io(e));
    }

    if self.active_first_lsn.is_none() {
        self.active_first_lsn = Some(lsn);
    }
    self.write_offset += frame_len;
    self.next_lsn += 1;
    Ok(Lsn(lsn))
}
pub fn rotate(&mut self) -> Result<(), WalError> {
if self.active_first_lsn.is_none() {
return Ok(());
}
self.file.sync_all()?;
let last_lsn = self.next_lsn - 1;
self.segments.push(SegmentMeta {
seq: self.active_seq,
last_lsn,
});
let new_seq = self.active_seq + 1;
let new_path = segment::segment_path(&self.dir, new_seq);
let new_file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&new_path)?;
fsync_dir(&self.dir)?;
self.file = new_file;
self.active_seq = new_seq;
self.write_offset = 0;
self.active_first_lsn = None;
Ok(())
}
#[must_use = "check Ok/Err; segment deletion errors leave WAL in a partial state"]
pub fn checkpoint(&mut self, applied_up_to: Lsn) -> Result<usize, WalError> {
let mut deleted = 0usize;
while let Some(seg) = self.segments.first() {
if seg.last_lsn <= applied_up_to.0 {
let path = segment::segment_path(&self.dir, seg.seq);
std::fs::remove_file(&path)?;
fsync_dir(&self.dir)?;
self.segments.remove(0);
deleted += 1;
} else {
break;
}
}
Ok(deleted)
}
/// Returns an iterator over all entries, oldest segment first.
///
/// The iterator captures the directory and the list of segment sequence
/// numbers (completed segments followed by the active one) at call time and
/// opens the segment files lazily as it advances.
pub fn iter(&self) -> WalIter {
    let completed = self.segments.iter().map(|meta| meta.seq);
    let segment_seqs: Vec<u32> = completed
        .chain(std::iter::once(self.active_seq))
        .collect();
    let state = IterState::Init {
        dir: self.dir.clone(),
        segment_seqs,
        seq_idx: 0,
    };
    WalIter { state }
}
/// Returns the tail condition recorded when the WAL was opened.
///
/// Marked `#[must_use]` like the other accessors on this type, since calling
/// it has no effect other than producing the value.
#[must_use]
pub fn tail_state(&self) -> TailState {
    self.tail_state
}
/// Returns the LSN that the next successful `append` will assign.
#[must_use]
pub fn next_lsn(&self) -> Lsn {
    let upcoming = self.next_lsn;
    Lsn(upcoming)
}
/// Returns the number of tracked segments: every completed segment plus the
/// active one.
#[must_use]
pub fn segment_count(&self) -> usize {
    let completed = self.segments.len();
    completed + 1
}
}
/// Iterator over WAL entries, yielding `(Lsn, payload)` pairs segment by
/// segment. Created by `Wal::iter`.
pub struct WalIter {
// Current position in the segment-walking state machine.
state: IterState,
}
/// State machine driving `WalIter`.
enum IterState {
/// The segment at `seq_idx` still has to be opened.
Init {
// Directory containing the segment files.
dir: PathBuf,
// Sequence numbers of the segments to visit, in order.
segment_seqs: Vec<u32>,
// Index into `segment_seqs` of the segment to open next.
seq_idx: usize,
},
/// The segment at `seq_idx` is open and being decoded frame by frame.
Reading {
dir: PathBuf,
segment_seqs: Vec<u32>,
seq_idx: usize,
// Buffered reader over the currently open segment file.
reader: BufReader<File>,
},
/// Iteration has finished; every further `next` call yields `None`.
Done,
}
impl Iterator for WalIter {
    type Item = Result<(Lsn, Vec<u8>), WalError>;

    /// Yields the next entry, opening segment files on demand.
    ///
    /// An I/O failure (opening a segment or decoding a frame) is yielded once
    /// as `Err`, after which the iterator is exhausted. A corrupt frame ends
    /// iteration silently with `None`.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Take the state by value. `Done` is parked in its place, so any
            // early return below leaves the iterator finished.
            let current = std::mem::replace(&mut self.state, IterState::Done);
            match current {
                IterState::Done => return None,
                IterState::Init {
                    dir,
                    segment_seqs,
                    seq_idx,
                } => {
                    // Past the last segment: nothing left to read.
                    let Some(&seq) = segment_seqs.get(seq_idx) else {
                        return None;
                    };
                    let segment_file = segment::segment_path(&dir, seq);
                    match File::open(&segment_file) {
                        Err(io_err) => return Some(Err(WalError::Io(io_err))),
                        Ok(handle) => {
                            self.state = IterState::Reading {
                                dir,
                                segment_seqs,
                                seq_idx,
                                reader: BufReader::new(handle),
                            };
                        }
                    }
                }
                IterState::Reading {
                    dir,
                    segment_seqs,
                    seq_idx,
                    mut reader,
                } => {
                    let outcome = match frame::decode(&mut reader) {
                        Ok(outcome) => outcome,
                        Err(io_err) => return Some(Err(WalError::Io(io_err))),
                    };
                    match outcome {
                        DecodeOutcome::Entry { lsn, data, .. } => {
                            // Keep reading from the same segment next time.
                            self.state = IterState::Reading {
                                dir,
                                segment_seqs,
                                seq_idx,
                                reader,
                            };
                            return Some(Ok((Lsn(lsn), data)));
                        }
                        DecodeOutcome::EndOfLog => {
                            // This segment is exhausted; move to the next one.
                            self.state = IterState::Init {
                                dir,
                                segment_seqs,
                                seq_idx: seq_idx + 1,
                            };
                        }
                        DecodeOutcome::Corrupt => return None,
                    }
                }
            }
        }
    }
}
/// Opens the directory at `path` and calls `sync_all` on it.
///
/// Used in this file after creating, renaming, truncating or removing files
/// inside a directory.
///
/// An empty `path` is treated as the current directory. `Path::parent`
/// returns the empty path for a single-component relative path such as
/// `"wal"`, and an empty path cannot be opened.
///
/// # Errors
///
/// Returns the I/O error from opening the directory or from syncing it.
fn fsync_dir(path: &Path) -> std::io::Result<()> {
    let target = if path.as_os_str().is_empty() {
        Path::new(".")
    } else {
        path
    };
    File::open(target)?.sync_all()
}