#![cfg(feature = "write-support")]
use cqlite_core::error::Result;
use cqlite_core::schema::{ClusteringColumn, ClusteringOrder, Column, KeyColumn, TableSchema};
use cqlite_core::storage::write_engine::{
CellOperation, ClusteringKey, Mutation, PartitionKey, TableId, WriteEngine, WriteEngineConfig,
};
use cqlite_core::types::Value;
use std::collections::HashMap;
use tempfile::TempDir;
/// Builds the schema for `test_ks.users`.
///
/// The table has a single `int` partition key `id` and no clustering columns.
/// Its regular columns are `id` (non-nullable int), `name` (nullable text) and
/// `age` (nullable int).
fn create_test_schema() -> TableSchema {
    // Every column in this fixture is non-static with no default value.
    let column = |name: &str, data_type: &str, nullable: bool| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable,
        default: None,
        is_static: false,
    };

    TableSchema {
        keyspace: "test_ks".to_string(),
        table: "users".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![],
        columns: vec![
            column("id", "int", false),
            column("name", "text", true),
            column("age", "int", true),
        ],
        comments: HashMap::new(),
    }
}
fn create_mutation(id: i32, name: &str, age: i32, timestamp: i64) -> Mutation {
let table_id = TableId::new("test_ks", "users");
let pk = PartitionKey::single("id", Value::Integer(id));
let ops = vec![
CellOperation::Write {
column: "name".to_string(),
value: Value::Text(name.to_string()),
},
CellOperation::Write {
column: "age".to_string(),
value: Value::Integer(age),
},
];
Mutation::new(table_id, pk, None, ops, timestamp, None)
}
/// Happy path: buffer ten rows and flush them into an SSTable.
///
/// Checks that every expected component file was produced. Also checks that
/// the memtable and WAL were reset by the flush.
#[tokio::test]
async fn test_write_engine_end_to_end() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), create_test_schema());
    let mut engine = WriteEngine::new(config)?;

    for id in 0..10 {
        let row = create_mutation(id, &format!("User{}", id), 20 + id, 1_000_000 + i64::from(id));
        engine.write_async(row).await?;
    }

    // Unflushed writes are held in the memtable and recorded in the WAL.
    assert_eq!(engine.memtable_row_count(), 10);
    assert!(engine.memtable_size() > 0);
    assert!(engine.wal_size() > 0);

    let info = engine.flush().await?;
    assert!(info.is_some());
    let info = info.unwrap();

    assert_eq!(info.partition_count, 10);
    assert!(info.data_size > 0);

    // Component files; data is written uncompressed, so there is no CompressionInfo.
    assert!(info.data_path.exists());
    assert!(info.index_path.exists());
    assert!(info.filter_path.exists());
    assert!(info.summary_path.exists());
    assert!(info.stats_path.exists());
    assert!(info.compression_info_path.is_none());
    assert!(info.toc_path.exists());
    assert!(info.digest_path.exists());

    // A successful flush leaves the engine empty again.
    assert_eq!(engine.memtable_row_count(), 0);
    assert_eq!(engine.memtable_size(), 0);
    assert_eq!(engine.wal_size(), 0);
    Ok(())
}
/// Rows that were written but never flushed must be recovered from the WAL.
///
/// A second engine opened over the same directories should rebuild the
/// memtable from the log.
#[tokio::test]
async fn test_write_engine_wal_recovery_integration() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let schema = create_test_schema();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");

    // First session: write five rows, then drop the engine without flushing.
    {
        let mut engine = WriteEngine::new(WriteEngineConfig::new(
            data_dir.clone(),
            wal_dir.clone(),
            schema.clone(),
        ))?;
        for id in 0..5 {
            let row =
                create_mutation(id, &format!("User{}", id), 20 + id, 1_000_000 + i64::from(id));
            engine.write_async(row).await?;
        }
        assert_eq!(engine.memtable_row_count(), 5);
    }

    // Second session: the memtable should be repopulated from the WAL.
    let engine = WriteEngine::new(WriteEngineConfig::new(data_dir, wal_dir, schema))?;
    assert_eq!(engine.memtable_row_count(), 5);
    assert!(engine.memtable_size() > 0);
    Ok(())
}
/// Two consecutive flushes must produce two distinct SSTables.
///
/// The generation counter advances after each flush, and the files are named
/// `nb-1-big` and then `nb-2-big`.
#[tokio::test]
async fn test_write_engine_multiple_flushes() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))?;

    // First batch: ids 0..5.
    for id in 0..5 {
        let row = create_mutation(id, &format!("User{}", id), 20 + id, 1_000_000 + i64::from(id));
        engine.write_async(row).await?;
    }
    let info1 = engine.flush().await?.unwrap();
    assert_eq!(info1.partition_count, 5);
    assert_eq!(engine.generation(), 2);

    // Second batch: ids 5..10.
    for id in 5..10 {
        let row = create_mutation(id, &format!("User{}", id), 20 + id, 1_000_000 + i64::from(id));
        engine.write_async(row).await?;
    }
    let info2 = engine.flush().await?.unwrap();
    assert_eq!(info2.partition_count, 5);
    assert_eq!(engine.generation(), 3);

    // Both SSTables remain on disk under their own generation numbers.
    assert!(info1.data_path.exists());
    assert!(info2.data_path.exists());
    assert!(info1.data_path.to_string_lossy().contains("nb-1-big"));
    assert!(info2.data_path.to_string_lossy().contains("nb-2-big"));
    Ok(())
}
/// `close()` must flush any buffered rows.
///
/// After closing, a file whose name contains `TOC.txt` must exist somewhere
/// under the data directory.
#[tokio::test]
async fn test_write_engine_close_flushes_data() -> Result<()> {
    /// Recursively searches `dir` for any entry whose name contains `TOC.txt`.
    ///
    /// Unreadable directories are treated as containing nothing.
    fn contains_toc(dir: &std::path::Path) -> bool {
        match std::fs::read_dir(dir) {
            Ok(entries) => entries.flatten().any(|entry| {
                entry.file_name().to_string_lossy().contains("TOC.txt")
                    || (entry.path().is_dir() && contains_toc(&entry.path()))
            }),
            Err(_) => false,
        }
    }

    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))?;

    for id in 0..5 {
        let row = create_mutation(id, &format!("User{}", id), 20 + id, 1_000_000 + i64::from(id));
        engine.write_async(row).await?;
    }
    engine.close().await?;

    assert!(data_dir.exists(), "Data directory should exist after close");
    assert!(contains_toc(&data_dir), "TOC.txt should exist");
    Ok(())
}
/// A mutation that carries a TTL (`Some(3600)` here) must still flush into a
/// single-partition SSTable.
#[tokio::test]
async fn test_write_engine_with_ttl() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))?;

    let ttl_mutation = Mutation::new(
        TableId::new("test_ks", "users"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Alice".to_string()),
        }],
        1000000,
        Some(3600),
    );
    engine.write_async(ttl_mutation).await?;

    let info = engine.flush().await?.unwrap();
    assert_eq!(info.partition_count, 1);
    Ok(())
}
/// Mixes three mutation kinds in one flush: a regular write (id 1), a
/// single-cell delete (id 2) and a whole-row delete (id 3).
///
/// Each mutation targets its own partition, so the flush reports three
/// partitions.
#[tokio::test]
async fn test_write_engine_delete_operations() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))?;

    engine
        .write_async(create_mutation(1, "Alice", 30, 1000000))
        .await?;

    let table_id = TableId::new("test_ks", "users");

    let cell_delete = Mutation::new(
        table_id.clone(),
        PartitionKey::single("id", Value::Integer(2)),
        None,
        vec![CellOperation::Delete {
            column: "name".to_string(),
        }],
        1001000,
        None,
    );
    engine.write_async(cell_delete).await?;

    let row_delete = Mutation::new(
        table_id,
        PartitionKey::single("id", Value::Integer(3)),
        None,
        vec![CellOperation::DeleteRow],
        1002000,
        None,
    );
    engine.write_async(row_delete).await?;

    let info = engine.flush().await?.unwrap();
    assert_eq!(info.partition_count, 3);
    Ok(())
}
#[tokio::test]
async fn test_write_engine_generation_persistence() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let schema = create_test_schema();
{
let config = WriteEngineConfig::new(
temp_dir.path().join("data"),
temp_dir.path().join("wal"),
schema.clone(),
);
let mut engine = WriteEngine::new(config)?;
assert_eq!(engine.generation(), 1);
let mutation = create_mutation(1, "Alice", 30, 1000000);
engine.write_async(mutation).await?;
engine.flush().await?;
assert_eq!(engine.generation(), 2);
}
{
let config = WriteEngineConfig::new(
temp_dir.path().join("data"),
temp_dir.path().join("wal"),
schema,
);
let engine = WriteEngine::new(config)?;
assert_eq!(engine.generation(), 2);
}
Ok(())
}
/// With a 128 MiB flush threshold, 100 small synchronous writes must stay in
/// the memtable.
///
/// No automatic flush happens, so the generation is still 1.
#[test]
fn test_write_engine_custom_flush_threshold() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), create_test_schema())
        .with_flush_threshold(128 * 1024 * 1024);
    let mut engine = WriteEngine::new(config)?;

    for id in 0..100 {
        engine.write(create_mutation(
            id,
            &format!("User{}", id),
            20 + id,
            1_000_000 + i64::from(id),
        ))?;
    }

    assert_eq!(engine.generation(), 1);
    assert_eq!(engine.memtable_row_count(), 100);
    Ok(())
}
/// After a flush, TOC.txt must exist and list every component of an
/// uncompressed SSTable, including itself.
///
/// It must not list CompressionInfo.db.
#[tokio::test]
async fn test_write_engine_toc_last() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        root.join("data"),
        root.join("wal"),
        create_test_schema(),
    ))?;

    engine
        .write_async(create_mutation(1, "Alice", 30, 1000000))
        .await?;
    let info = engine.flush().await?.unwrap();

    assert!(info.toc_path.exists());
    let toc_contents = std::fs::read_to_string(&info.toc_path).unwrap();

    // Present components.
    assert!(toc_contents.contains("Data.db"));
    assert!(toc_contents.contains("Index.db"));
    assert!(toc_contents.contains("Filter.db"));
    assert!(toc_contents.contains("Summary.db"));
    assert!(toc_contents.contains("Statistics.db"));
    // Absent for uncompressed output.
    assert!(!toc_contents.contains("CompressionInfo.db"));
    assert!(toc_contents.contains("Digest.crc32"));
    assert!(toc_contents.contains("TOC.txt"));
    Ok(())
}
/// Builds the schema for `test_roundtrip.comprehensive_types`.
///
/// The table has an `int` partition key `pk` and an ascending `text`
/// clustering key `ck`. It also has one nullable regular column per scalar
/// type exercised by the round-trip tests: text, int, bigint, boolean,
/// timestamp and uuid.
fn create_comprehensive_schema() -> TableSchema {
    // Every column in this fixture is non-static with no default value.
    let column = |name: &str, data_type: &str, nullable: bool| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable,
        default: None,
        is_static: false,
    };

    TableSchema {
        keyspace: "test_roundtrip".to_string(),
        table: "comprehensive_types".to_string(),
        partition_keys: vec![KeyColumn {
            name: "pk".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![ClusteringColumn {
            name: "ck".to_string(),
            data_type: "text".to_string(),
            position: 0,
            order: ClusteringOrder::Asc,
        }],
        columns: vec![
            column("pk", "int", false),
            column("ck", "text", false),
            column("text_col", "text", true),
            column("int_col", "int", true),
            column("bigint_col", "bigint", true),
            column("boolean_col", "boolean", true),
            column("timestamp_col", "timestamp", true),
            column("uuid_col", "uuid", true),
        ],
        comments: HashMap::new(),
    }
}
fn create_comprehensive_mutation(pk: i32, ck: &str, timestamp: i64) -> Mutation {
let table_id = TableId::new("test_roundtrip", "comprehensive_types");
let partition_key = PartitionKey::single("pk", Value::Integer(pk));
let clustering_key = Some(ClusteringKey::single("ck", Value::Text(ck.to_string())));
let ops = vec![
CellOperation::Write {
column: "text_col".to_string(),
value: Value::Text(format!("Text for {}-{}", pk, ck)),
},
CellOperation::Write {
column: "int_col".to_string(),
value: Value::Integer(pk * 100),
},
CellOperation::Write {
column: "bigint_col".to_string(),
value: Value::BigInt((pk as i64) * 1000000),
},
CellOperation::Write {
column: "boolean_col".to_string(),
value: Value::Boolean(pk % 2 == 0),
},
CellOperation::Write {
column: "timestamp_col".to_string(),
value: Value::Timestamp(timestamp),
},
CellOperation::Write {
column: "uuid_col".to_string(),
value: Value::Uuid(*uuid::Uuid::new_v4().as_bytes()),
},
];
Mutation::new(
table_id,
partition_key,
clustering_key,
ops,
timestamp,
None,
)
}
/// Flushes a single row covering every scalar column type.
///
/// Then checks the full set of SSTable components and the TOC listing. Also
/// checks that the data file follows the `nb-{gen}-big-` naming convention.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_write_read_roundtrip_simple_types() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_comprehensive_schema(),
    ))?;

    engine
        .write_async(create_comprehensive_mutation(1, "row1", 1000000))
        .await?;

    let info = engine.flush().await?;
    assert!(info.is_some(), "Flush should return SSTableInfo");
    let info = info.unwrap();
    assert_eq!(info.partition_count, 1);
    assert!(info.data_path.exists(), "Data.db should exist");
    drop(engine);

    // Component files must outlive the engine.
    assert!(info.data_path.exists(), "Data.db exists");
    assert!(info.index_path.exists(), "Index.db exists");
    assert!(info.filter_path.exists(), "Filter.db exists");
    assert!(info.summary_path.exists(), "Summary.db exists");
    assert!(info.stats_path.exists(), "Statistics.db exists");
    assert!(
        info.compression_info_path.is_none(),
        "CompressionInfo.db omitted for uncompressed data"
    );
    assert!(info.toc_path.exists(), "TOC.txt exists");
    assert!(info.digest_path.exists(), "Digest.crc32 exists");

    // The TOC must list exactly the uncompressed component set.
    let toc_contents = std::fs::read_to_string(&info.toc_path)?;
    assert!(toc_contents.contains("Data.db"), "TOC lists Data.db");
    assert!(toc_contents.contains("Index.db"), "TOC lists Index.db");
    assert!(toc_contents.contains("Filter.db"), "TOC lists Filter.db");
    assert!(toc_contents.contains("Summary.db"), "TOC lists Summary.db");
    assert!(toc_contents.contains("Statistics.db"), "TOC lists Statistics.db");
    assert!(
        !toc_contents.contains("CompressionInfo.db"),
        "TOC must not list CompressionInfo.db for uncompressed data"
    );
    assert!(toc_contents.contains("Digest.crc32"), "TOC lists Digest.crc32");
    assert!(toc_contents.contains("TOC.txt"), "TOC lists itself");

    let data_file_name = info.data_path.file_name().unwrap().to_str().unwrap();
    assert!(
        data_file_name.starts_with("nb-1-big-"),
        "File follows nb-{{gen}}-big- convention"
    );
    Ok(())
}
/// Five clustering rows under the same partition key must flush into exactly
/// one partition with non-empty data.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_write_read_roundtrip_multiple_rows_single_partition() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_comprehensive_schema(),
    ))?;

    for row in 0..5 {
        let mutation =
            create_comprehensive_mutation(1, &format!("row{}", row), 1_000_000 + i64::from(row));
        engine.write_async(mutation).await?;
    }

    let info = engine.flush().await?.unwrap();
    assert_eq!(info.partition_count, 1, "Single partition with 5 rows");
    assert!(info.data_size > 0);
    drop(engine);

    assert!(info.data_path.exists());
    assert!(info.toc_path.exists());
    let toc_contents = std::fs::read_to_string(&info.toc_path)?;
    assert!(toc_contents.contains("Data.db"));
    Ok(())
}
/// Ten distinct partition keys are written in token order.
///
/// The flush must report ten partitions and produce an Index.db for
/// partition lookup.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_write_read_roundtrip_multiple_partitions() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let schema = create_comprehensive_schema();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        schema.clone(),
    ))?;

    // Pair each mutation with its decorated key so the writes can be issued in
    // token order (stable sort, as before).
    let mut ordered: Vec<_> = (0..10)
        .map(|pk| {
            let mutation = create_comprehensive_mutation(pk, "row0", 1_000_000 + i64::from(pk));
            (mutation.decorated_key(&schema).unwrap(), mutation)
        })
        .collect();
    ordered.sort_by_key(|(key, _)| key.token);

    for (_, mutation) in ordered {
        engine.write_async(mutation).await?;
    }

    let info = engine.flush().await?.unwrap();
    assert_eq!(info.partition_count, 10, "10 distinct partitions");
    assert!(info.data_size > 0);
    drop(engine);

    assert!(info.data_path.exists());
    assert!(info.index_path.exists(), "Index.db required for partition lookup");
    Ok(())
}
/// A single wide partition with 150 clustering rows (`row0000`..`row0149`)
/// must flush into one partition of more than 10,000 bytes.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_write_read_roundtrip_large_partition() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_comprehensive_schema(),
    ))?;

    for row in 0..150 {
        // Zero-padded keys keep text ordering equal to numeric ordering.
        let ck = format!("row{:04}", row);
        engine
            .write_async(create_comprehensive_mutation(1, &ck, 1_000_000 + i64::from(row)))
            .await?;
    }

    let info = engine.flush().await?.unwrap();
    assert_eq!(info.partition_count, 1, "Single wide partition");
    assert!(info.data_size > 10000, "Large partition should have substantial data");
    drop(engine);

    assert!(info.data_path.exists());
    assert!(info.data_size > 0);
    Ok(())
}
/// Validates the on-disk layout of a freshly flushed SSTable.
///
/// Checks that every required component is a regular file and that file
/// names follow `nb-1-big-*`. Also checks that the TOC lists the
/// uncompressed component set line by line and that the core data files are
/// non-empty.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_sstable_format_validation() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))?;
    engine
        .write_async(create_mutation(1, "Alice", 30, 1000000))
        .await?;
    let info = engine.flush().await?.unwrap();
    drop(engine);

    let required_components = [
        ("Data.db", &info.data_path),
        ("Index.db", &info.index_path),
        ("Filter.db", &info.filter_path),
        ("Summary.db", &info.summary_path),
        ("Statistics.db", &info.stats_path),
        ("TOC.txt", &info.toc_path),
        ("Digest.crc32", &info.digest_path),
    ];
    for (name, path) in &required_components {
        assert!(path.exists(), "{} must exist", name);
        let metadata = std::fs::metadata(path)?;
        assert!(metadata.is_file(), "{} must be a regular file", name);
    }

    let data_path = info.data_path.to_str().unwrap();
    assert!(
        data_path.contains("nb-1-big-Data.db"),
        "Data.db follows naming convention"
    );
    let index_path = info.index_path.to_str().unwrap();
    assert!(
        index_path.contains("nb-1-big-Index.db"),
        "Index.db follows naming convention"
    );

    // Entries are compared as whole lines, not substrings.
    let toc_contents = std::fs::read_to_string(&info.toc_path)?;
    let toc_lines: Vec<&str> = toc_contents.lines().collect();
    assert!(toc_lines.contains(&"Data.db"), "TOC contains Data.db");
    assert!(toc_lines.contains(&"Index.db"), "TOC contains Index.db");
    assert!(toc_lines.contains(&"Filter.db"), "TOC contains Filter.db");
    assert!(toc_lines.contains(&"Summary.db"), "TOC contains Summary.db");
    assert!(toc_lines.contains(&"Statistics.db"), "TOC contains Statistics.db");
    assert!(
        !toc_lines.contains(&"CompressionInfo.db"),
        "TOC must not contain CompressionInfo.db for uncompressed data"
    );
    assert!(toc_lines.contains(&"Digest.crc32"), "TOC contains Digest.crc32");
    assert!(toc_lines.contains(&"TOC.txt"), "TOC contains itself");

    let data_len = std::fs::metadata(&info.data_path)?.len();
    assert!(data_len > 0, "Data.db is non-empty");
    let index_len = std::fs::metadata(&info.index_path)?.len();
    assert!(index_len > 0, "Index.db is non-empty");
    let stats_len = std::fs::metadata(&info.stats_path)?.len();
    assert!(stats_len > 0, "Statistics.db is non-empty");
    Ok(())
}
/// Writes ten rows with timestamps 1 ms-unit apart times 1000 (base 1,000,000,
/// step 1000) and checks that the flush produced a Statistics.db and non-empty
/// data.
///
/// NOTE(review): despite the name, this only checks that the files exist; it
/// does not inspect the encoded values.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_delta_encoding_validation() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))?;

    const BASE_TIMESTAMP: i64 = 1000000;
    for id in 0..10 {
        let timestamp = BASE_TIMESTAMP + i64::from(id) * 1000;
        engine
            .write_async(create_mutation(id, &format!("User{}", id), 30, timestamp))
            .await?;
    }

    let info = engine.flush().await?.unwrap();
    drop(engine);

    assert!(
        info.stats_path.exists(),
        "Statistics.db exists for delta encoding"
    );
    assert!(info.data_path.exists());
    assert!(info.data_size > 0);
    Ok(())
}
/// Computes the partition token of twenty rows and checks that they are
/// strictly increasing once sorted, which means no collisions.
///
/// The rows are then written in that order and the flush must report twenty
/// partitions.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_multi_partition_token_ordering() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let schema = create_test_schema();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        schema.clone(),
    ))?;

    let mut by_token: Vec<_> = (0..20)
        .map(|id| {
            let mutation =
                create_mutation(id, &format!("User{}", id), 30, 1_000_000 + i64::from(id));
            let token = mutation.decorated_key(&schema).unwrap().token;
            (token, mutation)
        })
        .collect();
    by_token.sort_by_key(|(token, _)| *token);

    for pair in by_token.windows(2) {
        assert!(pair[1].0 > pair[0].0, "Tokens must be in ascending order");
    }

    for (_, mutation) in by_token {
        engine.write_async(mutation).await?;
    }

    let info = engine.flush().await?.unwrap();
    assert_eq!(info.partition_count, 20);
    drop(engine);
    assert!(info.index_path.exists());
    Ok(())
}
/// Writes one partition per scalar type (ids 1..=6), each setting a single
/// column.
///
/// The columns are text, int, bigint, boolean, timestamp and uuid. The flush
/// must report six partitions with non-empty data.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_various_data_types() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();

    // Every column in this fixture is non-static with no default value.
    let column = |name: &str, data_type: &str, nullable: bool| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: "test_types".to_string(),
        table: "all_types".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![],
        columns: vec![
            column("id", "int", false),
            column("text_val", "text", true),
            column("int_val", "int", true),
            column("bigint_val", "bigint", true),
            column("bool_val", "boolean", true),
            column("timestamp_val", "timestamp", true),
            column("uuid_val", "uuid", true),
        ],
        comments: HashMap::new(),
    };

    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        schema,
    ))?;

    let table_id = TableId::new("test_types", "all_types");
    let test_uuid = uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();

    // (column, value) for partitions 1..=6, in write order.
    let cells = vec![
        ("text_val", Value::Text("Hello, CQLite!".to_string())),
        ("int_val", Value::Integer(42)),
        ("bigint_val", Value::BigInt(i64::MAX)),
        ("bool_val", Value::Boolean(true)),
        // 1704067200000 is 2024-01-01T00:00:00Z if read as epoch milliseconds.
        ("timestamp_val", Value::Timestamp(1704067200000)),
        ("uuid_val", Value::Uuid(*test_uuid.as_bytes())),
    ];

    for (offset, (name, value)) in cells.into_iter().enumerate() {
        let mutation = Mutation::new(
            table_id.clone(),
            PartitionKey::single("id", Value::Integer(offset as i32 + 1)),
            None,
            vec![CellOperation::Write {
                column: name.to_string(),
                value,
            }],
            1000000 + offset as i64,
            None,
        );
        engine.write_async(mutation).await?;
    }

    let info = engine.flush().await?.unwrap();
    assert_eq!(info.partition_count, 6);
    drop(engine);
    assert!(info.data_path.exists());
    assert!(info.data_size > 0);
    Ok(())
}
/// TOC.txt is the publication barrier for an SSTable, so its modification
/// time must not precede that of Data.db or Statistics.db.
///
/// Filesystem mtimes can be coarse. TOC.txt may therefore appear less than
/// one second *older* than a component it is supposed to follow; a larger
/// lag fails the test.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_sstable_component_order() -> Result<()> {
    /// Returns `true` when `toc` is newer than `component`, or older by less
    /// than the mtime tolerance.
    fn toc_not_older_than(
        toc: std::time::SystemTime,
        component: std::time::SystemTime,
    ) -> bool {
        const MTIME_TOLERANCE: std::time::Duration = std::time::Duration::from_secs(1);
        // Measure how far the component is *ahead of* the TOC. `duration_since`
        // errors only when `toc` is strictly later, which is the ordering we want.
        // (The previous check measured the wrong direction and, via
        // `unwrap_or_default()`, could never fail.)
        match component.duration_since(toc) {
            Ok(lag) => lag < MTIME_TOLERANCE,
            Err(_) => true,
        }
    }

    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(data_dir, wal_dir, schema);
    let mut engine = WriteEngine::new(config)?;
    let mutation = create_mutation(1, "Alice", 30, 1000000);
    engine.write_async(mutation).await?;
    let info = engine.flush().await?.unwrap();
    drop(engine);

    let toc_modified = std::fs::metadata(&info.toc_path)?.modified()?;
    let data_modified = std::fs::metadata(&info.data_path)?.modified()?;
    let stats_modified = std::fs::metadata(&info.stats_path)?.modified()?;

    assert!(
        toc_not_older_than(toc_modified, data_modified),
        "TOC.txt written as publication barrier"
    );
    assert!(
        toc_not_older_than(toc_modified, stats_modified),
        "TOC.txt written after Statistics.db"
    );
    Ok(())
}
/// Writes a row that sets only `name`, leaving the nullable `age` column
/// unset. The flush must still produce one partition with a Data.db file.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_null_values() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    ))?;

    let name_only = Mutation::new(
        TableId::new("test_ks", "users"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Alice".to_string()),
        }],
        1000000,
        None,
    );
    engine.write_async(name_only).await?;

    let info = engine.flush().await?.unwrap();
    drop(engine);
    assert_eq!(info.partition_count, 1);
    assert!(info.data_path.exists());
    Ok(())
}
/// Flushes the same mutation through two independent engines, each with its
/// own data and WAL directories. The resulting Data.db files must have equal
/// sizes.
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_stage0_deterministic_writes() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let schema = create_test_schema();
    let mutation = create_mutation(1, "Alice", 30, 1000000);

    // Run 1 uses data1/wal1 and run 2 uses data2/wal2.
    let mut data_sizes = Vec::with_capacity(2);
    for run in 1..=2 {
        let config = WriteEngineConfig::new(
            temp_dir.path().join(format!("data{}", run)),
            temp_dir.path().join(format!("wal{}", run)),
            schema.clone(),
        );
        let mut engine = WriteEngine::new(config)?;
        engine.write_async(mutation.clone()).await?;
        let info = engine.flush().await?.unwrap();
        drop(engine);
        data_sizes.push(std::fs::metadata(&info.data_path)?.len());
    }

    assert_eq!(
        data_sizes[0], data_sizes[1],
        "Deterministic writes should produce identical Data.db sizes"
    );
    Ok(())
}