use std::fs;
use std::path::{Path, PathBuf};
use crate::error::{Result, WalError};
/// Flushes the metadata of directory `dir` to stable storage.
///
/// Opens the directory as a file handle and calls `sync_all` on it.
///
/// # Errors
/// Returns `WalError::Io` if the directory cannot be opened or if the sync fails.
/// NOTE(review): opening a directory with `File::open` is not supported on every
/// platform (e.g. Windows), so callers may want to treat failure as best-effort.
pub fn fsync_directory(dir: &Path) -> Result<()> {
    fs::File::open(dir)
        .and_then(|handle| handle.sync_all())
        .map_err(WalError::Io)
}
/// Default target size for a WAL segment: 64 MiB (`64 << 20`), assuming the value is
/// interpreted as bytes by the writer.
pub const DEFAULT_SEGMENT_TARGET_SIZE: u64 = 64 << 20;
/// File extension (without the dot) carried by every segment file.
const SEGMENT_EXTENSION: &str = "seg";
/// File-name prefix carried by every segment file.
const SEGMENT_PREFIX: &str = "wal-";
/// Metadata for one WAL segment file found on disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentMeta {
    /// Path of the segment file.
    pub path: PathBuf,
    /// LSN encoded in the segment's file name.
    pub first_lsn: u64,
    /// Size of the segment file in bytes when its metadata was read.
    pub file_size: u64,
}

impl SegmentMeta {
    /// Path of the companion `.dwb` file: the segment path with its extension
    /// replaced by `dwb`.
    pub fn dwb_path(&self) -> PathBuf {
        self.path.with_extension("dwb")
    }
}

impl Ord for SegmentMeta {
    /// Orders segments primarily by `first_lsn`.
    ///
    /// `path` and then `file_size` break ties, so `cmp` returns `Equal` exactly when
    /// the derived `PartialEq` reports equality. `Ord` requires this consistency;
    /// comparing `first_lsn` alone would call two distinct segments with the same
    /// LSN "equal" while `==` says they differ.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.first_lsn
            .cmp(&other.first_lsn)
            .then_with(|| self.path.cmp(&other.path))
            .then_with(|| self.file_size.cmp(&other.file_size))
    }
}

