pub mod pipelined;
pub mod reader;
pub mod record;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use thiserror::Error;
#[cfg(target_os = "macos")]
use std::os::unix::io::AsRawFd;
pub use pipelined::{PipelineConfig, PipelinedWAL};
pub use reader::WALReader;
pub use record::{BatchOp, Record};
/// Magic number identifying a WAL file: the ASCII bytes `WLOG` read as a big-endian `u32`.
/// It is serialized into the header in little-endian byte order.
const MAGIC: u32 = 0x574C_4F47;
/// On-disk format version, stored immediately after the magic number.
const VERSION: u32 = 0x0000_0002;
/// Size in bytes of the file header: a 4-byte magic followed by a 4-byte version.
const HEADER_SIZE: u64 = 8;
/// Errors returned by the write-ahead log.
#[derive(Debug, Error)]
pub enum WALError {
    /// An underlying I/O operation failed; converted automatically from [`io::Error`].
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    /// A record-level error; converted automatically from [`record::RecordError`].
    #[error("Record error: {0}")]
    Record(#[from] record::RecordError),
    /// The file header did not carry the expected magic number or version.
    #[error("Invalid WAL format: bad magic or version")]
    InvalidFormat,
}
/// Convenience alias for results whose error type is [`WALError`].
pub type Result<T> = std::result::Result<T, WALError>;
/// How written data is pushed to stable storage after each batch write.
///
/// `Eq` is derived alongside `PartialEq`: equality on this field-less enum is
/// total, so the type can be used wherever an `Eq` bound is required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncPolicy {
    /// Call `File::sync_all` after writing.
    SyncAll,
    /// Call `File::sync_data` after writing.
    SyncData,
    /// Issue a barrier sync: `fcntl(F_BARRIERFSYNC)` on macOS, `File::sync_data` elsewhere.
    Barrier,
    /// Do not sync at all after writing.
    None,
}
/// Applies `policy` to `file`.
///
/// Returns `Ok(())` without touching the file for [`SyncPolicy::None`];
/// otherwise returns whatever the selected sync call reports.
fn sync_file(file: &File, policy: SyncPolicy) -> io::Result<()> {
    match policy {
        SyncPolicy::None => Ok(()),
        SyncPolicy::Barrier => barrier_sync(file),
        SyncPolicy::SyncData => File::sync_data(file),
        SyncPolicy::SyncAll => File::sync_all(file),
    }
}
/// macOS implementation of the barrier sync: issues `fcntl(fd, F_BARRIERFSYNC)`.
///
/// Returns the last OS error when `fcntl` reports failure (`-1`).
#[cfg(target_os = "macos")]
fn barrier_sync(file: &File) -> io::Result<()> {
    // Command number for F_BARRIERFSYNC, spelled out locally.
    const F_BARRIERFSYNC: libc::c_int = 85;

    let fd = file.as_raw_fd();
    // SAFETY: `fd` is borrowed from `file`, which outlives this call, and no
    // pointer argument is passed to `fcntl`.
    let status = unsafe { libc::fcntl(fd, F_BARRIERFSYNC) };
    if status != -1 {
        return Ok(());
    }
    Err(io::Error::last_os_error())
}
/// Fallback barrier sync for targets other than macOS: delegates to `File::sync_data`.
#[cfg(not(target_os = "macos"))]
fn barrier_sync(file: &File) -> io::Result<()> {
    File::sync_data(file)
}
/// Recovery strictness selector; defaults to [`RecoveryMode::BestEffort`].
///
/// `Eq` is derived alongside `PartialEq`: equality on this field-less enum is
/// total, so the type can be used wherever an `Eq` bound is required.
///
/// NOTE(review): nothing in this file consumes the mode; the exact behavior of
/// each variant is presumably defined by the reader — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RecoveryMode {
    /// Strict recovery.
    Strict,
    /// Best-effort recovery (the default).
    #[default]
    BestEffort,
}
/// Thresholds that decide when buffered records are flushed to the file.
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Flush once the buffered records reach this many encoded bytes.
    pub max_batch_size: usize,
    /// Flush once this much time has passed since the previous flush.
    pub max_batch_timeout: Duration,
}

