use crate::{PicanteError, PicanteResult};
use facet::Facet;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
const WAL_FORMAT_VERSION: u32 = 1;
const WAL_MAGIC: &[u8; 8] = b"PICANTE\0";
#[derive(Debug, Clone, Facet)]
pub struct WalHeader {
pub format_version: u32,
pub base_revision: u64,
}
#[derive(Debug, Clone, Facet)]
pub struct WalEntry {
pub revision: u64,
pub kind_id: u32,
pub operation: WalOperation,
}
#[repr(u8)]
#[derive(Debug, Clone, Facet)]
pub enum WalOperation {
Set {
key: Vec<u8>,
value: Vec<u8>,
},
Delete {
key: Vec<u8>,
},
}
pub struct WalWriter {
path: PathBuf,
writer: BufWriter<File>,
base_revision: u64,
entries_since_flush: usize,
pub auto_flush_threshold: usize,
}
impl WalWriter {
pub const DEFAULT_AUTO_FLUSH_THRESHOLD: usize = 100;
pub fn create(path: impl AsRef<Path>, base_revision: u64) -> PicanteResult<Self> {
Self::create_with_threshold(path, base_revision, Self::DEFAULT_AUTO_FLUSH_THRESHOLD)
}
pub fn create_with_threshold(
path: impl AsRef<Path>,
base_revision: u64,
auto_flush_threshold: usize,
) -> PicanteResult<Self> {
let path = path.as_ref().to_path_buf();
let file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)
.map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to create WAL file at {}: {}", path.display(), e),
})
})?;
let mut writer = BufWriter::new(file);
writer.write_all(WAL_MAGIC).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to write WAL magic bytes: {}", e),
})
})?;
let header = WalHeader {
format_version: WAL_FORMAT_VERSION,
base_revision,
};
let header_bytes = facet_postcard::to_vec(&header).map_err(|e| {
Arc::new(PicanteError::Encode {
what: "WAL header",
message: format!("{}", e),
})
})?;
let header_len = header_bytes.len() as u32;
writer.write_all(&header_len.to_le_bytes()).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to write WAL header length: {}", e),
})
})?;
writer.write_all(&header_bytes).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to write WAL header: {}", e),
})
})?;
writer.flush().map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to flush WAL header: {}", e),
})
})?;
Ok(Self {
path,
writer,
base_revision,
entries_since_flush: 0,
auto_flush_threshold,
})
}
pub fn append(&mut self, entry: WalEntry) -> PicanteResult<()> {
let entry_bytes = facet_postcard::to_vec(&entry).map_err(|e| {
Arc::new(PicanteError::Encode {
what: "WAL entry",
message: format!("{}", e),
})
})?;
let entry_len = entry_bytes.len() as u32;
self.writer
.write_all(&entry_len.to_le_bytes())
.map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to write WAL entry length: {}", e),
})
})?;
self.writer.write_all(&entry_bytes).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to write WAL entry: {}", e),
})
})?;
self.entries_since_flush += 1;
if self.entries_since_flush >= self.auto_flush_threshold {
self.flush()?;
}
Ok(())
}
pub fn flush(&mut self) -> PicanteResult<()> {
self.writer.flush().map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to flush WAL: {}", e),
})
})?;
self.entries_since_flush = 0;
Ok(())
}
pub fn base_revision(&self) -> u64 {
self.base_revision
}
pub fn path(&self) -> &Path {
&self.path
}
}
impl Drop for WalWriter {
fn drop(&mut self) {
if let Err(e) = self.flush() {
tracing::warn!(
path = %self.path.display(),
error = %e,
"Failed to flush WAL on drop - unflushed entries may be lost"
);
}
}
}
#[derive(Debug)]
pub struct WalReader {
#[allow(dead_code)] path: PathBuf,
reader: BufReader<File>,
header: WalHeader,
}
impl WalReader {
pub fn open(path: impl AsRef<Path>) -> PicanteResult<Self> {
let path = path.as_ref().to_path_buf();
let file = File::open(&path).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to open WAL file at {}: {}", path.display(), e),
})
})?;
let mut reader = BufReader::new(file);
let mut magic = [0u8; 8];
reader.read_exact(&mut magic).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to read WAL magic bytes: {}", e),
})
})?;
if &magic != WAL_MAGIC {
return Err(Arc::new(PicanteError::Cache {
message: format!(
"Invalid WAL magic bytes (expected {:?}, got {:?})",
WAL_MAGIC, magic
),
}));
}
let mut header_len_bytes = [0u8; 4];
reader.read_exact(&mut header_len_bytes).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to read WAL header length: {}", e),
})
})?;
let header_len = u32::from_le_bytes(header_len_bytes) as usize;
const MAX_HEADER_LEN: usize = 1_000_000;
if header_len > MAX_HEADER_LEN {
return Err(Arc::new(PicanteError::Cache {
message: format!(
"WAL header length ({} bytes) exceeds maximum allowed ({} bytes) - file may be corrupted",
header_len, MAX_HEADER_LEN
),
}));
}
let mut header_bytes = vec![0u8; header_len];
reader.read_exact(&mut header_bytes).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to read WAL header: {}", e),
})
})?;
let header: WalHeader = facet_postcard::from_slice(&header_bytes).map_err(|e| {
Arc::new(PicanteError::Decode {
what: "WAL header",
message: format!("{}", e),
})
})?;
if header.format_version != WAL_FORMAT_VERSION {
return Err(Arc::new(PicanteError::Cache {
message: format!(
"Unsupported WAL format version (expected {}, got {})",
WAL_FORMAT_VERSION, header.format_version
),
}));
}
Ok(Self {
path,
reader,
header,
})
}
pub fn header(&self) -> &WalHeader {
&self.header
}
pub fn next_entry(&mut self) -> PicanteResult<Option<WalEntry>> {
let mut entry_len_bytes = [0u8; 4];
match self.reader.read_exact(&mut entry_len_bytes) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return Ok(None);
}
Err(e) => {
return Err(Arc::new(PicanteError::Cache {
message: format!("Failed to read WAL entry length: {}", e),
}));
}
}
let entry_len = u32::from_le_bytes(entry_len_bytes) as usize;
const MAX_ENTRY_LEN: usize = 100_000_000;
if entry_len > MAX_ENTRY_LEN {
return Err(Arc::new(PicanteError::Cache {
message: format!(
"WAL entry length ({} bytes) exceeds maximum allowed ({} bytes) - file may be corrupted",
entry_len, MAX_ENTRY_LEN
),
}));
}
let mut entry_bytes = vec![0u8; entry_len];
self.reader.read_exact(&mut entry_bytes).map_err(|e| {
Arc::new(PicanteError::Cache {
message: format!("Failed to read WAL entry: {}", e),
})
})?;
let entry: WalEntry = facet_postcard::from_slice(&entry_bytes).map_err(|e| {
Arc::new(PicanteError::Decode {
what: "WAL entry",
message: format!("{}", e),
})
})?;
Ok(Some(entry))
}
pub fn entries(&mut self) -> WalEntryIterator<'_> {
WalEntryIterator { reader: self }
}
}
pub struct WalEntryIterator<'a> {
reader: &'a mut WalReader,
}
impl<'a> Iterator for WalEntryIterator<'a> {
type Item = PicanteResult<WalEntry>;
fn next(&mut self) -> Option<Self::Item> {
match self.reader.next_entry() {
Ok(Some(entry)) => Some(Ok(entry)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}