use crate::error::{Error, Result};
use crate::storage::write_engine::mutation::Mutation;
use crc32fast::Hasher;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
#[cfg(unix)]
fn sync_directory(dir: &Path) -> Result<()> {
let dir_file = File::open(dir)
.map_err(|e| Error::Storage(format!("Failed to open directory for sync: {}", e)))?;
dir_file
.sync_all()
.map_err(|e| Error::Storage(format!("Failed to sync directory: {}", e)))?;
Ok(())
}
#[cfg(not(unix))]
fn sync_directory(_dir: &Path) -> Result<()> {
Ok(())
}
fn validate_wal_directory(dir: &Path) -> Result<PathBuf> {
if !dir.exists() {
return Err(Error::InvalidPath(format!(
"WAL directory does not exist: {:?}",
dir
)));
}
if !dir.is_dir() {
return Err(Error::InvalidPath(format!(
"WAL path is not a directory: {:?}",
dir
)));
}
let canonical = dir
.canonicalize()
.map_err(|e| Error::InvalidPath(format!("Failed to canonicalize WAL directory: {}", e)))?;
let path_str = canonical.to_string_lossy();
if path_str.chars().any(|c| c.is_control()) {
return Err(Error::InvalidPath(
"WAL directory path contains control characters".to_string(),
));
}
Ok(canonical)
}
#[cfg(unix)]
fn set_secure_permissions(file: &File) -> Result<()> {
use std::os::unix::fs::PermissionsExt;
let mut perms = file
.metadata()
.map_err(|e| Error::Storage(format!("Failed to read file metadata: {}", e)))?
.permissions();
perms.set_mode(0o600);
file.set_permissions(perms)
.map_err(|e| Error::Storage(format!("Failed to set file permissions: {}", e)))?;
Ok(())
}
#[cfg(not(unix))]
fn set_secure_permissions(_file: &File) -> Result<()> {
Ok(())
}
/// Append-only log of serialized mutations backed by a single file.
///
/// Each entry is written as a 4-byte little-endian payload length, a 4-byte
/// little-endian CRC32 of the payload, and then the payload bytes.
#[derive(Debug)]
pub struct WriteAheadLog {
    /// Buffered handle used for all writes to the log file.
    file: BufWriter<File>,
    /// Location of the log file; `replay` reopens the file through this path.
    path: PathBuf,
    /// Capacity the write buffer was constructed with. Non-test code in this
    /// file only stores it, hence the `dead_code` allowance.
    #[allow(dead_code)]
    buffer_size: usize,
    /// Byte count tracked in memory: starts at 0 for a new log or at the file
    /// length for an existing one, grows on `append`, and is reset by `truncate`.
    current_size: u64,
}
impl WriteAheadLog {
pub const DEFAULT_BUFFER_SIZE: usize = 4096;
pub const WAL_FILENAME: &'static str = "commitlog.wal";
pub fn create(dir: &Path) -> Result<Self> {
Self::create_with_buffer_size(dir, Self::DEFAULT_BUFFER_SIZE)
}
pub fn create_with_buffer_size(dir: &Path, buffer_size: usize) -> Result<Self> {
let validated_dir = validate_wal_directory(dir)?;
let path = validated_dir.join(Self::WAL_FILENAME);
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&path)
.map_err(|e| Error::Storage(format!("Failed to create WAL at {:?}: {}", path, e)))?;
set_secure_permissions(&file)?;
sync_directory(&validated_dir)?;
Ok(Self {
file: BufWriter::with_capacity(buffer_size, file),
path,
buffer_size,
current_size: 0,
})
}
pub fn open_existing(path: &Path) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.append(true)
.open(path)
.map_err(|e| Error::Storage(format!("Failed to open WAL at {:?}: {}", path, e)))?;
let metadata = file
.metadata()
.map_err(|e| Error::Storage(format!("Failed to read WAL metadata: {}", e)))?;
let current_size = metadata.len();
Ok(Self {
file: BufWriter::with_capacity(Self::DEFAULT_BUFFER_SIZE, file),
path: path.to_path_buf(),
buffer_size: Self::DEFAULT_BUFFER_SIZE,
current_size,
})
}
pub fn append(&mut self, mutation: &Mutation) -> Result<()> {
let mutation_bytes = bincode::serialize(mutation)
.map_err(|e| Error::Storage(format!("Failed to serialize mutation: {}", e)))?;
let entry_length = mutation_bytes.len() as u32;
let mut hasher = Hasher::new();
hasher.update(&mutation_bytes);
let crc32 = hasher.finalize();
self.file
.write_all(&entry_length.to_le_bytes())
.map_err(|e| Error::Storage(format!("Failed to write entry length: {}", e)))?;
self.file
.write_all(&crc32.to_le_bytes())
.map_err(|e| Error::Storage(format!("Failed to write CRC32: {}", e)))?;
self.file
.write_all(&mutation_bytes)
.map_err(|e| Error::Storage(format!("Failed to write mutation bytes: {}", e)))?;
self.current_size += 8 + entry_length as u64;
Ok(())
}
pub fn sync(&mut self) -> Result<()> {
self.file
.flush()
.map_err(|e| Error::Storage(format!("Failed to flush WAL buffer: {}", e)))?;
self.file
.get_ref()
.sync_all()
.map_err(|e| Error::Storage(format!("Failed to sync WAL to disk: {}", e)))?;
Ok(())
}
pub fn replay(&self) -> Result<Vec<Mutation>> {
let mut file = File::open(&self.path)
.map_err(|e| Error::Storage(format!("Failed to open WAL for replay: {}", e)))?;
let mut mutations = Vec::new();
let mut offset = 0u64;
loop {
let mut header = [0u8; 8];
match file.read_exact(&mut header) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
break;
}
Err(e) => {
return Err(Error::Storage(format!(
"Failed to read WAL header at offset {}: {}",
offset, e
)));
}
}
let entry_length = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
let expected_crc = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
if entry_length > 16 * 1024 * 1024 {
log::warn!(
"WAL entry at offset {} has unreasonable length {} - stopping replay",
offset,
entry_length
);
break;
}
let mut mutation_bytes = vec![0u8; entry_length as usize];
match file.read_exact(&mut mutation_bytes) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
log::warn!(
"WAL entry at offset {} is truncated (expected {} bytes) - stopping replay",
offset,
entry_length
);
break;
}
Err(e) => {
return Err(Error::Storage(format!(
"Failed to read WAL entry at offset {}: {}",
offset, e
)));
}
}
let mut hasher = Hasher::new();
hasher.update(&mutation_bytes);
let actual_crc = hasher.finalize();
if actual_crc != expected_crc {
log::warn!(
"WAL entry at offset {} has CRC mismatch (expected 0x{:08x}, got 0x{:08x}) - skipping",
offset,
expected_crc,
actual_crc
);
offset += 8 + entry_length as u64;
continue;
}
match bincode::deserialize::<Mutation>(&mutation_bytes) {
Ok(mutation) => {
mutations.push(mutation);
}
Err(e) => {
log::warn!(
"WAL entry at offset {} failed to deserialize: {} - skipping",
offset,
e
);
}
}
offset += 8 + entry_length as u64;
}
Ok(mutations)
}
pub fn truncate(&mut self) -> Result<()> {
self.file
.flush()
.map_err(|e| Error::Storage(format!("Failed to flush before truncate: {}", e)))?;
self.file
.get_mut()
.set_len(0)
.map_err(|e| Error::Storage(format!("Failed to truncate WAL: {}", e)))?;
self.file
.get_ref()
.sync_all()
.map_err(|e| Error::Storage(format!("Failed to sync after truncate: {}", e)))?;
self.file
.get_mut()
.seek(SeekFrom::Start(0))
.map_err(|e| Error::Storage(format!("Failed to seek after truncate: {}", e)))?;
self.current_size = 0;
Ok(())
}
pub fn size(&self) -> u64 {
self.current_size
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn rotate(mut self, dir: &Path) -> Result<Self> {
self.sync()?;
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let old_path = self.path.clone();
let archived_path = dir.join(format!("commitlog.wal.{}", timestamp));
drop(self.file);
std::fs::rename(&old_path, &archived_path)
.map_err(|e| Error::Storage(format!("Failed to rename WAL during rotation: {}", e)))?;
sync_directory(dir)?;
Self::create(dir)
}
pub fn delete_old(path: &Path) -> Result<()> {
std::fs::remove_file(path)
.map_err(|e| Error::Storage(format!("Failed to delete old WAL: {}", e)))?;
Ok(())
}
}
/// Unit tests for the write-ahead log and its directory/permission helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::write_engine::mutation::{
        CellOperation, ClusteringKey, Mutation, PartitionKey, TableId,
    };
    use crate::types::Value;
    use tempfile::TempDir;

    /// Builds a single-write mutation on `test_ks.test_table` keyed by `id`
    /// that sets the `name` column to `name`.
    fn create_test_mutation(id: i32, name: &str) -> Mutation {
        let table_id = TableId::new("test_ks", "test_table");
        let pk = PartitionKey::single("id", Value::Integer(id));
        let ops = vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text(name.to_string()),
        }];
        Mutation::new(table_id, pk, None, ops, 1234567890, None)
    }

    /// Counts the directory entries in `dir` whose name starts with
    /// `commitlog.wal.`, i.e. archived logs produced by rotation.
    fn archived_wal_count(dir: &Path) -> usize {
        std::fs::read_dir(dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.file_name()
                    .to_string_lossy()
                    .starts_with("commitlog.wal.")
            })
            .count()
    }

    /// A new log is empty and its file exists.
    #[test]
    fn test_wal_create() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        assert_eq!(wal.size(), 0);
        assert!(wal.path().exists());
    }

    /// Appending grows the tracked size and syncing succeeds.
    #[test]
    fn test_wal_append_and_sync() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation = create_test_mutation(1, "Alice");
        wal.append(&mutation).unwrap();
        assert!(wal.size() > 0);
        wal.sync().unwrap();
    }

    /// Replaying an empty log yields no mutations.
    #[test]
    fn test_wal_replay_empty() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 0);
    }

    /// A single synced entry is returned by replay with its table identity.
    #[test]
    fn test_wal_replay_single_entry() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation = create_test_mutation(1, "Alice");
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 1);
        assert_eq!(mutations[0].table.keyspace, "test_ks");
        assert_eq!(mutations[0].table.table, "test_table");
    }

    /// Multiple entries are replayed in append order with their payloads intact.
    #[test]
    fn test_wal_replay_multiple_entries() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        for i in 0..10 {
            let mutation = create_test_mutation(i, &format!("User{}", i));
            wal.append(&mutation).unwrap();
        }
        wal.sync().unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 10);
        for (i, mutation) in mutations.iter().enumerate() {
            assert_eq!(mutation.table.keyspace, "test_ks");
            match &mutation.operations[0] {
                CellOperation::Write { column, value } => {
                    assert_eq!(column, "name");
                    if let Value::Text(name) = value {
                        assert_eq!(name, &format!("User{}", i));
                    } else {
                        panic!("Expected Text value");
                    }
                }
                _ => panic!("Expected Write operation"),
            }
        }
    }

    /// Truncation resets the tracked size and leaves nothing to replay.
    #[test]
    fn test_wal_truncate() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation = create_test_mutation(1, "Alice");
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        assert!(wal.size() > 0);
        wal.truncate().unwrap();
        assert_eq!(wal.size(), 0);
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 0);
    }

    /// An entry whose stored CRC was overwritten is not returned by replay.
    #[test]
    fn test_wal_crc_corruption() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation = create_test_mutation(1, "Alice");
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let wal_path = wal.path().to_path_buf();
        drop(wal);

        // Bytes 4..8 of the first entry hold its CRC32.
        let mut file = OpenOptions::new().write(true).open(&wal_path).unwrap();
        file.seek(SeekFrom::Start(4)).unwrap();
        file.write_all(&[0xFF, 0xFF, 0xFF, 0xFF]).unwrap();
        file.sync_all().unwrap();
        drop(file);

        let wal = WriteAheadLog::open_existing(&wal_path).unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 0);
    }

    /// An entry whose payload was cut short is not returned by replay.
    #[test]
    fn test_wal_truncated_entry() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation = create_test_mutation(1, "Alice");
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let wal_path = wal.path().to_path_buf();
        let original_size = wal.size();
        drop(wal);

        let file = OpenOptions::new().write(true).open(&wal_path).unwrap();
        file.set_len(original_size - 10).unwrap();
        drop(file);

        let wal = WriteAheadLog::open_existing(&wal_path).unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 0);
    }

    /// Rotation yields an empty log and exactly one archived file.
    #[test]
    fn test_wal_rotate() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation = create_test_mutation(1, "Alice");
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let wal = wal.rotate(temp_dir.path()).unwrap();
        assert_eq!(wal.size(), 0);
        assert_eq!(archived_wal_count(temp_dir.path()), 1);
    }

    /// `delete_old` removes the given file.
    #[test]
    fn test_wal_delete_old() {
        let temp_dir = TempDir::new().unwrap();
        let wal_path = temp_dir.path().join("test.wal");
        File::create(&wal_path).unwrap();
        assert!(wal_path.exists());
        WriteAheadLog::delete_old(&wal_path).unwrap();
        assert!(!wal_path.exists());
    }

    /// Reopening a log keeps earlier entries and appends after them.
    #[test]
    fn test_wal_open_existing() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation1 = create_test_mutation(1, "Alice");
        wal.append(&mutation1).unwrap();
        wal.sync().unwrap();
        let wal_path = wal.path().to_path_buf();
        drop(wal);

        let mut wal = WriteAheadLog::open_existing(&wal_path).unwrap();
        let mutation2 = create_test_mutation(2, "Bob");
        wal.append(&mutation2).unwrap();
        wal.sync().unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 2);
    }

    /// A mutation with a clustering key keeps it through append and replay.
    #[test]
    fn test_wal_with_clustering_key() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let table_id = TableId::new("test_ks", "test_table");
        let pk = PartitionKey::single("id", Value::Integer(1));
        let ck = Some(ClusteringKey::single("ts", Value::Timestamp(1000)));
        let ops = vec![CellOperation::Write {
            column: "value".to_string(),
            value: Value::Text("test".to_string()),
        }];
        let mutation = Mutation::new(table_id, pk, ck, ops, 1234567890, None);
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 1);
        assert!(mutations[0].clustering_key.is_some());
    }

    /// A mutation's TTL is preserved through append and replay.
    #[test]
    fn test_wal_with_ttl() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let table_id = TableId::new("test_ks", "test_table");
        let pk = PartitionKey::single("id", Value::Integer(1));
        let ops = vec![CellOperation::Write {
            column: "value".to_string(),
            value: Value::Text("test".to_string()),
        }];
        let mutation = Mutation::new(table_id, pk, None, ops, 1234567890, Some(3600));
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 1);
        assert_eq!(mutations[0].ttl_seconds, Some(3600));
    }

    /// A column delete operation is preserved through append and replay.
    #[test]
    fn test_wal_delete_operation() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let table_id = TableId::new("test_ks", "test_table");
        let pk = PartitionKey::single("id", Value::Integer(1));
        let ops = vec![CellOperation::Delete {
            column: "name".to_string(),
        }];
        let mutation = Mutation::new(table_id, pk, None, ops, 1234567890, None);
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 1);
        assert!(matches!(
            &mutations[0].operations[0],
            CellOperation::Delete { .. }
        ));
    }

    /// A row delete operation is preserved through append and replay.
    #[test]
    fn test_wal_delete_row_operation() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let table_id = TableId::new("test_ks", "test_table");
        let pk = PartitionKey::single("id", Value::Integer(1));
        let ops = vec![CellOperation::DeleteRow];
        let mutation = Mutation::new(table_id, pk, None, ops, 1234567890, None);
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let mutations = wal.replay().unwrap();
        assert_eq!(mutations.len(), 1);
        assert!(matches!(
            &mutations[0].operations[0],
            CellOperation::DeleteRow
        ));
    }

    /// The requested buffer size is recorded on the log.
    #[test]
    fn test_wal_buffer_size() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::create_with_buffer_size(temp_dir.path(), 8192).unwrap();
        assert_eq!(wal.buffer_size, 8192);
    }

    /// Creation, which includes the directory sync step, leaves the file in place.
    #[test]
    fn test_wal_directory_sync_on_create() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        assert!(wal.path().exists());
    }

    /// Rotation, which includes the directory sync step, leaves both the new
    /// log and exactly one archive in place.
    #[test]
    fn test_wal_directory_sync_on_rotate() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation = create_test_mutation(1, "Alice");
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let new_wal = wal.rotate(temp_dir.path()).unwrap();
        assert!(new_wal.path().exists());
        assert_eq!(archived_wal_count(temp_dir.path()), 1);
    }

    /// After truncation the on-disk file length is zero, not just the tracked size.
    #[test]
    fn test_wal_fsync_after_truncate() {
        let temp_dir = TempDir::new().unwrap();
        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let mutation = create_test_mutation(1, "Alice");
        wal.append(&mutation).unwrap();
        wal.sync().unwrap();
        let size_before = wal.size();
        assert!(size_before > 0);
        wal.truncate().unwrap();
        assert_eq!(wal.size(), 0);
        let metadata = std::fs::metadata(wal.path()).unwrap();
        assert_eq!(metadata.len(), 0);
    }

    /// A missing directory is rejected with `InvalidPath`.
    #[test]
    fn test_validate_wal_directory_nonexistent() {
        let nonexistent = PathBuf::from("/nonexistent/path/that/does/not/exist");
        let result = validate_wal_directory(&nonexistent);
        assert!(result.is_err());
        match result {
            Err(Error::InvalidPath(_)) => {}
            _ => panic!("Expected InvalidPath error"),
        }
    }

    /// A regular file is rejected with `InvalidPath`.
    #[test]
    fn test_validate_wal_directory_is_file() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("not_a_dir");
        File::create(&file_path).unwrap();
        let result = validate_wal_directory(&file_path);
        assert!(result.is_err());
        match result {
            Err(Error::InvalidPath(_)) => {}
            _ => panic!("Expected InvalidPath error"),
        }
    }

    /// A real directory validates and comes back as an absolute path.
    #[test]
    fn test_validate_wal_directory_valid() {
        let temp_dir = TempDir::new().unwrap();
        let result = validate_wal_directory(temp_dir.path());
        assert!(result.is_ok());
        let canonical = result.unwrap();
        assert!(canonical.is_absolute());
    }

    /// On Unix a newly created log file has mode `0o600`.
    #[test]
    #[cfg(unix)]
    fn test_wal_file_permissions() {
        use std::os::unix::fs::PermissionsExt;
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::create(temp_dir.path()).unwrap();
        let metadata = std::fs::metadata(wal.path()).unwrap();
        let permissions = metadata.permissions();
        let mode = permissions.mode();
        assert_eq!(mode & 0o777, 0o600);
    }

    /// `create` succeeds for a real directory and fails for a missing one.
    #[test]
    fn test_wal_create_validates_directory() {
        let temp_dir = TempDir::new().unwrap();
        let result = WriteAheadLog::create(temp_dir.path());
        assert!(result.is_ok());
        let nonexistent = temp_dir.path().join("nonexistent");
        let result = WriteAheadLog::create(&nonexistent);
        assert!(result.is_err());
    }

    /// Syncing a missing directory reports a `Storage` error.
    ///
    /// Unix only: the non-Unix `sync_directory` is a no-op that always succeeds,
    /// so this assertion cannot hold there.
    #[test]
    #[cfg(unix)]
    fn test_sync_directory_invalid_path() {
        let invalid_path = PathBuf::from("/nonexistent/path");
        let result = sync_directory(&invalid_path);
        assert!(result.is_err());
        match result {
            Err(Error::Storage(_)) => {}
            _ => panic!("Expected Storage error"),
        }
    }
}