use crate::config::{WalConfig, WalSyncPolicy};
use crate::error::{Error, Result};
use crate::storage::{Entry, ValuePointer};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{self, Duration};
use tracing::{debug, error, info, warn};
/// One operation recorded in the write-ahead log.
///
/// Records are bincode-encoded (see `WalFile::write_record`). Bincode identifies enum variants
/// by position and encodes fields in declaration order, so reordering variants or fields here
/// changes the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalRecord {
/// `key` stored together with its inline `value`.
Put {
key: Vec<u8>,
value: Vec<u8>,
sequence: u64,
timestamp: u64,
},
/// `key` whose value is referenced through a `ValuePointer` instead of being stored inline.
PutPointer {
key: Vec<u8>,
value_pointer: ValuePointer,
sequence: u64,
timestamp: u64,
},
/// Removal of `key`; `write_batch` emits this for entries with neither a value nor a pointer.
Delete {
key: Vec<u8>,
sequence: u64,
timestamp: u64,
},
/// Several operations written as one record (built by `WalWriter::write_batch`).
Batch {
operations: Vec<WalRecord>,
/// Sequence number taken from the writer's counter by `write_batch`.
sequence: u64,
/// Milliseconds since the UNIX epoch at the time `write_batch` built the batch.
timestamp: u64,
},
}
/// Header serialized at the start of every WAL file by `WalFile::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalHeader {
/// File magic; `validate` requires `WalHeader::MAGIC` (the ASCII bytes `AURADBWA`).
pub magic: [u8; 8],
/// Format version; `validate` requires `WalHeader::VERSION`.
pub version: u32,
/// Creation time in milliseconds since the UNIX epoch.
pub created_at: u64,
/// CRC32 over `magic`, `version` and `created_at`; compared against `calculate_checksum` by `validate`.
pub checksum: u32,
}
impl WalHeader {
    /// File magic: the ASCII bytes `AURADBWA`.
    const MAGIC: [u8; 8] = [0x41, 0x55, 0x52, 0x41, 0x44, 0x42, 0x57, 0x41];
    /// Format version written by [`WalHeader::new`] and required by [`WalHeader::validate`].
    const VERSION: u32 = 1;

    /// Builds a header stamped with the current time and a valid checksum.
    pub fn new() -> Self {
        let created_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        let mut header = Self {
            magic: Self::MAGIC,
            version: Self::VERSION,
            created_at,
            checksum: 0,
        };
        // The checksum covers every other field, so it can only be computed once they are
        // set. Leaving it at 0 would make `validate` fail for every new header.
        header.checksum = header.calculate_checksum();
        header
    }

    /// CRC32 of `magic`, then `version` and `created_at` as little-endian bytes.
    ///
    /// The `checksum` field itself is not included.
    pub fn calculate_checksum(&self) -> u32 {
        use crc32fast::Hasher;
        let mut hasher = Hasher::new();
        hasher.update(&self.magic);
        hasher.update(&self.version.to_le_bytes());
        hasher.update(&self.created_at.to_le_bytes());
        hasher.finalize()
    }