impl Default for BatchConfig {
    /// Defaults: a 32 MiB size threshold and a 10 ms timeout.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        let max_batch_size = 32 * MIB;
        let max_batch_timeout = Duration::from_millis(10);
        Self {
            max_batch_size,
            max_batch_timeout,
        }
    }
}
/// Write-ahead log that buffers records in memory and appends them to a file in batches.
pub struct WAL {
    /// Handle to the log file, behind a mutex and a reference count.
    /// NOTE(review): presumably shared with other components; no sharing is visible in this file.
    file: Arc<Mutex<File>>,
    /// Path the log was created at or opened from.
    path: PathBuf,
    /// Byte offset tracked as the end of written data: `HEADER_SIZE` after `create`,
    /// the file length after `open`, advanced by `write_batch`.
    offset: u64,
    /// Sync behavior applied after each batch write.
    sync_policy: SyncPolicy,
    /// Records accepted by `write` that have not been flushed yet.
    batch: Vec<Record>,
    /// Sum of `encoded_len()` over the records currently in `batch`.
    batch_size_bytes: usize,
    /// Size and time thresholds that trigger a flush from `write`.
    batch_config: BatchConfig,
    /// Time of construction or of the most recent successful flush.
    last_flush: Instant,
}
impl WAL {
/// Creates a new WAL at `path` using the default [`BatchConfig`].
///
/// See `create_with_batch_config` for the details and error conditions.
pub fn create(path: impl AsRef<Path>, sync_policy: SyncPolicy) -> Result<Self> {
    let batch_config = BatchConfig::default();
    Self::create_with_batch_config(path, sync_policy, batch_config)
}
/// Creates (or truncates) the file at `path`, writes the WAL header and syncs it.
///
/// The header is the magic number followed by the version, each as a
/// little-endian `u32`. The returned WAL starts with an empty batch and its
/// offset positioned right after the header.
///
/// # Errors
/// Returns [`WALError::Io`] if opening, writing or syncing the file fails.
pub fn create_with_batch_config(
    path: impl AsRef<Path>,
    sync_policy: SyncPolicy,
    batch_config: BatchConfig,
) -> Result<Self> {
    let path = path.as_ref().to_path_buf();

    let mut options = OpenOptions::new();
    options.create(true).write(true).truncate(true);
    let mut file = options.open(&path)?;

    // Header fields in on-disk order.
    for field in [MAGIC, VERSION] {
        file.write_all(&field.to_le_bytes())?;
    }
    file.sync_all()?;

    let wal = Self {
        file: Arc::new(Mutex::new(file)),
        path,
        offset: HEADER_SIZE,
        sync_policy,
        batch: Vec::new(),
        batch_size_bytes: 0,
        batch_config,
        last_flush: Instant::now(),
    };
    Ok(wal)
}
/// Opens an existing WAL at `path` using the default [`BatchConfig`].
///
/// See `open_with_batch_config` for the details and error conditions.
pub fn open(path: impl AsRef<Path>, sync_policy: SyncPolicy) -> Result<Self> {
    let batch_config = BatchConfig::default();
    Self::open_with_batch_config(path, sync_policy, batch_config)
}
/// Opens an existing WAL file for appending after validating its header.
///
/// The first 8 bytes must hold the expected magic number and version, each as
/// a little-endian `u32`. After validation the file cursor is moved to the
/// end of the file so that later writes append to the existing records.
/// Without that seek the cursor would remain just past the header (the file
/// is not opened in append mode) and the next write would overwrite the log.
///
/// # Errors
/// * [`WALError::Io`] if the file cannot be opened, the header cannot be read
///   in full, or seeking fails.
/// * [`WALError::InvalidFormat`] if the magic number or version do not match.
pub fn open_with_batch_config(
    path: impl AsRef<Path>,
    sync_policy: SyncPolicy,
    batch_config: BatchConfig,
) -> Result<Self> {
    use std::io::{Seek, SeekFrom};

    let path = path.as_ref().to_path_buf();
    let mut file = OpenOptions::new().read(true).write(true).open(&path)?;

    let mut header = [0u8; 8];
    file.read_exact(&mut header)?;
    let [m0, m1, m2, m3, v0, v1, v2, v3] = header;
    let magic = u32::from_le_bytes([m0, m1, m2, m3]);
    let version = u32::from_le_bytes([v0, v1, v2, v3]);
    if magic != MAGIC || version != VERSION {
        return Err(WALError::InvalidFormat);
    }

    // Position the cursor at end-of-file; the returned position is the file
    // length, which is also the offset of the next record to be written.
    let offset = file.seek(SeekFrom::End(0))?;

    Ok(Self {
        file: Arc::new(Mutex::new(file)),
        path,
        offset,
        sync_policy,
        batch: Vec::new(),
        batch_size_bytes: 0,
        batch_config,
        last_flush: Instant::now(),
    })
}
/// Buffers a copy of `record` and flushes the batch when a threshold is hit.
///
/// A flush is triggered when the buffered byte count reaches
/// `max_batch_size` or when `max_batch_timeout` has elapsed since the last
/// flush. Returns the offset computed for the record: the current file
/// offset plus the encoded size of the records buffered before it.
///
/// NOTE(review): `write_batch` prepends 8 bytes of framing to every record,
/// while the offset returned here adds only `encoded_len()` of the earlier
/// buffered records. If `encoded_len()` does not include that framing, the
/// offsets of the second and later buffered records are too small — confirm
/// against `Record::encoded_len`.
///
/// # Errors
/// Propagates any error from `flush_batch`.
pub fn write(&mut self, record: &Record) -> Result<u64> {
    let record_len = record.encoded_len();
    let position = self.offset + self.batch_size_bytes as u64;

    self.batch.push(record.clone());
    self.batch_size_bytes += record_len;

    let limits = &self.batch_config;
    let size_reached = self.batch_size_bytes >= limits.max_batch_size;
    if size_reached || self.last_flush.elapsed() >= limits.max_batch_timeout {
        self.flush_batch()?;
    }
    Ok(position)
}
/// Writes all buffered records to the file via `write_batch`.
///
/// Does nothing when the buffer is empty. The buffer is drained before the
/// write, so the byte counter is reset at the same point: if the write
/// fails, the (now empty) buffer and `batch_size_bytes` stay consistent.
/// Previously the counter kept its old value after a failure, which skewed
/// the offsets returned by later `write` calls and the size-based flush
/// trigger. `last_flush` is only updated after a successful write.
///
/// # Errors
/// Propagates any error from `write_batch`; the drained records are not
/// re-queued in that case.
pub fn flush_batch(&mut self) -> Result<()> {
    if self.batch.is_empty() {
        return Ok(());
    }
    let records = std::mem::take(&mut self.batch);
    // Keep the accounting in lock-step with the drained buffer.
    self.batch_size_bytes = 0;
    self.write_batch(&records)?;
    self.last_flush = Instant::now();
    Ok(())
}
pub fn write_batch(&mut self, records: &[Record]) -> Result<Vec<u64>> {
let mut offsets = Vec::with_capacity(records.len());
{
let mut file = self.file.lock().expect("WAL file mutex poisoned");
let mut batch_buffer = Vec::new();
for record in records {
let encoded = record.encode();
offsets.push(self.offset);
let len = encoded.len() as u32;
let len_bytes = len.to_be_bytes();
let crc = crc32c::crc32c_append(crc32c::crc32c(&len_bytes), &encoded);
batch_buffer.extend_from_slice(&crc.to_le_bytes());
batch_buffer.extend_from_slice(&len_bytes);
batch_buffer.extend_from_slice(&encoded);
self.offset += (4 + 4 + encoded.len()) as u64; }
if !batch_buffer.is_empty() {
file.write_all(&batch_buffer)?;
}
sync_file(&file, self.sync_policy)?;
crate::fail_point!("wal::after_sync");
}
Ok(offsets)
}
/// Returns the currently tracked end-of-log offset in bytes.
///
/// Records still held in the in-memory batch are not included.
#[must_use]
pub const fn offset(&self) -> u64 {
    self.offset
}
/// Returns the path this WAL was created at or opened from.
#[must_use]
pub fn path(&self) -> &Path {
    self.path.as_path()
}
pub fn sync(&self) -> Result<()> {
let file = self.file.lock().expect("WAL file mutex poisoned");
file.sync_all()?;
Ok(())
}
/// Flushes pending records, then truncates the file back to just its header.
///
/// After truncation the file cursor is moved to the end of the header, the
/// change is synced with `sync_all`, and the tracked offset is reset.
///
/// # Errors
/// Propagates flush errors and returns [`WALError::Io`] if truncating,
/// seeking or syncing fails.
///
/// # Panics
/// Panics if the file mutex is poisoned.
pub fn clear(&mut self) -> Result<()> {
    use std::io::{Seek, SeekFrom};

    self.flush_batch()?;
    {
        let mut guard = self.file.lock().expect("WAL file mutex poisoned");
        guard.set_len(HEADER_SIZE)?;
        guard.seek(SeekFrom::Start(HEADER_SIZE))?;
        guard.sync_all()?;
    }
    self.offset = HEADER_SIZE;
    Ok(())
}
}
impl Drop for WAL {
    /// Attempts a final flush of buffered records when the WAL goes out of scope.
    fn drop(&mut self) {
        if let Err(_ignored) = self.flush_batch() {
            // Best effort only: a destructor has no way to report the failure.
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use tempfile::tempdir;

    /// A single buffered write reports the first offset after the header.
    #[test]
    fn test_wal_create_and_write() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        let mut wal = WAL::create(&wal_path, SyncPolicy::SyncAll).unwrap();
        let record = Record::Put {
            key: Bytes::from("key1"),
            value: Bytes::from("value1"),
            seq: 1,
        };
        let offset = wal.write(&record).unwrap();
        assert_eq!(offset, HEADER_SIZE);
        assert!(wal_path.exists());
    }

    /// A direct batch write returns one strictly increasing offset per record.
    #[test]
    fn test_wal_write_batch() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        let mut wal = WAL::create(&wal_path, SyncPolicy::SyncData).unwrap();
        let records = vec![
            Record::Put {
                key: Bytes::from("key1"),
                value: Bytes::from("value1"),
                seq: 1,
            },
            Record::Put {
                key: Bytes::from("key2"),
                value: Bytes::from("value2"),
                seq: 2,
            },
            Record::Delete {
                key: Bytes::from("key1"),
                seq: 3,
            },
        ];
        let offsets = wal.write_batch(&records).unwrap();
        assert_eq!(offsets.len(), 3);
        assert_eq!(offsets[0], HEADER_SIZE);
        // Every record occupies space, so offsets must strictly increase and
        // the end-of-log offset must lie past the last record's start.
        assert!(offsets.windows(2).all(|pair| pair[0] < pair[1]));
        assert!(wal.offset() > offsets[2]);
    }

    /// Reopening sees the record that was flushed when the first handle was dropped.
    #[test]
    fn test_wal_reopen() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        {
            let mut wal = WAL::create(&wal_path, SyncPolicy::SyncAll).unwrap();
            let record = Record::Put {
                key: Bytes::from("key1"),
                value: Bytes::from("value1"),
                seq: 1,
            };
            wal.write(&record).unwrap();
        }
        let wal = WAL::open(&wal_path, SyncPolicy::SyncAll).unwrap();
        // `> 0` would pass even for an empty log (the header alone is 8
        // bytes); require that the written record is actually accounted for.
        assert!(wal.offset() > HEADER_SIZE);
    }
}