impl PartialOrd for SegmentMeta {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
/// Builds the file name for the segment whose first record has LSN `first_lsn`.
///
/// The LSN is zero-padded to 20 digits (the width of `u64::MAX`), so lexical
/// order of the names matches numeric order of the LSNs.
pub fn segment_filename(first_lsn: u64) -> String {
    format!(
        "{prefix}{lsn:020}.{ext}",
        prefix = SEGMENT_PREFIX,
        lsn = first_lsn,
        ext = SEGMENT_EXTENSION
    )
}
/// Returns the full path, inside `wal_dir`, of the segment that starts at `first_lsn`.
pub fn segment_path(wal_dir: &Path, first_lsn: u64) -> PathBuf {
    let mut full = wal_dir.to_path_buf();
    full.push(segment_filename(first_lsn));
    full
}
/// Extracts the first LSN from a segment file name of the form
/// `<SEGMENT_PREFIX><digits>.<SEGMENT_EXTENSION>`.
///
/// Returns `None` in these cases:
/// - the name does not have that shape;
/// - the LSN part contains anything other than ASCII digits;
/// - the number does not fit in a `u64`.
fn parse_segment_filename(filename: &str) -> Option<u64> {
    let lsn_digits = filename
        .strip_prefix(SEGMENT_PREFIX)?
        .strip_suffix(SEGMENT_EXTENSION)?
        .strip_suffix('.')?;
    // `u64::from_str` accepts a leading `+`, which `segment_filename` never writes.
    // Accept only plain digits so foreign files are not mistaken for segments.
    if !lsn_digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // An empty digit string (e.g. "wal-.seg") fails to parse and yields `None`.
    lsn_digits.parse::<u64>().ok()
}
/// Lists the WAL segment files in `wal_dir`, sorted by `SegmentMeta`'s `Ord`
/// (ascending `first_lsn` first).
///
/// Entries whose names are not recognised by `parse_segment_filename` are ignored,
/// as are directories that happen to have segment-like names. A missing `wal_dir`
/// is treated as "no segments yet" and yields an empty list.
///
/// # Errors
/// Returns `WalError::Io` if the directory cannot be read for any reason other than
/// not existing, or if an entry or its metadata cannot be read.
pub fn discover_segments(wal_dir: &Path) -> Result<Vec<SegmentMeta>> {
    // Ask `read_dir` directly instead of pre-checking `exists()`. This avoids a
    // check-then-use race, and unlike `exists()` it does not hide
    // permission errors as "missing".
    let entries = match fs::read_dir(wal_dir) {
        Ok(entries) => entries,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(err) => return Err(WalError::Io(err)),
    };

    let mut segments = Vec::new();
    for entry in entries {
        let entry = entry.map_err(WalError::Io)?;
        let file_name = entry.file_name();
        let first_lsn = match parse_segment_filename(&file_name.to_string_lossy()) {
            Some(lsn) => lsn,
            None => continue,
        };
        let metadata = entry.metadata().map_err(WalError::Io)?;
        // A directory is never a segment. Reporting one would make callers try to
        // `remove_file` it later.
        if metadata.is_dir() {
            continue;
        }
        segments.push(SegmentMeta {
            path: entry.path(),
            first_lsn,
            file_size: metadata.len(),
        });
    }
    segments.sort();
    Ok(segments)
}
pub fn migrate_legacy_wal(legacy_path: &Path, wal_dir: &Path) -> Result<bool> {
if !legacy_path.is_file() {
return Ok(false);
}
let metadata = fs::metadata(legacy_path).map_err(WalError::Io)?;
if metadata.len() == 0 {
let _ = fs::remove_file(legacy_path);
return Ok(false);
}
let info = crate::recovery::recover(legacy_path)?;
let first_lsn = if info.record_count == 0 {
1 } else {
let mut reader = crate::reader::WalReader::open(legacy_path)?;
match reader.next_record()? {
Some(record) => record.header.lsn,
None => 1,
}
};
fs::create_dir_all(wal_dir).map_err(WalError::Io)?;
let new_path = segment_path(wal_dir, first_lsn);
fs::rename(legacy_path, &new_path).map_err(WalError::Io)?;
let legacy_dwb = legacy_path.with_extension("dwb");
if legacy_dwb.exists() {
let new_dwb = new_path.with_extension("dwb");
fs::rename(&legacy_dwb, &new_dwb).map_err(WalError::Io)?;
}
tracing::info!(
legacy = %legacy_path.display(),
segment = %new_path.display(),
first_lsn,
"migrated legacy WAL to segmented format"
);
Ok(true)
}
pub fn truncate_segments(
wal_dir: &Path,
checkpoint_lsn: u64,
active_segment_first_lsn: u64,
) -> Result<TruncateResult> {
let segments = discover_segments(wal_dir)?;
let mut deleted_count = 0u64;
let mut bytes_reclaimed = 0u64;
for seg in &segments {
if seg.first_lsn == active_segment_first_lsn {
continue;
}
let next_first_lsn = segments
.iter()
.find(|s| s.first_lsn > seg.first_lsn)
.map(|s| s.first_lsn)
.unwrap_or(u64::MAX);
if next_first_lsn <= checkpoint_lsn {
bytes_reclaimed += seg.file_size;
fs::remove_file(&seg.path).map_err(WalError::Io)?;
let dwb_path = seg.dwb_path();
if dwb_path.exists() {
let _ = fs::remove_file(&dwb_path);
}
tracing::info!(
segment = %seg.path.display(),
first_lsn = seg.first_lsn,
"deleted WAL segment (checkpoint_lsn={})",
checkpoint_lsn
);
deleted_count += 1;
}
}
if deleted_count > 0 {
let _ = fsync_directory(wal_dir);
}
Ok(TruncateResult {
segments_deleted: deleted_count,
bytes_reclaimed,
})
}
/// Outcome of a `truncate_segments` call.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TruncateResult {
    /// How many segment files were removed.
    pub segments_deleted: u64,
    /// Sum of the recorded sizes of the removed segment files.
    /// Companion `.dwb` files are not counted.
    pub bytes_reclaimed: u64,
}
// Unit tests for segment naming, directory discovery, checkpoint-based truncation,
// and legacy-WAL migration. Filesystem tests run inside `tempfile` temp directories.
#[cfg(test)]
mod tests {
    use super::*;

    // Names zero-pad the LSN to 20 digits; u64::MAX needs all 20.
    #[test]
    fn segment_filename_format() {
        assert_eq!(segment_filename(1), "wal-00000000000000000001.seg");
        assert_eq!(segment_filename(999), "wal-00000000000000000999.seg");
        assert_eq!(segment_filename(u64::MAX), "wal-18446744073709551615.seg");
    }

    // Well-formed names round-trip back to their LSN.
    #[test]
    fn parse_segment_filename_valid() {
        assert_eq!(
            parse_segment_filename("wal-00000000000000000001.seg"),
            Some(1)
        );
        assert_eq!(
            parse_segment_filename("wal-00000000000000000999.seg"),
            Some(999)
        );
    }

    // Wrong prefix, wrong extension, or non-numeric LSN are all rejected.
    #[test]
    fn parse_segment_filename_invalid() {
        assert_eq!(parse_segment_filename("wal.log"), None);
        assert_eq!(parse_segment_filename("wal-abc.seg"), None);
        assert_eq!(parse_segment_filename("other-00001.seg"), None);
        assert_eq!(parse_segment_filename("wal-00001.dwb"), None);
    }

    #[test]
    fn discover_empty_dir() {
        let dir = tempfile::tempdir().unwrap();
        let segments = discover_segments(dir.path()).unwrap();
        assert!(segments.is_empty());
    }

    // A missing WAL directory is not an error; it just has no segments.
    #[test]
    fn discover_nonexistent_dir() {
        let segments = discover_segments(Path::new("/nonexistent/wal/dir")).unwrap();
        assert!(segments.is_empty());
    }

