use std::{
collections::HashMap,
ffi::OsStr,
fs::{self, File, OpenOptions},
io,
path::{Path, PathBuf},
sync::{
Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard,
atomic::{AtomicU64, Ordering},
},
};
use crate::{
error::{Result, WalError},
store::{WalStore, durable_sync, pread_fill, pwrite_all},
};
/// Width of a segment file name's stem: the segment index, zero-padded to this
/// many decimal digits (20 digits is enough for any `u64`).
const NAME_DIGITS: usize = 20;
/// Extension appended (after a `.`) to every segment file name.
const NAME_EXT: &str = "wal";
/// Name of the file, inside the log directory, that persists the head offset.
const HEAD_FILE: &str = "head";
/// Size of the head marker: an 8-byte little-endian head offset followed by a
/// 4-byte little-endian CRC-32C of those 8 bytes.
const HEAD_FILE_LEN: usize = 12;
/// A log store that spreads one logical byte stream across fixed-size segment
/// files inside a single directory.
///
/// Segment `i` holds logical offsets `[i * segment_size, (i + 1) * segment_size)`
/// and is stored in a file named after `i` (zero-padded, with the `.wal`
/// extension). A separate `head` marker file records the offset before which
/// data has been discarded.
#[derive(Debug)]
pub struct SegmentedStore {
    /// Directory containing the segment files and the head marker.
    dir: PathBuf,
    /// Maximum number of bytes stored in one segment file; never zero.
    segment_size: u64,
    /// Cache of open segment file handles, keyed by segment index.
    segments: RwLock<HashMap<u64, Arc<File>>>,
    /// Logical length of the log: one past the highest offset written.
    max_written: AtomicU64,
    /// Index of the first segment that the next `sync` call flushes.
    synced_from: AtomicU64,
    /// Logical offset before which data has been dropped by `truncate_before`.
    head: AtomicU64,
}
impl SegmentedStore {
    /// Opens (creating it if needed) a segmented log stored in `dir`, where each
    /// segment file holds at most `segment_size` bytes.
    ///
    /// The logical length is recovered from the highest-numbered segment file as
    /// `index * segment_size + file_len`. The head offset comes from the head
    /// marker file (0 if it is missing or invalid), clamped to that length.
    ///
    /// # Errors
    ///
    /// Fails if `segment_size` is zero, if the directory cannot be created or
    /// listed, or if segment metadata or the head marker cannot be read. Also
    /// fails if the newest segment file is larger than `segment_size`, which
    /// means the log was written with a different segment size and its length
    /// cannot be computed correctly.
    pub fn open(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
        if segment_size == 0 {
            return Err(WalError::io(
                "opening the segmented log",
                io::Error::other("segment size must be non-zero"),
            ));
        }
        let dir = dir.as_ref().to_path_buf();
        fs::create_dir_all(&dir).map_err(|e| WalError::io("creating the log directory", e))?;

        // Locate the newest segment (highest index) and its current on-disk length.
        let mut highest: Option<(u64, u64)> = None;
        let entries =
            fs::read_dir(&dir).map_err(|e| WalError::io("reading the log directory", e))?;
        for entry in entries {
            let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
            if let Some(index) = parse_segment_name(&entry.file_name()) {
                let len = entry
                    .metadata()
                    .map_err(|e| WalError::io("reading segment metadata", e))?
                    .len();
                if highest.is_none_or(|(h, _)| index > h) {
                    highest = Some((index, len));
                }
            }
        }

        let total_len = match highest {
            // `write_at` never stores more than `segment_size` bytes in one
            // segment, so a longer file would make the length below wrong.
            Some((index, len)) if len > segment_size => {
                return Err(WalError::io(
                    "opening the segmented log",
                    io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!(
                            "segment {index} holds {len} bytes, more than the \
                             configured segment size of {segment_size}"
                        ),
                    ),
                ));
            }
            Some((index, len)) => index.saturating_mul(segment_size).saturating_add(len),
            None => 0,
        };
        // `sync` only flushes segments from `synced_from` onward; start at the
        // segment that the next append lands in.
        let active = total_len / segment_size;
        let head = read_head_file(&dir)?.unwrap_or(0).min(total_len);
        Ok(SegmentedStore {
            dir,
            segment_size,
            segments: RwLock::new(HashMap::new()),
            max_written: AtomicU64::new(total_len),
            synced_from: AtomicU64::new(active),
            head: AtomicU64::new(head),
        })
    }

    /// Returns the directory holding the segment files and the head marker.
    #[must_use]
    pub fn dir(&self) -> &Path {
        &self.dir
    }

    /// Returns the maximum number of bytes stored in each segment file.
    #[must_use]
    pub fn segment_size(&self) -> u64 {
        self.segment_size
    }

    /// Read-locks the segment handle cache, recovering the guard if the lock
    /// is poisoned.
    fn read_map(&self) -> RwLockReadGuard<'_, HashMap<u64, Arc<File>>> {
        self.segments.read().unwrap_or_else(PoisonError::into_inner)
    }

    /// Write-locks the segment handle cache, recovering the guard if the lock
    /// is poisoned.
    fn write_map(&self) -> RwLockWriteGuard<'_, HashMap<u64, Arc<File>>> {
        self.segments
            .write()
            .unwrap_or_else(PoisonError::into_inner)
    }

    /// Returns a handle to segment `index`. If the handle is not cached yet, the
    /// segment file is opened, and created if it does not exist.
    ///
    /// The cache is checked under the read lock first and re-checked under the
    /// write lock, so concurrent callers end up sharing one cached handle.
    fn segment_for_write(&self, index: u64) -> Result<Arc<File>> {
        if let Some(file) = self.read_map().get(&index) {
            return Ok(Arc::clone(file));
        }
        let mut map = self.write_map();
        if let Some(file) = map.get(&index) {
            return Ok(Arc::clone(file));
        }
        let path = self.dir.join(segment_name(index));
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|e| WalError::io("creating a log segment", e))?;
        let file = Arc::new(file);
        let _ = map.insert(index, Arc::clone(&file));
        Ok(file)
    }

    /// Returns a handle to segment `index` without creating the file, or
    /// `Ok(None)` when the segment file does not exist.
    ///
    /// The file is opened read-write because the handle goes into the shared
    /// cache, where `segment_for_write` may later hand it out for writing.
    fn segment_for_read(&self, index: u64) -> Result<Option<Arc<File>>> {
        if let Some(file) = self.read_map().get(&index) {
            return Ok(Some(Arc::clone(file)));
        }
        let path = self.dir.join(segment_name(index));
        match OpenOptions::new().read(true).write(true).open(&path) {
            Ok(file) => {
                let file = Arc::new(file);
                let mut map = self.write_map();
                // Another thread may have cached a handle while we were opening.
                if let Some(existing) = map.get(&index) {
                    return Ok(Some(Arc::clone(existing)));
                }
                let _ = map.insert(index, Arc::clone(&file));
                Ok(Some(file))
            }
            Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(error) => Err(WalError::io("opening a log segment", error)),
        }
    }

    /// Returns the cached handle for segment `index`, if any, without touching
    /// the filesystem.
    fn open_segment(&self, index: u64) -> Option<Arc<File>> {
        self.read_map().get(&index).map(Arc::clone)
    }

    /// Durably records `head` in the head marker file.
    ///
    /// The marker is written to a temporary file, flushed, and then renamed over
    /// the live marker, and the directory is flushed afterwards. A crash at any
    /// point therefore leaves either the old marker or the new one, never a
    /// truncated file. A truncated file would fail its CRC check on reopen and
    /// silently reset head to 0, even though earlier segments may already have
    /// been deleted.
    fn write_head_file(&self, head: u64) -> Result<()> {
        const HEAD_TMP_FILE: &str = "head.tmp";

        let mut buf = [0u8; HEAD_FILE_LEN];
        buf[..8].copy_from_slice(&head.to_le_bytes());
        let crc = crc32c::crc32c(&buf[..8]);
        buf[8..].copy_from_slice(&crc.to_le_bytes());

        let tmp_path = self.dir.join(HEAD_TMP_FILE);
        {
            let file = OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&tmp_path)
                .map_err(|e| WalError::io("writing the head marker", e))?;
            pwrite_all(&file, 0, &buf).map_err(|e| WalError::io("writing the head marker", e))?;
            durable_sync(&file).map_err(|e| WalError::io("flushing the head marker", e))?;
        }
        fs::rename(&tmp_path, self.dir.join(HEAD_FILE))
            .map_err(|e| WalError::io("replacing the head marker", e))?;
        Self::sync_dir(&self.dir).map_err(|e| WalError::io("flushing the log directory", e))?;
        Ok(())
    }

    /// Flushes directory metadata (such as a completed rename) to stable storage.
    #[cfg(unix)]
    fn sync_dir(dir: &Path) -> io::Result<()> {
        File::open(dir)?.sync_all()
    }

    /// Directory flushing is only performed on Unix; on other platforms this is
    /// a no-op.
    #[cfg(not(unix))]
    fn sync_dir(_dir: &Path) -> io::Result<()> {
        Ok(())
    }
}
fn read_head_file(dir: &Path) -> Result<Option<u64>> {
match fs::read(dir.join(HEAD_FILE)) {
Ok(bytes) if bytes.len() >= HEAD_FILE_LEN => {
let head = u64::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]);
let stored = u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]);
if crc32c::crc32c(&bytes[..8]) == stored {
Ok(Some(head))
} else {
Ok(None) }
}
Ok(_) => Ok(None),
Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
Err(error) => Err(WalError::io("reading the head marker", error)),
}
}
impl WalStore for SegmentedStore {
    /// Writes `bytes` at logical `offset`. The data is split across every segment
    /// it spans, and segments that do not exist yet are created. The logical
    /// length grows if the write ends past it.
    fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
        let mut pos = offset;
        let mut remaining = bytes;
        while !remaining.is_empty() {
            let index = pos / self.segment_size;
            let local = pos % self.segment_size;
            // Bytes left in this segment (always >= 1). Saturate rather than
            // `as usize`: on 32-bit targets a large value could truncate to 0
            // and make this loop spin forever.
            let room = usize::try_from(self.segment_size - local).unwrap_or(usize::MAX);
            let take = remaining.len().min(room);
            let file = self.segment_for_write(index)?;
            pwrite_all(&file, local, &remaining[..take])
                .map_err(|e| WalError::io("writing a record", e))?;
            pos += take as u64;
            remaining = &remaining[take..];
        }
        let end = offset.saturating_add(bytes.len() as u64);
        let _ = self.max_written.fetch_max(end, Ordering::AcqRel);
        Ok(())
    }

    /// Reads up to `buf.len()` bytes starting at logical `offset` and returns how
    /// many bytes were filled. The read ends early at a missing segment file or
    /// when a segment returns fewer bytes than requested.
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
        let mut pos = offset;
        let mut filled = 0;
        while filled < buf.len() {
            let index = pos / self.segment_size;
            let local = pos % self.segment_size;
            // Same saturating conversion as in `write_at`.
            let room = usize::try_from(self.segment_size - local).unwrap_or(usize::MAX);
            let want = (buf.len() - filled).min(room);
            let Some(file) = self.segment_for_read(index)? else {
                break;
            };
            let got = pread_fill(&file, local, &mut buf[filled..filled + want])
                .map_err(|e| WalError::io("reading from the log", e))?;
            filled += got;
            pos += got as u64;
            // Fewer bytes than requested: treat it as the end of the stored data.
            if got < want {
                break;
            }
        }
        Ok(filled)
    }

    /// Cuts the log to `len` bytes. Segments entirely past `len` are deleted and
    /// the segment containing `len` is shortened.
    ///
    /// If the head lies beyond the new end, it is lowered to `len` and persisted.
    /// Otherwise `truncate_before` would see `head > len`, and a reopen could
    /// restore the stale, larger head once the log grows again.
    fn truncate(&self, len: u64) -> Result<()> {
        let last_index = len / self.segment_size;
        let last_local = len % self.segment_size;
        let entries =
            fs::read_dir(&self.dir).map_err(|e| WalError::io("reading the log directory", e))?;
        for entry in entries {
            let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
            let Some(index) = parse_segment_name(&entry.file_name()) else {
                continue;
            };
            match index.cmp(&last_index) {
                std::cmp::Ordering::Greater => {
                    fs::remove_file(entry.path())
                        .map_err(|e| WalError::io("removing a truncated segment", e))?;
                    let _ = self.write_map().remove(&index);
                }
                std::cmp::Ordering::Equal => {
                    let file = self.segment_for_write(index)?;
                    file.set_len(last_local)
                        .map_err(|e| WalError::io("truncating a log segment", e))?;
                }
                std::cmp::Ordering::Less => {}
            }
        }
        if len < self.head.load(Ordering::Acquire) {
            self.write_head_file(len)?;
            self.head.store(len, Ordering::Release);
        }
        self.max_written.store(len, Ordering::Release);
        self.synced_from.store(last_index, Ordering::Release);
        Ok(())
    }

    /// Flushes every cached segment from `synced_from` through the segment that
    /// holds the last written byte, then records that segment as the next
    /// starting point. Segments without a cached handle are skipped.
    fn sync(&self) -> Result<()> {
        let written = self.max_written.load(Ordering::Acquire);
        if written == 0 {
            return Ok(());
        }
        let active = (written - 1) / self.segment_size;
        let from = self.synced_from.load(Ordering::Acquire);
        for index in from..=active {
            if let Some(file) = self.open_segment(index) {
                durable_sync(&file).map_err(|e| WalError::io("flushing to stable storage", e))?;
            }
        }
        self.synced_from.store(active, Ordering::Release);
        Ok(())
    }

    /// Returns the logical length of the log (one past the highest written byte).
    fn len(&self) -> Result<u64> {
        Ok(self.max_written.load(Ordering::Acquire))
    }

    /// Returns the offset before which data has been discarded.
    fn head(&self) -> Result<u64> {
        Ok(self.head.load(Ordering::Acquire))
    }

    /// Moves the head forward to `offset`, bounded below by the current head and
    /// above by the log length, and returns the new head.
    ///
    /// The new head is persisted before any segment that lies entirely before it
    /// is deleted. The segment holding the last written byte is always kept.
    fn truncate_before(&self, offset: u64) -> Result<u64> {
        let written = self.max_written.load(Ordering::Acquire);
        let prev = self.head.load(Ordering::Acquire);
        // Same as `offset.clamp(prev, written)` when `prev <= written`, but it
        // cannot panic if the two bounds are ever observed inverted.
        let new_head = offset.max(prev).min(written);
        self.write_head_file(new_head)?;
        let last_segment = written.saturating_sub(1) / self.segment_size;
        let keep_from = (new_head / self.segment_size).min(last_segment);
        let entries =
            fs::read_dir(&self.dir).map_err(|e| WalError::io("reading the log directory", e))?;
        for entry in entries {
            let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
            let Some(index) = parse_segment_name(&entry.file_name()) else {
                continue;
            };
            if index < keep_from {
                let _ = self.write_map().remove(&index);
                fs::remove_file(entry.path())
                    .map_err(|e| WalError::io("removing a dropped segment", e))?;
            }
        }
        self.head.store(new_head, Ordering::Release);
        Ok(new_head)
    }
}
/// Builds the file name of segment `index`: the index zero-padded to
/// `NAME_DIGITS` digits, then `.`, then `NAME_EXT`.
fn segment_name(index: u64) -> String {
    let mut file_name = format!("{index:0NAME_DIGITS$}");
    file_name.push('.');
    file_name.push_str(NAME_EXT);
    file_name
}
/// Parses a segment file name back into its index.
///
/// Returns `None` unless `name` is valid UTF-8 and is exactly `NAME_DIGITS`
/// ASCII digits followed by `.` and `NAME_EXT`, and the digits fit in a `u64`.
fn parse_segment_name(name: &OsStr) -> Option<u64> {
    // Strip the extension and then the dot separately, to avoid building
    // ".wal" with `format!` on every call.
    let stem = name.to_str()?.strip_suffix(NAME_EXT)?.strip_suffix('.')?;
    let well_formed = stem.len() == NAME_DIGITS && stem.bytes().all(|b| b.is_ascii_digit());
    if well_formed {
        stem.parse().ok()
    } else {
        None
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)] // panicking on failure is the point of a test
mod tests {
    use super::*;