    /// Returns `true` when the magic, version and stored checksum all match.
    pub fn validate(&self) -> bool {
        self.magic == Self::MAGIC
            && self.version == Self::VERSION
            && self.checksum == self.calculate_checksum()
    }
}
/// In-memory bookkeeping for one WAL file, held by `WalFile`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalFileMeta {
/// Location of the file on disk.
pub path: PathBuf,
/// Bytes handed to the file's writer so far: the header plus every 4-byte length prefix and record body.
pub size: u64,
/// Set to 0 by `WalFile::new`; nothing in this module updates it.
pub first_sequence: u64,
/// Set to 0 by `WalFile::new`; nothing in this module updates it.
pub last_sequence: u64,
/// Millisecond UNIX timestamp taken when the file was created.
pub created_at: u64,
/// Set to `true` by `WalFile::close`.
pub closed: bool,
}
/// Appends `WalRecord`s to size-rotated WAL files under `WalConfig::wal_path`.
///
/// When `config.async_writes` is set, records are sent over `async_sender` to a background
/// task that buffers and writes them; otherwise they go straight to `current_file`.
pub struct WalWriter {
/// File receiving synchronous writes; `None` before the first rotation and after `close`.
current_file: Option<WalFile>,
config: WalConfig,
/// Next sequence number to hand out (incremented with `fetch_add`).
sequence: AtomicU64,
/// Directory holding the WAL files, cloned from `config.wal_path`.
wal_dir: PathBuf,
/// Channel to the background writer, present only in async mode.
async_sender: Option<mpsc::UnboundedSender<AsyncWriteRequest>>,
/// Handle of the spawned background writer task, if any.
background_handle: Option<tokio::task::JoinHandle<()>>,
}
impl WalWriter {
pub fn new(config: WalConfig) -> Result<Self> {
let wal_dir = config.wal_path.clone();
std::fs::create_dir_all(&wal_dir)?;
let mut writer = Self {
current_file: None,
config,
sequence: AtomicU64::new(0),
wal_dir,
async_sender: None,
background_handle: None,
};
if writer.config.async_writes {
writer.start_async_writer()?;
}
writer.rotate_file()?;
Ok(writer)
}
fn start_async_writer(&mut self) -> Result<()> {
let (tx, mut rx) = mpsc::unbounded_channel();
self.async_sender = Some(tx);
let wal_dir = self.wal_dir.clone();
let config = self.config.clone();
let handle = tokio::spawn(async move {
let mut current_file = None;
let mut write_buffer = Vec::new();
while let Some(request) = rx.recv().await {
match request {
AsyncWriteRequest::Write(record) => {
write_buffer.push(record);
if write_buffer.len() >= 1000 {
if let Err(e) = Self::flush_records(&mut current_file, &wal_dir, &config, &mut write_buffer).await {
error!("Failed to flush WAL records: {}", e);
}
}
}
AsyncWriteRequest::Sync => {
if let Err(e) = Self::flush_records(&mut current_file, &wal_dir, &config, &mut write_buffer).await {
error!("Failed to sync WAL records: {}", e);
}
}
AsyncWriteRequest::Shutdown => break,
}
}
});
self.background_handle = Some(handle);
Ok(())
}
async fn flush_records(
current_file: &mut Option<WalFile>,
wal_dir: &PathBuf,
config: &WalConfig,
records: &mut Vec<WalRecord>,
) -> Result<()> {
if records.is_empty() {
return Ok(());
}
if current_file.is_none() {
*current_file = Some(WalFile::new(wal_dir, config)?);
}
let file = current_file.as_mut().unwrap();
for record in records.drain(..) {
file.write_record(&record)?;
}
match config.sync_policy {
WalSyncPolicy::EveryWrite => file.sync()?,
WalSyncPolicy::EveryNWrites(n) if file.record_count() % n == 0 => file.sync()?,
WalSyncPolicy::EveryNMs(ms) => {
time::sleep(Duration::from_millis(ms)).await;
file.sync()?;
}
_ => {}
}
Ok(())
}
pub fn write_record(&mut self, record: &WalRecord) -> Result<u64> {
let sequence = self.sequence.fetch_add(1, Ordering::SeqCst);
if self.config.async_writes {
if let Some(sender) = &self.async_sender {
let _ = sender.send(AsyncWriteRequest::Write(record.clone()));
}
} else {
self.ensure_current_file()?;
self.current_file.as_mut().unwrap().write_record(record)?;
match self.config.sync_policy {
WalSyncPolicy::EveryWrite => self.sync()?,
WalSyncPolicy::EveryNWrites(n) if sequence % n == 0 => self.sync()?,
_ => {}
}
}
Ok(sequence)
}
pub fn write_batch(&mut self, entries: &[Entry]) -> Result<u64> {
let sequence = self.sequence.fetch_add(1, Ordering::SeqCst);
let records: Vec<WalRecord> = entries
.iter()
.map(|entry| {
if let Some(value) = &entry.value {
WalRecord::Put {
key: entry.key.data.clone(),
value: value.data.clone(),
sequence: entry.sequence,
timestamp: entry.timestamp,
}
} else if let Some(vptr) = &entry.value_pointer {
WalRecord::PutPointer {
key: entry.key.data.clone(),
value_pointer: vptr.clone(),
sequence: entry.sequence,
timestamp: entry.timestamp,
}
} else {
WalRecord::Delete {
key: entry.key.data.clone(),
sequence: entry.sequence,
timestamp: entry.timestamp,
}
}
})
.collect();
let batch_record = WalRecord::Batch {
operations: records,
sequence,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
};
self.write_record(&batch_record)?;
Ok(sequence)
}
fn ensure_current_file(&mut self) -> Result<()> {
if self.current_file.is_none() || self.should_rotate() {
self.rotate_file()?;
}
Ok(())
}
fn should_rotate(&self) -> bool {
if let Some(file) = &self.current_file {
file.size() >= self.config.max_file_size
} else {
true
}
}
fn rotate_file(&mut self) -> Result<()> {
if let Some(mut file) = self.current_file.take() {
file.close()?;
}
let file = WalFile::new(&self.wal_dir, &self.config)?;
self.current_file = Some(file);
info!("Rotated to new WAL file");
Ok(())
}
pub fn sync(&mut self) -> Result<()> {
if let Some(file) = &mut self.current_file {
file.sync()?;
}
Ok(())
}
pub fn current_sequence(&self) -> u64 {
self.sequence.load(Ordering::SeqCst)
}
pub fn close(&mut self) -> Result<()> {
if let Some(sender) = &self.async_sender {
let _ = sender.send(AsyncWriteRequest::Shutdown);
}
if let Some(handle) = self.background_handle.take() {
let _ = std::panic::catch_unwind(|| {
});
}
if let Some(mut file) = self.current_file.take() {
file.close()?;
}
Ok(())
}
}
/// Message from `WalWriter` to its background writer task.
#[derive(Debug, Clone)]
pub enum AsyncWriteRequest {
/// Queue a record in the task's write buffer.
Write(WalRecord),
/// Write out the task's buffered records.
Sync,
/// Ask the task to stop its receive loop.
Shutdown,
}
/// A WAL file open for appending.
struct WalFile {
/// Buffered handle; `sync` flushes the buffer and then calls `sync_all` on the file.
file: BufWriter<File>,
meta: WalFileMeta,
/// Records written through `write_record` since the file was created.
record_count: u64,
}
impl WalFile {
fn new(wal_dir: &PathBuf, config: &WalConfig) -> Result<Self> {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let filename = format!("wal_{:016x}.log", timestamp);
let path = wal_dir.join(filename);
let file = OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&path)?;
let mut buf_writer = BufWriter::with_capacity(config.buffer_size, file);
let header = WalHeader::new();
let header_bytes = bincode::serialize(&header)?;
buf_writer.write_all(&header_bytes)?;
buf_writer.flush()?;
let meta = WalFileMeta {
path: path.clone(),
size: header_bytes.len() as u64,
first_sequence: 0,
last_sequence: 0,
created_at: timestamp,
closed: false,
};
Ok(Self {
file: buf_writer,
meta,
record_count: 0,
})
}
fn write_record(&mut self, record: &WalRecord) -> Result<()> {
let record_bytes = bincode::serialize(record)?;
let record_len = record_bytes.len() as u32;
self.file.write_all(&record_len.to_le_bytes())?;
self.file.write_all(&record_bytes)?;
self.meta.size += 4 + record_bytes.len() as u64;
self.record_count += 1;
Ok(())
}
fn sync(&mut self) -> Result<()> {
self.file.flush()?;
self.file.get_ref().sync_all()?;
Ok(())
}
fn close(&mut self) -> Result<()> {
self.sync()?;
self.meta.closed = true;
Ok(())
}
fn size(&self) -> u64 {
self.meta.size
}
fn record_count(&self) -> u64 {
self.record_count
}
}
/// Replays `WalRecord`s, file by file, from the `.log` files found in a directory.
pub struct WalReader {
/// Directory the files were listed from; not read again after `new`.
wal_dir: PathBuf,
/// Reader for the file currently being replayed, if one is open.
current_file: Option<WalFileReader>,
/// Files not yet opened, in replay order.
files: VecDeque<PathBuf>,
}
impl WalReader {
pub fn new(wal_dir: PathBuf) -> Result<Self> {
let mut files: Vec<PathBuf> = std::fs::read_dir(&wal_dir)?
.filter_map(|entry| entry.ok())
.filter(|entry| {
entry.path().extension().map_or(false, |ext| ext == "log")
})
.map(|entry| entry.path())
.collect();
files.sort();
Ok(Self {
wal_dir,
current_file: None,
files: files.into(),
})
}
pub fn read_next(&mut self) -> Result<Option<WalRecord>> {
loop {
if self.current_file.is_none() {
if let Some(file_path) = self.files.pop_front() {
self.current_file = Some(WalFileReader::new(file_path)?);
} else {
return Ok(None); }
}
if let Some(file) = &mut self.current_file {
match file.read_record()? {
Some(record) => return Ok(Some(record)),
None => {
self.current_file = None;
continue;
}
}
}
}
}
}
/// Reads length-prefixed (4-byte little-endian) bincode records from one WAL file.
struct WalFileReader {
file: std::io::BufReader<File>,
/// Path of the file being read.
path: PathBuf,
}
impl WalFileReader {
fn new(path: PathBuf) -> Result<Self> {
let file = OpenOptions::new().read(true).open(&path)?;
let reader = std::io::BufReader::new(file);
Ok(Self { file: reader, path })
}
fn read_record(&mut self) -> Result<Option<WalRecord>> {
let mut len_bytes = [0u8; 4];
if self.file.read_exact(&mut len_bytes).is_err() {
return Ok(None); }
let record_len = u32::from_le_bytes(len_bytes) as usize;
let mut record_bytes = vec![0u8; record_len];
self.file.read_exact(&mut record_bytes)?;
let record: WalRecord = bincode::deserialize(&record_bytes)?;
Ok(Some(record))
}
}
impl Drop for WalWriter {
fn drop(&mut self) {
let _ = self.close();
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A writer can be built on an empty temporary directory with default settings.
    #[test]
    fn test_wal_writer_creation() {
        let dir = tempdir().unwrap();
        let result = WalWriter::new(WalConfig {
            wal_path: dir.path().to_path_buf(),
            ..Default::default()
        });
        assert!(result.is_ok());
    }

    /// A `Put` record survives a bincode round trip unchanged.
    #[test]
    fn test_wal_record_serialization() {
        let original = WalRecord::Put {
            key: b"test_key".to_vec(),
            value: b"test_value".to_vec(),
            sequence: 1,
            timestamp: 1234567890,
        };
        let bytes = bincode::serialize(&original).unwrap();
        let decoded: WalRecord = bincode::deserialize(&bytes).unwrap();
        if let WalRecord::Put {
            key,
            value,
            sequence,
            timestamp,
        } = decoded
        {
            assert_eq!(key, b"test_key");
            assert_eq!(value, b"test_value");
            assert_eq!(sequence, 1);
            assert_eq!(timestamp, 1234567890);
        } else {
            panic!("Unexpected record type");
        }
    }
}