    // Files are created out of order alongside non-segment files (.dwb, .json);
    // only the three .seg files are returned, in ascending LSN order.
    #[test]
    fn discover_segments_sorted() {
        let dir = tempfile::tempdir().unwrap();
        fs::write(dir.path().join("wal-00000000000000000050.seg"), b"seg3").unwrap();
        fs::write(dir.path().join("wal-00000000000000000001.seg"), b"seg1").unwrap();
        fs::write(dir.path().join("wal-00000000000000000025.seg"), b"seg2").unwrap();
        fs::write(dir.path().join("wal-00000000000000000001.dwb"), b"dwb").unwrap();
        fs::write(dir.path().join("metadata.json"), b"{}").unwrap();
        let segments = discover_segments(dir.path()).unwrap();
        assert_eq!(segments.len(), 3);
        assert_eq!(segments[0].first_lsn, 1);
        assert_eq!(segments[1].first_lsn, 25);
        assert_eq!(segments[2].first_lsn, 50);
    }

    // Checkpoint 100 with segment 100 active: segments 1 and 50 have successors
    // starting at <= 100, so both are deleted, together with segment 1's .dwb file.
    #[test]
    fn truncate_deletes_old_segments() {
        let dir = tempfile::tempdir().unwrap();
        fs::write(dir.path().join("wal-00000000000000000001.seg"), b"data1").unwrap();
        fs::write(dir.path().join("wal-00000000000000000001.dwb"), b"dwb1").unwrap();
        fs::write(dir.path().join("wal-00000000000000000050.seg"), b"data2").unwrap();
        fs::write(dir.path().join("wal-00000000000000000100.seg"), b"data3").unwrap();
        let result = truncate_segments(dir.path(), 100, 100).unwrap();
        assert_eq!(result.segments_deleted, 2);
        let remaining = discover_segments(dir.path()).unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].first_lsn, 100);
        assert!(!dir.path().join("wal-00000000000000000001.dwb").exists());
    }

    // The active segment survives even when the checkpoint is far beyond it.
    #[test]
    fn truncate_never_deletes_active_segment() {
        let dir = tempfile::tempdir().unwrap();
        fs::write(dir.path().join("wal-00000000000000000001.seg"), b"data").unwrap();
        let result = truncate_segments(dir.path(), 999, 1).unwrap();
        assert_eq!(result.segments_deleted, 0);
        let remaining = discover_segments(dir.path()).unwrap();
        assert_eq!(remaining.len(), 1);
    }

    // Segment 100's successor starts at 200, above checkpoint 50, and segment 200
    // is active, so nothing is deleted.
    #[test]
    fn truncate_no_segments_below_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        fs::write(dir.path().join("wal-00000000000000000100.seg"), b"data").unwrap();
        fs::write(dir.path().join("wal-00000000000000000200.seg"), b"data").unwrap();
        let result = truncate_segments(dir.path(), 50, 200).unwrap();
        assert_eq!(result.segments_deleted, 0);
    }

    // Writes two records through the legacy single-file writer and migrates. Checks
    // that the legacy file is gone, a single segment starting at LSN 1 exists, and
    // both records read back from it.
    #[test]
    fn migrate_legacy_wal() {
        let dir = tempfile::tempdir().unwrap();
        let legacy_path = dir.path().join("test.wal");
        let wal_dir = dir.path().join("wal_segments");
        {
            let mut writer =
                crate::writer::WalWriter::open_without_direct_io(&legacy_path).unwrap();
            writer
                .append(crate::record::RecordType::Put as u16, 1, 0, b"hello")
                .unwrap();
            writer
                .append(crate::record::RecordType::Put as u16, 1, 0, b"world")
                .unwrap();
            writer.sync().unwrap();
        }
        // `super::` is required because this test fn shadows the function name.
        let migrated = super::migrate_legacy_wal(&legacy_path, &wal_dir).unwrap();
        assert!(migrated);
        assert!(!legacy_path.exists());
        let segments = discover_segments(&wal_dir).unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].first_lsn, 1);
        let reader = crate::reader::WalReader::open(&segments[0].path).unwrap();
        let records: Vec<_> = reader.records().collect::<crate::Result<_>>().unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].payload, b"hello");
    }

    // No legacy file: migration reports `false` without error.
    #[test]
    fn migrate_nonexistent_legacy_is_noop() {
        let dir = tempfile::tempdir().unwrap();
        let legacy_path = dir.path().join("nonexistent.wal");
        let wal_dir = dir.path().join("wal_segments");
        let migrated = super::migrate_legacy_wal(&legacy_path, &wal_dir).unwrap();
        assert!(!migrated);
    }

    // An empty legacy file is not migrated, but it is removed.
    #[test]
    fn migrate_empty_legacy_is_noop() {
        let dir = tempfile::tempdir().unwrap();
        let legacy_path = dir.path().join("empty.wal");
        let wal_dir = dir.path().join("wal_segments");
        fs::write(&legacy_path, b"").unwrap();
        let migrated = super::migrate_legacy_wal(&legacy_path, &wal_dir).unwrap();
        assert!(!migrated);
        assert!(!legacy_path.exists());
    }
}