#![cfg(feature = "write-support")]
use cqlite_core::schema::{Column, KeyColumn, TableSchema};
use cqlite_core::storage::write_engine::{
CellOperation, Mutation, PartitionKey, TableId, WriteEngine, WriteEngineConfig,
};
use cqlite_core::types::Value;
use std::collections::HashMap;
use tempfile::TempDir;
/// Builds the schema shared by every test in this file: table
/// `test_ks.test_table` with a single `int` partition key `id`, no
/// clustering keys, and one nullable `text` column `name`.
fn create_test_schema() -> TableSchema {
    let id_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };

    // The partition key is also listed as a regular (non-nullable) column.
    let id_column = Column {
        name: String::from("id"),
        data_type: String::from("int"),
        nullable: false,
        default: None,
        is_static: false,
    };
    let name_column = Column {
        name: String::from("name"),
        data_type: String::from("text"),
        nullable: true,
        default: None,
        is_static: false,
    };

    TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![id_key],
        clustering_keys: Vec::new(),
        columns: vec![id_column, name_column],
        comments: HashMap::new(),
    }
}
/// Builds a mutation against `test_ks.test_table` that writes `name` into the
/// `name` column of the partition identified by `id`, stamped with `timestamp`.
///
/// The two `None` arguments are passed through to `Mutation::new` unchanged;
/// their meaning is defined by that constructor.
fn create_mutation(id: i32, name: &str, timestamp: i64) -> Mutation {
    Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(id)),
        None,
        vec![CellOperation::Write {
            column: String::from("name"),
            value: Value::Text(name.to_owned()),
        }],
        timestamp,
        None,
    )
}
/// Scans a raw WAL image and returns `(offset, payload_length)` for every
/// complete entry found, in file order.
///
/// The layout assumed here is a sequence of records, each made of an 8-byte
/// header followed by the payload. The first four header bytes are the payload
/// length as a little-endian `u32`; the remaining four header bytes are skipped
/// by this parser (the tests in this file treat them as the entry's CRC).
///
/// Scanning stops at the first record that
/// * does not have a full 8-byte header left in `bytes`,
/// * declares a payload larger than 16 MiB (treated as garbage), or
/// * declares a payload that extends past the end of `bytes` (a truncated
///   record), so every returned entry is fully contained in `bytes`.
///
/// `offset` is the position of the record header within `bytes`.
fn parse_wal_entries(bytes: &[u8]) -> Vec<(usize, u32)> {
    /// Length prefix (4 bytes) plus the 4 bytes that follow it.
    const HEADER_LEN: usize = 8;
    /// Sanity cap: anything larger is assumed to be corruption, not a real entry.
    const MAX_ENTRY_LEN: u32 = 16 * 1024 * 1024;

    let mut entries = Vec::new();
    let mut offset = 0usize;

    // `get` yields `None` once fewer than HEADER_LEN bytes remain, ending the scan.
    while let Some(header) = bytes.get(offset..offset + HEADER_LEN) {
        let entry_length = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
        if entry_length > MAX_ENTRY_LEN {
            break;
        }

        // Do not report a record whose payload is cut off by the end of the buffer;
        // callers index into the payload using the returned offset.
        let end = offset + HEADER_LEN + entry_length as usize;
        if end > bytes.len() {
            break;
        }

        entries.push((offset, entry_length));
        offset = end;
    }

    entries
}
/// Writes three mutations, chops the last 10 bytes off the WAL file, then
/// reopens the engine and asserts that exactly two rows are in the memtable.
#[tokio::test]
async fn test_wal_truncated_mid_entry() -> cqlite_core::error::Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");
    let wal_file = wal_dir.join("commitlog.wal");
    let schema = create_test_schema();

    // Phase 1: populate the WAL, then drop the engine at the end of the scope.
    {
        let mut writer = WriteEngine::new(WriteEngineConfig::new(
            data_dir.clone(),
            wal_dir.clone(),
            schema.clone(),
        ))?;
        for id in 0..3i32 {
            let user = format!("User{}", id);
            writer
                .write_async(create_mutation(id, &user, 1_000_000 + id as i64))
                .await?;
        }
        assert_eq!(writer.memtable_row_count(), 3);
    }

    // Phase 2: rewrite the WAL without its final 10 bytes.
    let image = std::fs::read(&wal_file).unwrap();
    assert!(image.len() > 10, "WAL unexpectedly small");
    let kept = &image[..image.len() - 10];
    std::fs::write(&wal_file, kept).unwrap();

    // Phase 3: reopen on the same directories and count what was replayed.
    {
        let reopened = WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, schema))?;
        assert_eq!(
            reopened.memtable_row_count(),
            2,
            "expected 2 recovered entries after truncation"
        );
    }

    Ok(())
}
/// Writes three mutations, inverts one byte at offset +4 of the second WAL
/// entry (the position this test treats as the entry's CRC), then reopens the
/// engine and asserts that exactly two rows are in the memtable.
#[tokio::test]
async fn test_wal_corrupted_crc_skips_entry() -> cqlite_core::error::Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");
    let wal_file = wal_dir.join("commitlog.wal");
    let schema = create_test_schema();

    // Phase 1: populate the WAL, then drop the engine at the end of the scope.
    {
        let mut writer = WriteEngine::new(WriteEngineConfig::new(
            data_dir.clone(),
            wal_dir.clone(),
            schema.clone(),
        ))?;
        for id in 0..3i32 {
            let user = format!("User{}", id);
            writer
                .write_async(create_mutation(id, &user, 1_000_000 + id as i64))
                .await?;
        }
        assert_eq!(writer.memtable_row_count(), 3);
    }

    // Phase 2: locate the second entry and flip every bit of the first byte
    // that follows its 4-byte length prefix.
    let mut image = std::fs::read(&wal_file).unwrap();
    let layout = parse_wal_entries(&image);
    assert_eq!(layout.len(), 3, "expected 3 WAL entries before corruption");
    let second_entry_start = layout[1].0;
    let crc_pos = second_entry_start + 4;
    image[crc_pos] ^= 0xFF;
    std::fs::write(&wal_file, &image).unwrap();

    // Phase 3: reopen on the same directories and count what was replayed.
    {
        let reopened = WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, schema))?;
        assert_eq!(
            reopened.memtable_row_count(),
            2,
            "expected 2 recovered entries after CRC corruption of entry #2"
        );
    }

    Ok(())
}
/// Writes three mutations, inverts the first payload byte (offset +8) of the
/// second WAL entry, then reopens the engine and asserts that exactly two rows
/// are in the memtable.
#[tokio::test]
async fn test_wal_corrupted_mutation_bytes() -> cqlite_core::error::Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");
    let wal_file = wal_dir.join("commitlog.wal");
    let schema = create_test_schema();

    // Phase 1: populate the WAL, then drop the engine at the end of the scope.
    {
        let mut writer = WriteEngine::new(WriteEngineConfig::new(
            data_dir.clone(),
            wal_dir.clone(),
            schema.clone(),
        ))?;
        for id in 0..3i32 {
            let user = format!("User{}", id);
            writer
                .write_async(create_mutation(id, &user, 1_000_000 + id as i64))
                .await?;
        }
        assert_eq!(writer.memtable_row_count(), 3);
    }

    // Phase 2: damage the first byte after the 8-byte header of entry #2.
    let mut image = std::fs::read(&wal_file).unwrap();
    let layout = parse_wal_entries(&image);
    assert_eq!(layout.len(), 3, "expected 3 WAL entries before corruption");
    let (second_entry_start, second_entry_len) = layout[1];
    assert!(second_entry_len > 0, "entry #2 payload is empty");
    let payload_pos = second_entry_start + 8;
    image[payload_pos] ^= 0xFF;
    std::fs::write(&wal_file, &image).unwrap();

    // Phase 3: reopen on the same directories and count what was replayed.
    {
        let reopened = WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, schema))?;
        assert_eq!(
            reopened.memtable_row_count(),
            2,
            "expected 2 recovered entries after payload corruption of entry #2"
        );
    }

    Ok(())
}
/// Writes three mutations, overwrites the whole WAL file with 100 bytes of
/// `0xAA`, then reopens the engine and asserts that the memtable is empty and
/// that opening itself did not return an error.
#[tokio::test]
async fn test_wal_completely_corrupted() -> cqlite_core::error::Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");
    let wal_file = wal_dir.join("commitlog.wal");
    let schema = create_test_schema();

    // Phase 1: populate the WAL, then drop the engine at the end of the scope.
    {
        let mut writer = WriteEngine::new(WriteEngineConfig::new(
            data_dir.clone(),
            wal_dir.clone(),
            schema.clone(),
        ))?;
        for id in 0..3i32 {
            let user = format!("User{}", id);
            writer
                .write_async(create_mutation(id, &user, 1_000_000 + id as i64))
                .await?;
        }
    }

    // Phase 2: replace the log with a fixed junk pattern.
    let garbage = vec![0xAA_u8; 100];
    std::fs::write(&wal_file, &garbage).unwrap();

    // Phase 3: reopen on the same directories and count what was replayed.
    {
        let reopened = WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, schema))?;
        assert_eq!(
            reopened.memtable_row_count(),
            0,
            "expected 0 recovered entries from completely corrupted WAL"
        );
    }

    Ok(())
}
/// Writes three mutations, replaces the WAL file with a zero-length file, then
/// reopens the engine and asserts that the memtable is empty and that opening
/// itself did not return an error.
#[tokio::test]
async fn test_wal_empty_file() -> cqlite_core::error::Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");
    let wal_file = wal_dir.join("commitlog.wal");
    let schema = create_test_schema();

    // Phase 1: populate the WAL, then drop the engine at the end of the scope.
    {
        let mut writer = WriteEngine::new(WriteEngineConfig::new(
            data_dir.clone(),
            wal_dir.clone(),
            schema.clone(),
        ))?;
        for id in 0..3i32 {
            let user = format!("User{}", id);
            writer
                .write_async(create_mutation(id, &user, 1_000_000 + id as i64))
                .await?;
        }
    }

    // Phase 2: leave the file in place but with no content at all.
    let nothing: &[u8] = &[];
    std::fs::write(&wal_file, nothing).unwrap();

    // Phase 3: reopen on the same directories and count what was replayed.
    {
        let reopened = WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, schema))?;
        assert_eq!(
            reopened.memtable_row_count(),
            0,
            "expected 0 recovered entries from empty WAL"
        );
    }

    Ok(())
}
/// Writes three mutations, replaces the WAL file with just four bytes
/// (`10 00 00 00`, i.e. a little-endian 16 with nothing after it), then reopens
/// the engine and asserts that the memtable is empty and that opening itself
/// did not return an error.
#[tokio::test]
async fn test_wal_truncated_header_only() -> cqlite_core::error::Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");
    let wal_file = wal_dir.join("commitlog.wal");
    let schema = create_test_schema();

    // Phase 1: populate the WAL, then drop the engine at the end of the scope.
    {
        let mut writer = WriteEngine::new(WriteEngineConfig::new(
            data_dir.clone(),
            wal_dir.clone(),
            schema.clone(),
        ))?;
        for id in 0..3i32 {
            let user = format!("User{}", id);
            writer
                .write_async(create_mutation(id, &user, 1_000_000 + id as i64))
                .await?;
        }
    }

    // Phase 2: keep only a lone 4-byte length field, shorter than a full header.
    let lone_length_field: [u8; 4] = 16u32.to_le_bytes();
    std::fs::write(&wal_file, lone_length_field).unwrap();

    // Phase 3: reopen on the same directories and count what was replayed.
    {
        let reopened = WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, schema))?;
        assert_eq!(
            reopened.memtable_row_count(),
            0,
            "expected 0 recovered entries from incomplete header WAL"
        );
    }

    Ok(())
}
/// Flushes a freshly opened engine that has received no writes and asserts
/// that `flush` succeeds and yields `None`.
#[tokio::test]
async fn test_flush_empty_memtable() -> cqlite_core::error::Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        root.join("data"),
        root.join("wal"),
        create_test_schema(),
    ))?;

    let flushed = engine.flush().await?;
    assert!(
        flushed.is_none(),
        "flush of empty memtable should return None"
    );

    Ok(())
}
/// Closes a freshly opened engine and asserts that a subsequent write is
/// rejected with an error.
#[tokio::test]
async fn test_write_after_close() -> cqlite_core::error::Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        root.join("data"),
        root.join("wal"),
        create_test_schema(),
    ))?;
    engine.close().await?;

    // The write result is inspected rather than propagated with `?`,
    // because an error is the expected outcome here.
    let late_write = create_mutation(1, "Alice", 1_000_000);
    let outcome = engine.write_async(late_write).await;
    assert!(outcome.is_err(), "write after close should return an error");

    Ok(())
}
/// Writes one row, makes the data directory mode `0o444`, and asserts that
/// `flush` then fails.
///
/// If a probe directory can still be created inside the read-only directory
/// (presumably because the test runs as a user for whom permissions are not
/// enforced, e.g. root — TODO confirm), the flush check is skipped and the
/// test passes. The directory mode is reset to `0o755` in both cases, before
/// any assertion on the flush result.
#[cfg(unix)]
#[tokio::test]
async fn test_readonly_data_dir_flush_fails() -> cqlite_core::error::Result<()> {
    use std::os::unix::fs::PermissionsExt;

    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        wal_dir,
        create_test_schema(),
    ))?;
    engine
        .write_async(create_mutation(1, "Alice", 1_000_000))
        .await?;
    assert_eq!(engine.memtable_row_count(), 1);

    let read_only = std::fs::Permissions::from_mode(0o444);
    std::fs::set_permissions(&data_dir, read_only).unwrap();

    // Check whether the permission change is actually effective for this process.
    let probe = data_dir.join(".permission-probe");
    let flush_outcome = if std::fs::create_dir(&probe).is_ok() {
        // Still writable: clean up the probe and skip the flush check.
        let _ = std::fs::remove_dir(&probe);
        None
    } else {
        Some(engine.flush().await)
    };

    // Restore write access first so the temporary directory can be removed
    // even when the assertion below fails.
    let writable = std::fs::Permissions::from_mode(0o755);
    std::fs::set_permissions(&data_dir, writable).unwrap();

    if let Some(flush_result) = flush_outcome {
        assert!(
            flush_result.is_err(),
            "flush to a read-only data directory should fail"
        );
    }

    Ok(())
}
/// Makes the WAL directory mode `0o444` after the engine is open, attempts one
/// write, and accepts either outcome:
///
/// * on success, at least one row must be in the memtable;
/// * on failure, the error text must contain one of `WAL`, `sync`,
///   `permission`, `Permission`, or `Storage`.
///
/// The directory mode is reset to `0o755` before the outcome is examined.
#[cfg(unix)]
#[tokio::test]
async fn test_readonly_wal_dir_write_fails() -> cqlite_core::error::Result<()> {
    use std::os::unix::fs::PermissionsExt;

    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        wal_dir.clone(),
        create_test_schema(),
    ))?;

    let read_only = std::fs::Permissions::from_mode(0o444);
    std::fs::set_permissions(&wal_dir, read_only).unwrap();

    let outcome = engine
        .write_async(create_mutation(1, "Alice", 1_000_000))
        .await;

    // Restore write access first so the temporary directory can be removed
    // even when an assertion below fails.
    let writable = std::fs::Permissions::from_mode(0o755);
    std::fs::set_permissions(&wal_dir, writable).unwrap();

    if let Err(e) = outcome {
        let msg = e.to_string();
        let recognised = ["WAL", "sync", "permission", "Permission", "Storage"]
            .iter()
            .any(|&needle| msg.contains(needle));
        assert!(recognised, "Unexpected error message: {}", msg);
    } else {
        assert!(engine.memtable_row_count() > 0);
    }

    Ok(())
}