#![cfg(feature = "write-support")]
use cqlite_core::platform::Platform;
use cqlite_core::schema::{Column, KeyColumn, TableSchema};
use cqlite_core::storage::sstable::SSTableManager;
use cqlite_core::storage::write_engine::{
CellOperation, Mutation, PartitionKey, STCSPolicy, TableId, WriteEngine, WriteEngineConfig,
};
use cqlite_core::types::TableId as CqlTableId;
use cqlite_core::types::Value;
use cqlite_core::Config;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
/// Schema for `compact_ks.items`: a single `int` partition key `id`, no clustering
/// columns, and the regular columns `id` (int, non-null), `name` (text, nullable)
/// and `score` (int, nullable). No column is static or has a default.
fn make_schema() -> TableSchema {
    // Every regular column in this table shares `default: None` / `is_static: false`.
    let column = |name: &str, data_type: &str, nullable: bool| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable,
        default: None,
        is_static: false,
    };

    TableSchema {
        keyspace: "compact_ks".to_string(),
        table: "items".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![],
        columns: vec![
            column("id", "int", false),
            column("name", "text", true),
            column("score", "int", true),
        ],
        comments: HashMap::new(),
    }
}
/// Builds a mutation for partition `id` of `compact_ks.items` that writes both
/// regular columns (`name` and `score`) at write time `timestamp`.
fn write_row(id: i32, name: &str, score: i32, timestamp: i64) -> Mutation {
    let name_cell = CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text(name.to_string()),
    };
    let score_cell = CellOperation::Write {
        column: "score".to_string(),
        value: Value::Integer(score),
    };
    Mutation::new(
        TableId::new("compact_ks", "items"),
        PartitionKey::single("id", Value::Integer(id)),
        None,
        vec![name_cell, score_cell],
        timestamp,
        None,
    )
}
/// Builds a mutation for partition `id` that writes only the `name` column at
/// `timestamp`; it carries no operation for `score`.
fn write_name_only(id: i32, name: &str, timestamp: i64) -> Mutation {
    let name_cell = CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text(name.to_string()),
    };
    Mutation::new(
        TableId::new("compact_ks", "items"),
        PartitionKey::single("id", Value::Integer(id)),
        None,
        vec![name_cell],
        timestamp,
        None,
    )
}
/// Builds a mutation for partition `id` that writes only the `score` column at
/// `timestamp`; it carries no operation for `name`.
fn write_score_only(id: i32, score: i32, timestamp: i64) -> Mutation {
    let score_cell = CellOperation::Write {
        column: "score".to_string(),
        value: Value::Integer(score),
    };
    Mutation::new(
        TableId::new("compact_ks", "items"),
        PartitionKey::single("id", Value::Integer(id)),
        None,
        vec![score_cell],
        timestamp,
        None,
    )
}
/// Builds a whole-row delete (`CellOperation::DeleteRow`) for partition `id` at
/// `timestamp`, i.e. the row tombstone used by the shadowing tests.
fn delete_row(id: i32, timestamp: i64) -> Mutation {
    let table_id = TableId::new("compact_ks", "items");
    let partition = PartitionKey::single("id", Value::Integer(id));
    let operations = vec![CellOperation::DeleteRow];
    Mutation::new(table_id, partition, None, operations, timestamp, None)
}
/// Builds a cell-level delete of the `score` column for partition `id` at
/// `timestamp`; no other column is part of the mutation.
fn delete_score_column(id: i32, timestamp: i64) -> Mutation {
    let score_delete = CellOperation::Delete {
        column: "score".to_string(),
    };
    Mutation::new(
        TableId::new("compact_ks", "items"),
        PartitionKey::single("id", Value::Integer(id)),
        None,
        vec![score_delete],
        timestamp,
        None,
    )
}
/// Size-tiered merge policy used by the three-SSTable fixture.
///
/// The positional arguments are passed straight to `STCSPolicy::new`.
/// NOTE(review): the leading `3` presumably is the minimum number of SSTables per
/// bucket, matching the three flushes in `write_three_sstables_and_compact`. The
/// two-SSTable tests pass `2` in the same slot. Confirm against `STCSPolicy::new`.
fn make_policy() -> STCSPolicy {
    STCSPolicy::new(3, 32, 0.5, 1.5, 0).expect("valid STCS parameters")
}
/// Builds the shared three-SSTable fixture and compacts it with `make_policy()`.
///
/// Three flushes through one `WriteEngine` produce:
/// - A (ts=100): live rows for PK 1..=20;
/// - B (ts=200): live rows for PK 11..=30 (overlapping A on 11..=20);
/// - C (ts=300): live rows for PK 21..=40, row deletes for PK 1..=5, and a cell
///   delete of `score` for PK 11.
///
/// It then calls `maintenance_step` at most 5 times and asserts the following:
/// - exactly one compaction completed;
/// - 3 inputs were consumed and 1 output was produced;
/// - exactly one `-big-Data.db` and one `-TOC.txt` remain in the table directory.
///
/// The engine is closed before returning.
///
/// Returns `(temp_dir, data_dir, sstable_dir)`. The caller must keep `temp_dir` alive
/// while it still reads from the two paths.
fn write_three_sstables_and_compact(
    rt: &tokio::runtime::Runtime,
) -> (TempDir, std::path::PathBuf, std::path::PathBuf) {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    let wal_dir = temp_dir.path().join("wal");
    let config = WriteEngineConfig::new(data_dir.clone(), wal_dir, make_schema());
    let mut engine = WriteEngine::new(config).expect("engine creation");

    // SSTable A: PK 1..=20 at ts=100.
    for id in 1_i32..=20 {
        engine
            .write(write_row(id, &format!("a-name-{}", id), id * 10, 100))
            .expect("write A");
    }
    let info_a = rt.block_on(engine.flush()).expect("flush A").expect("info A");
    assert_eq!(info_a.partition_count, 20, "SSTable A: 20 partitions");
    assert!(info_a.data_path.exists(), "SSTable A Data.db must exist");

    // SSTable B: PK 11..=30 at ts=200.
    for id in 11_i32..=30 {
        engine
            .write(write_row(id, &format!("b-name-{}", id), id * 20, 200))
            .expect("write B");
    }
    let info_b = rt.block_on(engine.flush()).expect("flush B").expect("info B");
    assert_eq!(info_b.partition_count, 20, "SSTable B: 20 partitions");

    // SSTable C (ts=300): PK 21..=40 live, PK 1..=5 row-deleted, PK 11 `score` deleted.
    for id in 21_i32..=40 {
        engine
            .write(write_row(id, &format!("c-name-{}", id), id * 30, 300))
            .expect("write C rows");
    }
    for id in 1_i32..=5 {
        engine.write(delete_row(id, 300)).expect("write row-delete C");
    }
    engine
        .write(delete_score_column(11, 300))
        .expect("write cell-delete C");
    let info_c = rt.block_on(engine.flush()).expect("flush C").expect("info C");
    assert!(info_c.partition_count > 0, "SSTable C: non-empty");

    let sstable_dir = data_dir.join("compact_ks").join("items");
    // Counts the entries of the table directory whose file name ends with `suffix`.
    let count_files = |suffix: &str, read_context: &str| -> usize {
        std::fs::read_dir(&sstable_dir)
            .expect(read_context)
            .filter_map(Result::ok)
            .filter(|entry| entry.file_name().to_string_lossy().ends_with(suffix))
            .count()
    };
    assert_eq!(
        count_files("-big-Data.db", "read sstable dir"),
        3,
        "Expected 3 Data.db files before compaction"
    );

    engine
        .set_merge_policy(Box::new(make_policy()))
        .expect("set merge policy");

    // Step until a merge completes, nothing is pending any more, or 5 steps elapse.
    let budget = Duration::from_secs(30);
    let mut compaction_completed = false;
    for _ in 0..5 {
        let report = engine.maintenance_step(budget).expect("maintenance_step");
        compaction_completed = !report.completed_merges.is_empty();
        if compaction_completed || !report.pending_compaction {
            break;
        }
    }
    assert!(
        compaction_completed,
        "Compaction must complete within 5 maintenance_step calls"
    );

    let stats = engine.maintenance_stats();
    assert_eq!(
        stats.compactions_completed, 1,
        "Exactly 1 compaction must have completed"
    );
    assert_eq!(
        stats.sstables_merged_in, 3,
        "3 input SSTables must have been consumed (got {})",
        stats.sstables_merged_in
    );
    assert_eq!(
        stats.sstables_produced, 1,
        "1 output SSTable must have been produced"
    );
    assert_eq!(
        count_files("-big-Data.db", "read sstable dir"),
        1,
        "After compaction exactly 1 Data.db must exist (atomicity guarantee)"
    );
    assert_eq!(
        count_files("-TOC.txt", "read sstable dir for TOC check"),
        1,
        "Exactly 1 TOC.txt must exist after compaction (publication barrier)"
    );

    rt.block_on(engine.close()).expect("close engine");
    (temp_dir, data_dir, sstable_dir)
}
/// Runs the three-SSTable fixture only for the mechanics assertions it performs
/// internally (merge counters, a single Data.db and TOC.txt after compaction). No
/// read-back is done here.
#[test]
fn compaction_3_sstables_mechanics() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    // Keep the whole fixture, including its TempDir, alive until the test ends.
    let _fixture = write_three_sstables_and_compact(&runtime);
}
/// Compacts the three-SSTable fixture, reopens the data directory with a fresh
/// `SSTableManager`, and checks the merged output:
/// - at least 35 rows come back (guards against #500);
/// - PK 1..=5, row-deleted by C, are absent;
/// - PK 6..=40 are present;
/// - when PK 11 is returned as a `Value::Map`, its `score` entry (if any) is
///   `Null`/`Tombstone`.
#[test]
fn compaction_3_sstables_read_back_correctness() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let schema = make_schema();
    let (temp_dir, data_dir, sstable_dir) = write_three_sstables_and_compact(&rt);

    let merged_data_count = std::fs::read_dir(&sstable_dir)
        .expect("read sstable dir")
        .filter_map(Result::ok)
        .filter(|entry| entry.file_name().to_string_lossy().ends_with("-big-Data.db"))
        .count();
    assert_eq!(
        merged_data_count, 1,
        "Merged Data.db must be present on disk before attempting read-back"
    );

    // Reopen the compacted directory through the read path.
    let cqlite_config = Config::default();
    let manager = rt.block_on(async {
        let platform = Arc::new(
            Platform::new(&cqlite_config)
                .await
                .expect("platform creation"),
        );
        SSTableManager::new(
            &data_dir,
            &cqlite_config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .expect("SSTableManager must open without error")
    });

    let sstable_stats = rt.block_on(manager.stats()).expect("manager stats");
    eprintln!("post_compaction_sstable_count = {}", sstable_stats.sstable_count);

    let table_id = CqlTableId::from("compact_ks.items");
    let results = rt
        .block_on(manager.scan(&table_id, None, None, None, Some(&schema)))
        .expect("post-compaction scan must not error");
    let row_count = results.len();
    eprintln!("post_compaction_row_count = {}", row_count);
    assert!(
        row_count >= 35,
        "post-compaction scan must return >= 35 rows (got {}). \
         If this is 0, #500 (iterate_all_partitions reads 0 rows from writer-produced \
         SSTables) has regressed.",
        row_count
    );

    let result_map: HashMap<Vec<u8>, Value> =
        results.into_iter().map(|(key, value)| (key.0, value)).collect();
    // Rows are looked up by the big-endian bytes of the `int` partition key.
    let pk_bytes = |id: i32| -> Vec<u8> { id.to_be_bytes().to_vec() };
    let expect_present = |ids: std::ops::RangeInclusive<i32>, origin: &str| {
        for id in ids {
            assert!(
                result_map.contains_key(&pk_bytes(id)),
                "PK {} ({}) must be present in merged output",
                id,
                origin
            );
        }
    };

    for id in 1_i32..=5 {
        assert!(
            !result_map.contains_key(&pk_bytes(id)),
            "PK {} was row-deleted by C (ts=300) but is still present in merged output (Issue #505)",
            id
        );
    }
    expect_present(6..=10, "A-only, non-deleted");
    expect_present(11..=20, "B/C wins over A");

    // PK 11: only `score` was deleted by C, so the row itself must stay live.
    match result_map.get(&pk_bytes(11)) {
        Some(Value::Map(pairs)) => {
            let score_values = pairs.iter().filter_map(|(col_key, col_val)| match col_key {
                Value::Text(col_name) if col_name == "score" => Some(col_val),
                _ => None,
            });
            for col_val in score_values {
                assert!(
                    matches!(col_val, Value::Null | Value::Tombstone(_)),
                    "PK=11 score column was cell-deleted by C (ts=300) \
                     but has live value: {:?} (Issue #505)",
                    col_val
                );
            }
        }
        Some(Value::Tombstone(_)) => {
            panic!("PK=11 row should be live (only score is deleted) but returned Tombstone");
        }
        Some(row_value) => {
            eprintln!(
                "PK=11 value is not a Map (got {:?}), column-level assertion skipped",
                row_value
            );
        }
        None => {}
    }

    expect_present(21..=30, "C wins over B");
    expect_present(31..=40, "C-only");

    eprintln!(
        "Read-back correctness checks PASSED: {} rows, tombstone shadowing and \
         live-row presence assertions all satisfied (Issues #500, #505)",
        row_count
    );
    drop(temp_dir);
}
/// Compacts the three-SSTable fixture, reopens it with a fresh `SSTableManager`, and
/// checks tombstone shadowing in the merged output.
/// - PK 1..=5 (row-deleted by C at ts=300) must be absent.
/// - PK 11 must be present, because C deleted only its `score` cell.
/// - PK 11 must not come back as a row tombstone.
/// - When PK 11 is a `Value::Map`, its `score` entry (if any) must be `Null`/`Tombstone`.
///
/// Requiring PK 11 to be present keeps the test from passing vacuously when the scan
/// returns no rows at all. An empty result satisfies every absence check trivially.
#[test]
fn compaction_3_sstables_tombstone_shadowing() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let schema = make_schema();
    let (temp_dir, data_dir, _sstable_dir) = write_three_sstables_and_compact(&rt);

    // Reopen the compacted directory through the read path.
    let cqlite_config = Config::default();
    let manager = rt.block_on(async {
        let platform = Arc::new(
            Platform::new(&cqlite_config)
                .await
                .expect("platform creation"),
        );
        SSTableManager::new(
            &data_dir,
            &cqlite_config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .expect("SSTableManager must open without error")
    });
    let table_id = CqlTableId::from("compact_ks.items");
    let results = rt
        .block_on(manager.scan(&table_id, None, None, None, Some(&schema)))
        .expect("post-compaction scan must not error");
    // Lookups below use the big-endian bytes of the `int` partition key.
    let result_map: HashMap<Vec<u8>, Value> =
        results.into_iter().map(|(k, v)| (k.0, v)).collect();

    // Row tombstones written by C (ts=300) must shadow the ts=100 rows from A.
    for id in 1_i32..=5 {
        let key: Vec<u8> = id.to_be_bytes().into();
        assert!(
            !result_map.contains_key(&key),
            "PK {} was row-deleted by C (ts=300) but is still present in merged output",
            id
        );
    }

    // PK 11 must still exist. A missing row here is a failure, not a skip.
    let key_11: Vec<u8> = 11_i32.to_be_bytes().into();
    let row_value = result_map.get(&key_11).unwrap_or_else(|| {
        panic!(
            "PK=11 must be present after compaction (only its score cell was deleted by C), \
             but it is absent; merged output has {} rows",
            result_map.len()
        )
    });
    match row_value {
        Value::Map(pairs) => {
            for (col_key, col_val) in pairs {
                if let Value::Text(col_name) = col_key {
                    if col_name == "score" {
                        assert!(
                            matches!(col_val, Value::Null | Value::Tombstone(_)),
                            "PK=11 score column was cell-deleted by C but has value: {:?}",
                            col_val
                        );
                    }
                }
            }
        }
        Value::Tombstone(_) => {
            panic!("PK=11 row should be live (only score is deleted) but returned Tombstone");
        }
        _ => {
            eprintln!(
                "PK=11 value is not a Map (got {:?}), column-level assertion skipped",
                row_value
            );
        }
    }
    drop(temp_dir);
}
/// Two-SSTable scenario checking that row tombstones are ordered by timestamp across
/// compaction.
/// - X: live PK 1 and PK 2 at ts=100, plus a row tombstone for PK 3 at ts=100.
/// - Y: a row tombstone for PK 1 at ts=300, plus a live rewrite of PK 3 at ts=300.
///
/// After compacting X+Y and reopening the data directory:
/// - PK 1 must be absent, because the newer tombstone wins;
/// - PK 2 must be present;
/// - PK 3 must be present with `name == "revived-3"`, because the older tombstone
///   must not shadow the newer write.
#[test]
fn row_tombstone_shadows_live_row_after_compaction() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let temp_dir = TempDir::new().expect("tempdir");
    let data_dir = temp_dir.path().join("data");
    let schema = make_schema();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        temp_dir.path().join("wal"),
        schema.clone(),
    ))
    .expect("engine creation");

    // SSTable X (ts=100).
    for id in 1_i32..=2 {
        engine
            .write(write_row(id, &format!("live-{}", id), id * 10, 100))
            .expect("write X");
    }
    engine
        .write(delete_row(3, 100))
        .expect("write low-ts row-tombstone X");
    let info_x = rt.block_on(engine.flush()).expect("flush X").expect("info X");
    assert_eq!(info_x.partition_count, 3, "SSTable X: 3 partitions");

    // SSTable Y (ts=300).
    engine
        .write(delete_row(1, 300))
        .expect("write row-tombstone Y");
    engine
        .write(write_row(3, "revived-3", 333, 300))
        .expect("write high-ts live PK=3 Y");
    let info_y = rt.block_on(engine.flush()).expect("flush Y").expect("info Y");
    assert!(info_y.partition_count > 0, "SSTable Y: non-empty");

    // Compact X+Y: step until a merge completes, nothing is pending, or 5 steps elapse.
    let policy = STCSPolicy::new(2, 32, 0.01, 100.0, 0).expect("valid STCS params");
    engine
        .set_merge_policy(Box::new(policy))
        .expect("set policy");
    let budget = Duration::from_secs(30);
    let mut compacted = false;
    for _ in 0..5 {
        let report = engine.maintenance_step(budget).expect("maintenance_step");
        compacted = !report.completed_merges.is_empty();
        if compacted || !report.pending_compaction {
            break;
        }
    }
    assert!(compacted, "Compaction must complete");
    rt.block_on(engine.close()).expect("close engine");

    // Read the merged output back through a fresh manager.
    let cqlite_config = Config::default();
    let manager = rt.block_on(async {
        let platform = Arc::new(
            Platform::new(&cqlite_config)
                .await
                .expect("platform creation"),
        );
        SSTableManager::new(
            &data_dir,
            &cqlite_config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .expect("SSTableManager must open")
    });
    let table_id = CqlTableId::from("compact_ks.items");
    let results = rt
        .block_on(manager.scan(&table_id, None, None, None, Some(&schema)))
        .expect("post-compaction scan");
    eprintln!(
        "row_tombstone_shadows_live_row_after_compaction: {} rows returned",
        results.len()
    );
    let result_map: HashMap<Vec<u8>, Value> =
        results.into_iter().map(|(key, value)| (key.0, value)).collect();
    let pk_bytes = |id: i32| -> Vec<u8> { id.to_be_bytes().to_vec() };

    assert!(
        !result_map.contains_key(&pk_bytes(1)),
        "PK=1 was row-deleted at ts=300 (> live ts=100) but is present after compaction \
         — tombstone shadowing regression (Issue #505)"
    );
    assert!(
        result_map.contains_key(&pk_bytes(2)),
        "PK=2 (live, never deleted) must be present after compaction"
    );

    let name_3 = match result_map.get(&pk_bytes(3)) {
        None => panic!(
            "PK=3 live write (ts=300) must survive the older tombstone (ts=100) \
             but is absent after compaction — tombstone timestamp regression (Issue #505)"
        ),
        Some(Value::Map(pairs)) => pairs.iter().find_map(|(k, v)| match (k, v) {
            (Value::Text(col), Value::Text(val)) if col == "name" => Some(val.clone()),
            _ => None,
        }),
        Some(other) => panic!(
            "PK=3 must be a live row (Value::Map) carrying the ts=300 write, got {:?} (Issue #505)",
            other
        ),
    };
    assert_eq!(
        name_3.as_deref(),
        Some("revived-3"),
        "PK=3 must carry the live ts=300 value 'revived-3', not be shadowed by the \
         older ts=100 tombstone (Issue #505)"
    );

    eprintln!(
        "row_tombstone_shadows_live_row_after_compaction PASSED: \
         PK=1 correctly absent (higher-ts tombstone shadows), \
         PK=2 correctly present (never deleted), \
         PK=3 correctly present with live value (lower-ts tombstone does NOT shadow)"
    );
    drop(temp_dir);
}
/// Equal-timestamp tiebreak across compaction.
///
/// A row tombstone for PK 1 (flushed first, into SSTable A) and a live write for PK 1
/// (flushed later, into SSTable B) share the same timestamp, `EQUAL_TS`. After
/// compaction, PK 1 must be absent: the tombstone wins the tie instead of the more
/// recent file. The live control row PK 2 must be present.
#[test]
fn test_real_merger_delete_wins_at_equal_timestamp() {
    const EQUAL_TS: i64 = 200;

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let temp_dir = TempDir::new().expect("tempdir");
    let data_dir = temp_dir.path().join("data");
    let schema = make_schema();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        temp_dir.path().join("wal"),
        schema.clone(),
    ))
    .expect("engine creation");

    // SSTable A: row tombstone for PK 1 plus a live control row for PK 2.
    engine
        .write(delete_row(1, EQUAL_TS))
        .expect("write equal-ts row-tombstone A");
    engine
        .write(write_row(2, "control-live", 22, EQUAL_TS))
        .expect("write live PK=2 A");
    let info_a = rt.block_on(engine.flush()).expect("flush A").expect("info A");
    assert_eq!(info_a.partition_count, 2, "SSTable A: 2 partitions");

    // SSTable B: live write for PK 1 at the same timestamp as A's tombstone.
    engine
        .write(write_row(1, "live-at-equal-ts", 11, EQUAL_TS))
        .expect("write live PK=1 B");
    let info_b = rt.block_on(engine.flush()).expect("flush B").expect("info B");
    assert!(info_b.partition_count > 0, "SSTable B: non-empty");

    // Compact A+B: step until a merge completes, nothing is pending, or 5 steps elapse.
    let policy = STCSPolicy::new(2, 32, 0.01, 100.0, 0).expect("valid STCS params");
    engine
        .set_merge_policy(Box::new(policy))
        .expect("set policy");
    let budget = Duration::from_secs(30);
    let mut compacted = false;
    for _ in 0..5 {
        let report = engine.maintenance_step(budget).expect("maintenance_step");
        compacted = !report.completed_merges.is_empty();
        if compacted || !report.pending_compaction {
            break;
        }
    }
    assert!(compacted, "Compaction must complete");
    rt.block_on(engine.close()).expect("close engine");

    // Read the merged output back through a fresh manager.
    let cqlite_config = Config::default();
    let manager = rt.block_on(async {
        let platform = Arc::new(
            Platform::new(&cqlite_config)
                .await
                .expect("platform creation"),
        );
        SSTableManager::new(
            &data_dir,
            &cqlite_config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .expect("SSTableManager must open")
    });
    let table_id = CqlTableId::from("compact_ks.items");
    let result_map: HashMap<Vec<u8>, Value> = rt
        .block_on(manager.scan(&table_id, None, None, None, Some(&schema)))
        .expect("post-compaction scan")
        .into_iter()
        .map(|(key, value)| (key.0, value))
        .collect();
    let pk_bytes = |id: i32| -> Vec<u8> { id.to_be_bytes().to_vec() };

    assert!(
        !result_map.contains_key(&pk_bytes(1)),
        "PK=1 was row-deleted at the SAME timestamp (ts={}) as its live write but is \
         present after compaction — equal-ts tiebreak reverted to file recency \
         instead of letting the tombstone win (Issue #498)",
        EQUAL_TS
    );
    assert!(
        result_map.contains_key(&pk_bytes(2)),
        "PK=2 (live control, never deleted) must be present after compaction"
    );

    eprintln!(
        "test_real_merger_delete_wins_at_equal_timestamp PASSED: \
         PK=1 absent (equal-ts tombstone wins), PK=2 present (live control)"
    );
    drop(temp_dir);
}
/// Per-cell reconcile across compaction (Issue #533).
/// - A (ts=100): PK 1 writes only `name = "alice"`; PK 3 writes `name = "x"`.
/// - B (ts=200): PK 1 writes only `score = 42`; PK 3 writes `name = "y"`.
///
/// After compacting A+B:
/// - PK 1 must carry both `name` (from A) and `score` (from B);
/// - PK 3's conflicting `name` must resolve to the ts=200 value `"y"`.
#[test]
fn disjoint_columns_survive_compaction() {
    /// Returns the value stored under text key `col` when `row` is a `Value::Map`.
    fn find_col<'a>(row: &'a Value, col: &str) -> Option<&'a Value> {
        if let Value::Map(pairs) = row {
            pairs
                .iter()
                .find(|(k, _)| matches!(k, Value::Text(name) if name == col))
                .map(|(_, v)| v)
        } else {
            None
        }
    }

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let temp_dir = TempDir::new().expect("tempdir");
    let data_dir = temp_dir.path().join("data");
    let schema = make_schema();
    let mut engine = WriteEngine::new(WriteEngineConfig::new(
        data_dir.clone(),
        temp_dir.path().join("wal"),
        schema.clone(),
    ))
    .expect("engine creation");

    // SSTable A (ts=100): name-only writes.
    engine
        .write(write_name_only(1, "alice", 100))
        .expect("write A PK=1 name");
    engine
        .write(write_name_only(3, "x", 100))
        .expect("write A PK=3 name");
    let info_a = rt.block_on(engine.flush()).expect("flush A").expect("info A");
    assert_eq!(info_a.partition_count, 2, "SSTable A: 2 partitions");

    // SSTable B (ts=200): score for PK 1, conflicting name for PK 3.
    engine
        .write(write_score_only(1, 42, 200))
        .expect("write B PK=1 score");
    engine
        .write(write_name_only(3, "y", 200))
        .expect("write B PK=3 name");
    let info_b = rt.block_on(engine.flush()).expect("flush B").expect("info B");
    assert!(info_b.partition_count > 0, "SSTable B: non-empty");

    // Compact A+B: step until a merge completes, nothing is pending, or 5 steps elapse.
    let policy = STCSPolicy::new(2, 32, 0.01, 100.0, 0).expect("valid STCS params");
    engine
        .set_merge_policy(Box::new(policy))
        .expect("set policy");
    let budget = Duration::from_secs(30);
    let mut compacted = false;
    for _ in 0..5 {
        let report = engine.maintenance_step(budget).expect("maintenance_step");
        compacted = !report.completed_merges.is_empty();
        if compacted || !report.pending_compaction {
            break;
        }
    }
    assert!(compacted, "Compaction must complete");
    rt.block_on(engine.close()).expect("close engine");

    // Read the merged output back through a fresh manager.
    let cqlite_config = Config::default();
    let manager = rt.block_on(async {
        let platform = Arc::new(
            Platform::new(&cqlite_config)
                .await
                .expect("platform creation"),
        );
        SSTableManager::new(
            &data_dir,
            &cqlite_config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .expect("SSTableManager must open")
    });
    let table_id = CqlTableId::from("compact_ks.items");
    let result_map: HashMap<Vec<u8>, Value> = rt
        .block_on(manager.scan(&table_id, None, None, None, Some(&schema)))
        .expect("post-compaction scan")
        .into_iter()
        .map(|(key, value)| (key.0, value))
        .collect();
    let pk_bytes = |id: i32| -> Vec<u8> { id.to_be_bytes().to_vec() };

    let pk1 = result_map
        .get(&pk_bytes(1))
        .expect("PK=1 must be present after compaction");
    assert_eq!(
        find_col(pk1, "name"),
        Some(&Value::Text("alice".to_string())),
        "PK=1 `name` from SSTable A was DROPPED after compaction — per-cell reconcile \
         regression (Issue #533). The old whole-row-wins merger keeps only B's `score`."
    );
    assert_eq!(
        find_col(pk1, "score"),
        Some(&Value::Integer(42)),
        "PK=1 `score` from SSTable B must be present"
    );

    let pk3 = result_map
        .get(&pk_bytes(3))
        .expect("PK=3 must be present after compaction");
    assert_eq!(
        find_col(pk3, "name"),
        Some(&Value::Text("y".to_string())),
        "PK=3 `name` conflict must resolve to the higher-timestamp value (B ts=200)"
    );

    eprintln!(
        "disjoint_columns_survive_compaction PASSED: PK=1 has both name+score \
         (disjoint columns preserved), PK=3 name resolves by timestamp (Issue #533)"
    );
    drop(temp_dir);
}