    #[test]
    fn test_segment_name_roundtrips() {
        assert_eq!(segment_name(0), "00000000000000000000.wal");
        assert_eq!(segment_name(42), "00000000000000000042.wal");
        assert_eq!(
            parse_segment_name(OsStr::new("00000000000000000042.wal")),
            Some(42)
        );
        assert_eq!(parse_segment_name(OsStr::new("README.md")), None);
        assert_eq!(parse_segment_name(OsStr::new("42.wal")), None);
        assert_eq!(
            parse_segment_name(OsStr::new("0000000000000000004x.wal")),
            None
        );
    }

    #[test]
    fn test_open_rejects_zero_segment_size() {
        let dir = tempfile::tempdir().unwrap();
        assert!(SegmentedStore::open(dir.path(), 0).is_err());
    }

    #[test]
    fn test_write_read_within_one_segment() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 64).unwrap();
        store.write_at(0, b"hello").unwrap();
        store.sync().unwrap();
        let mut buf = [0u8; 5];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 5);
        assert_eq!(&buf, b"hello");
        assert_eq!(store.len().unwrap(), 5);
    }

    #[test]
    fn test_write_spans_segment_boundary() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, b"ABCDEFGHIJKL").unwrap();
        store.sync().unwrap();
        assert!(dir.path().join("00000000000000000000.wal").exists());
        assert!(dir.path().join("00000000000000000001.wal").exists());
        let mut buf = [0u8; 12];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 12);
        assert_eq!(&buf, b"ABCDEFGHIJKL");
    }

    #[test]
    fn test_read_at_arbitrary_offset_across_boundary() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 4).unwrap();
        store.write_at(0, b"0123456789").unwrap();
        let mut buf = [0u8; 5];
        let n = store.read_at(3, &mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b"34567");
    }

    #[test]
    fn test_reopen_reports_correct_length() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = SegmentedStore::open(dir.path(), 8).unwrap();
            store.write_at(0, b"ABCDEFGHIJKLM").unwrap();
            store.sync().unwrap();
            assert_eq!(store.len().unwrap(), 13);
        }
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        assert_eq!(store.len().unwrap(), 13);
        let mut buf = [0u8; 13];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 13);
        assert_eq!(&buf, b"ABCDEFGHIJKLM");
    }

    #[test]
    fn test_truncate_removes_later_segments() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, &[0xAB; 30]).unwrap();
        store.sync().unwrap();
        assert!(dir.path().join("00000000000000000003.wal").exists());
        store.truncate(10).unwrap();
        assert_eq!(store.len().unwrap(), 10);
        assert!(dir.path().join("00000000000000000001.wal").exists());
        assert!(!dir.path().join("00000000000000000002.wal").exists());
        assert!(!dir.path().join("00000000000000000003.wal").exists());
        let mut buf = [0u8; 16];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 10);
    }

    #[test]
    fn test_read_past_end_is_short() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, b"abc").unwrap();
        let mut buf = [0u8; 16];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 3);
        assert_eq!(store.read_at(100, &mut buf).unwrap(), 0);
    }

    #[test]
    fn test_truncate_before_drops_segments_and_persists_head() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = SegmentedStore::open(dir.path(), 8).unwrap();
            store.write_at(0, &[0xCD; 30]).unwrap();
            store.sync().unwrap();
            assert_eq!(store.truncate_before(17).unwrap(), 17);
            assert_eq!(store.head().unwrap(), 17);
            // Segments 0 and 1 end at or before offset 17 and are dropped.
            assert!(!dir.path().join("00000000000000000000.wal").exists());
            assert!(!dir.path().join("00000000000000000001.wal").exists());
            assert!(dir.path().join("00000000000000000002.wal").exists());
        }
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        assert_eq!(store.head().unwrap(), 17);
        assert_eq!(store.len().unwrap(), 30);
    }

    #[test]
    fn test_truncate_before_never_moves_head_backwards() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, &[0x11; 20]).unwrap();
        assert_eq!(store.truncate_before(12).unwrap(), 12);
        assert_eq!(store.truncate_before(4).unwrap(), 12);
        // Requests past the end are clamped to the log length.
        assert_eq!(store.truncate_before(100).unwrap(), 20);
    }
}