#[cfg(feature = "write-support")]
use crate::error::{Error, Result};
#[cfg(feature = "write-support")]
use crate::schema::TableSchema;
#[cfg(feature = "write-support")]
use crate::storage::write_engine::mutation::{ClusteringKey, DecoratedKey};
#[cfg(feature = "write-support")]
use crate::types::Value;
#[cfg(feature = "write-support")]
use std::cmp::{Ordering, Reverse};
#[cfg(feature = "write-support")]
use std::collections::{BinaryHeap, VecDeque};
#[cfg(feature = "write-support")]
use std::path::{Path, PathBuf};
#[cfg(feature = "write-support")]
use std::time::{Duration, Instant};
/// One row flowing through the k-way merge, tagged with the input run it came from.
///
/// Note: `Ord` for this type is implemented by hand and compares only
/// `key`, `clustering_key` and `run_index`, whereas the derived `PartialEq`
/// compares every field.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeEntry {
/// Position of the originating input in the paths given to `KWayMerger::new`;
/// used as the final ordering tiebreak.
pub run_index: usize,
/// Decorated partition key (token + raw key bytes); the primary merge order.
pub key: DecoratedKey,
/// Clustering key of the row; `None` sorts before any `Some`. The SSTable
/// adapter in this module always produces `None`.
pub clustering_key: Option<ClusteringKey>,
/// Row-level timestamp; after reconciliation it is the newest surviving cell
/// timestamp or the row deletion time.
pub timestamp: i64,
/// Live cells or a row tombstone.
pub row_data: RowData,
}
// Gated like every other item in this module: without the feature the
// `MergeEntry` type does not exist and this impl would fail to compile.
#[cfg(feature = "write-support")]
impl MergeEntry {
    /// Creates a merge entry from its parts.
    ///
    /// * `run_index` – position of the originating input in the list given to
    ///   `KWayMerger::new`; used as the last ordering tiebreak.
    /// * `key` – decorated partition key (token + key bytes).
    /// * `clustering_key` – clustering key, or `None` for rows without one.
    /// * `timestamp` – row-level timestamp.
    /// * `row_data` – live cells or a row tombstone.
    pub fn new(
        run_index: usize,
        key: DecoratedKey,
        clustering_key: Option<ClusteringKey>,
        timestamp: i64,
        row_data: RowData,
    ) -> Self {
        Self {
            run_index,
            key,
            clustering_key,
            timestamp,
            row_data,
        }
    }
}
#[cfg(feature = "write-support")]
impl Ord for MergeEntry {
    /// Merge order: partition token, then raw partition-key bytes, then
    /// clustering key (`None` before any `Some`), then `run_index`.
    ///
    /// `timestamp` and `row_data` are deliberately ignored, so two entries that
    /// differ only in payload compare `Equal` here even though `==` says they
    /// differ.
    fn cmp(&self, other: &Self) -> Ordering {
        self.key
            .token
            .cmp(&other.key.token)
            .then_with(|| self.key.key.cmp(&other.key.key))
            // `Option`'s own ordering puts `None` before `Some`, then compares
            // the inner clustering keys.
            .then_with(|| self.clustering_key.cmp(&other.clustering_key))
            .then_with(|| self.run_index.cmp(&other.run_index))
    }
}
#[cfg(feature = "write-support")]
impl PartialOrd for MergeEntry {
    /// Delegates to the total order defined by [`Ord::cmp`]; never `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
/// Payload of a merge entry: live cells or a row-level deletion.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RowData {
/// A live row. `cells` may be empty, and may contain cell tombstones
/// (`Value::Tombstone` with `TombstoneType::CellTombstone`).
Live {
cells: Vec<CellData>,
},
/// A row tombstone.
Tombstone {
/// Deletion timestamp; during reconciliation, cells whose timestamp is
/// `<=` this value are shadowed.
deletion_time: i64,
/// Local deletion time. The SSTable adapter and the reconciler in this
/// module always set it to 0.
local_deletion_time: i32,
},
}
/// A single column value inside a live row.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CellData {
/// Column name. The SSTable adapter uses the map key text (or its `Debug`
/// form for non-text keys), or `"value"` for non-map row values.
pub column: String,
/// Cell value; a `Value::Tombstone` of type `CellTombstone` marks a deleted cell.
pub value: Value,
/// Write timestamp used for per-column last-write-wins reconciliation.
pub timestamp: i64,
/// Optional TTL, forwarded as `ttl_seconds` when converted to a mutation.
pub ttl: Option<u32>,
}
/// Result of one `KWayMerger::step` call.
#[cfg(feature = "write-support")]
#[derive(Debug)]
pub enum MergeStep {
/// One fully merged partition: its key and its reconciled rows, sorted by
/// clustering key.
Partition {
key: DecoratedKey,
rows: Vec<MergeEntry>,
},
/// Every input run has been drained.
Complete,
}
/// Summary returned by `KWayMerger::merge`.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone)]
pub struct MergeStats {
/// Number of input runs (SSTables) merged.
pub input_files: usize,
/// Number of partitions handed to the output writer.
pub output_partitions: u64,
/// Number of reconciled rows handed to the output writer.
pub output_rows: u64,
/// Bytes written. NOTE(review): not updated by `KWayMerger::merge` in this
/// module, so it is reported as 0.
pub bytes_written: u64,
/// Wall-clock time spent in `merge`.
pub elapsed: Duration,
}
/// Buffered cursor over a single input run of the merge.
#[cfg(feature = "write-support")]
struct RunReader {
/// Underlying entry source for this run.
reader: Box<dyn SSTableRowIterator>,
/// Entries already pulled from `reader` but not yet consumed.
buffer: VecDeque<MergeEntry>,
/// Soft limit, in estimated bytes, on how much one refill reads.
buffer_size: usize,
/// Set once `reader` has returned `None`.
exhausted: bool,
}
#[cfg(feature = "write-support")]
impl std::fmt::Debug for RunReader {
    /// Shows only the buffer bookkeeping. The boxed reader is a trait object
    /// without a `Debug` bound, so it is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RunReader");
        out.field("buffer_len", &self.buffer.len());
        out.field("buffer_size", &self.buffer_size);
        out.field("exhausted", &self.exhausted);
        out.finish()
    }
}
#[cfg(feature = "write-support")]
impl RunReader {
    /// Soft cap, in estimated bytes, on how much one refill pulls from the reader.
    const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;

    /// Wraps `reader` in an empty, not-yet-exhausted buffered cursor.
    fn new(reader: Box<dyn SSTableRowIterator>) -> Self {
        Self {
            buffer_size: Self::DEFAULT_BUFFER_SIZE,
            buffer: VecDeque::new(),
            exhausted: false,
            reader,
        }
    }

    /// Returns the next entry without consuming it. The buffer is refilled
    /// first when it is empty and the reader still has data.
    ///
    /// # Errors
    /// Propagates the reader error hit while refilling.
    fn peek(&mut self) -> Result<Option<&MergeEntry>> {
        let needs_refill = !self.exhausted && self.buffer.is_empty();
        if needs_refill {
            self.refill_buffer()?;
        }
        Ok(self.buffer.front())
    }

    /// Removes and returns the next entry, refilling the buffer when needed.
    /// Returns `Ok(None)` once the reader has ended and the buffer is drained.
    ///
    /// # Errors
    /// Propagates the reader error hit while refilling.
    fn advance(&mut self) -> Result<Option<MergeEntry>> {
        if self.buffer.is_empty() {
            if self.exhausted {
                return Ok(None);
            }
            self.refill_buffer()?;
        }
        Ok(self.buffer.pop_front())
    }

    /// True when the reader has reported end-of-run and nothing remains buffered.
    fn is_exhausted(&self) -> bool {
        self.buffer.is_empty() && self.exhausted
    }

    /// Pulls entries from the reader until about `buffer_size` estimated bytes
    /// have been added, or the reader ends (which sets `exhausted`).
    ///
    /// # Errors
    /// Returns the reader's error unchanged; entries buffered before it are kept.
    fn refill_buffer(&mut self) -> Result<()> {
        let mut added_bytes = 0usize;
        while added_bytes < self.buffer_size {
            let entry = match self.reader.next() {
                Some(item) => item?,
                None => {
                    self.exhausted = true;
                    break;
                }
            };
            added_bytes += Self::estimate_entry_size(&entry);
            self.buffer.push_back(entry);
        }
        Ok(())
    }

    /// Rough in-memory footprint of `entry`. It counts the struct itself, the
    /// partition-key bytes, the clustering-key columns and the cell payloads;
    /// a tombstone payload counts as a flat 16 bytes.
    fn estimate_entry_size(entry: &MergeEntry) -> usize {
        let mut total = std::mem::size_of::<MergeEntry>() + entry.key.key.len();

        if let Some(ck) = entry.clustering_key.as_ref() {
            for (name, value) in ck.columns.iter() {
                total += name.len() + Self::estimate_value_size(value);
            }
        }

        let payload = match &entry.row_data {
            RowData::Tombstone { .. } => 16,
            RowData::Live { cells } => {
                let mut sum = 0;
                for cell in cells {
                    sum += std::mem::size_of::<CellData>()
                        + cell.column.len()
                        + Self::estimate_value_size(&cell.value);
                }
                sum
            }
        };

        total + payload
    }

    /// Rough byte size of a single value. Heap-backed variants add their
    /// container header; unlisted variants count as 32 bytes.
    fn estimate_value_size(value: &Value) -> usize {
        let vec_header = std::mem::size_of::<Vec<u8>>();
        match value {
            Value::Null => 0,
            Value::Boolean(_) | Value::TinyInt(_) => 1,
            Value::SmallInt(_) => 2,
            Value::Integer(_) | Value::Float32(_) | Value::Date(_) => 4,
            Value::BigInt(_)
            | Value::Counter(_)
            | Value::Timestamp(_)
            | Value::Time(_)
            | Value::Float(_) => 8,
            Value::Uuid(_) => 16,
            Value::Duration { .. } => 20,
            Value::Text(s) => std::mem::size_of::<String>() + s.len(),
            Value::Blob(b) => vec_header + b.len(),
            Value::Inet(b) => vec_header + b.len(),
            Value::Varint(b) => vec_header + b.len(),
            Value::Decimal { unscaled, .. } => vec_header + 4 + unscaled.len(),
            _ => 32,
        }
    }
}
/// Source of merge entries for one input run, consumed through `RunReader`.
///
/// NOTE(review): `KWayMerger` keeps only one pending entry per run in its heap,
/// so implementations are presumably expected to yield entries already in
/// `MergeEntry` order (token, key bytes, clustering key). Confirm with the
/// SSTable reader.
#[cfg(feature = "write-support")]
pub trait SSTableRowIterator: Send {
/// Returns the next entry, `Some(Err(_))` on a read/decode failure, or
/// `None` once the run is exhausted.
fn next(&mut self) -> Option<Result<MergeEntry>>;
}
/// Runs `future` to completion from synchronous code and returns its result.
///
/// A fresh multi-threaded Tokio runtime is built for each call. When the caller
/// is already inside a Tokio runtime (where blocking on a nested runtime is not
/// allowed), the work is moved to a scoped helper thread and joined.
///
/// # Errors
/// * `Error::Storage` if the runtime cannot be created.
/// * `Error::Storage` if the helper thread panics.
/// * Whatever error `future` itself resolves to.
#[cfg(feature = "write-support")]
pub(crate) fn block_on_async<F, T>(future: F) -> Result<T>
where
    F: std::future::Future<Output = Result<T>> + Send,
    T: Send,
{
    // Builds a private runtime and drives the future on the current thread.
    let drive = move || -> Result<T> {
        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| Error::Storage(format!("Failed to create tokio runtime: {}", e)))?;
        runtime.block_on(future)
    };

    match tokio::runtime::Handle::try_current() {
        Err(_) => drive(),
        Ok(_) => std::thread::scope(|scope| {
            scope
                .spawn(drive)
                .join()
                .map_err(|_| Error::Storage("async-to-sync bridge thread panicked".to_string()))?
        }),
    }
}
/// `SSTableRowIterator` over one SSTable whose rows are fully decoded into
/// memory when `open` is called.
#[cfg(feature = "write-support")]
struct SSTableRowIteratorAdapter {
/// Remaining decoded entries, in the order the reader returned them.
entries: std::vec::IntoIter<MergeEntry>,
}
#[cfg(feature = "write-support")]
impl SSTableRowIteratorAdapter {
    /// Reads every partition of the SSTable at `path` and converts each into a
    /// `MergeEntry` tagged with `run_index` and no clustering key.
    ///
    /// The read runs with mmap disabled, through `block_on_async`, and the
    /// whole file is materialised in memory before this returns.
    ///
    /// # Errors
    /// Propagates platform/reader failures and key-decoding errors.
    fn open(path: &Path, run_index: usize) -> Result<Self> {
        use crate::platform::Platform;
        use crate::Config;
        use std::sync::Arc;

        let config = {
            let mut cfg = Config::default();
            cfg.storage.use_mmap = false;
            cfg
        };
        let sstable_path = path.to_path_buf();

        let raw_entries = block_on_async(async move {
            let platform = Arc::new(Platform::new(&config).await?);
            let reader = crate::storage::sstable::reader::SSTableReader::open(
                &sstable_path,
                &config,
                platform,
            )
            .await?;
            reader.iterate_all_partitions_for_compaction(None).await
        })?;

        let decoded = raw_entries
            .into_iter()
            .map(|(row_key, value, timestamp)| -> Result<MergeEntry> {
                let decorated_key = DecoratedKey::from_key_bytes(row_key.0)?;
                let row_data = Self::value_to_row_data(&value, timestamp)?;
                Ok(MergeEntry::new(
                    run_index,
                    decorated_key,
                    None,
                    timestamp,
                    row_data,
                ))
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Self {
            entries: decoded.into_iter(),
        })
    }

    /// Maps one stored row value to merge payload:
    /// * a tombstone value becomes a row tombstone (`local_deletion_time` 0);
    /// * a map becomes one cell per entry, named after the text key (or the
    ///   key's `Debug` form); tombstone cells take their own `deletion_time`
    ///   as timestamp, all others take `row_timestamp`;
    /// * any other value becomes a single cell named `"value"`.
    fn value_to_row_data(value: &crate::types::Value, row_timestamp: i64) -> Result<RowData> {
        let row_data = match value {
            crate::types::Value::Tombstone(info) => RowData::Tombstone {
                deletion_time: info.deletion_time,
                local_deletion_time: 0,
            },
            crate::types::Value::Map(map_entries) => {
                let cells = map_entries
                    .iter()
                    .map(|(key, val)| {
                        let column = if let crate::types::Value::Text(name) = key {
                            name.clone()
                        } else {
                            format!("{:?}", key)
                        };
                        let timestamp = if let crate::types::Value::Tombstone(info) = val {
                            info.deletion_time
                        } else {
                            row_timestamp
                        };
                        CellData {
                            column,
                            value: val.clone(),
                            timestamp,
                            ttl: None,
                        }
                    })
                    .collect();
                RowData::Live { cells }
            }
            scalar => RowData::Live {
                cells: vec![CellData {
                    column: "value".to_string(),
                    value: scalar.clone(),
                    timestamp: row_timestamp,
                    ttl: None,
                }],
            },
        };
        Ok(row_data)
    }
}
#[cfg(feature = "write-support")]
impl SSTableRowIterator for SSTableRowIteratorAdapter {
    /// Hands out the next pre-decoded entry; this adapter never yields errors.
    fn next(&mut self) -> Option<Result<MergeEntry>> {
        let entry = self.entries.next()?;
        Some(Ok(entry))
    }
}
/// Merges several sorted input runs (SSTables) into one stream of reconciled
/// partitions.
#[cfg(feature = "write-support")]
#[derive(Debug)]
pub struct KWayMerger {
/// One buffered reader per input, indexed by `run_index`.
runs: Vec<RunReader>,
/// Min-heap (via `Reverse`) of the next pending entry from each run.
heap: BinaryHeap<Reverse<MergeEntry>>,
/// Consulted by `step` (together with an empty heap) to decide whether the
/// heap still needs to be seeded.
current_partition: Option<DecoratedKey>,
/// Table schema used for mutation conversion and clustering-key ordering.
schema: TableSchema,
}
#[cfg(feature = "write-support")]
impl KWayMerger {
    /// Opens every input SSTable and builds a merger over them.
    ///
    /// Each path is loaded with `SSTableRowIteratorAdapter::open`, which reads
    /// the whole file into memory up front, and is wrapped in a [`RunReader`].
    /// A path's position in `input_paths` becomes the `run_index` stamped on its
    /// entries. The heap is seeded lazily by the first [`KWayMerger::step`].
    ///
    /// # Errors
    /// `Error::InvalidInput` if `input_paths` is empty; otherwise any error
    /// raised while opening or decoding an input.
    pub fn new(input_paths: Vec<PathBuf>, schema: &TableSchema) -> Result<Self> {
        if input_paths.is_empty() {
            return Err(Error::InvalidInput(
                "K-way merge requires at least one input file".to_string(),
            ));
        }
        let mut runs = Vec::with_capacity(input_paths.len());
        for (run_index, path) in input_paths.iter().enumerate() {
            let adapter = SSTableRowIteratorAdapter::open(path, run_index)?;
            runs.push(RunReader::new(Box::new(adapter)));
        }
        Ok(Self {
            runs,
            heap: BinaryHeap::new(),
            current_partition: None,
            schema: schema.clone(),
        })
    }

    /// Runs the merge to completion, writing each partition to `output_writer`.
    ///
    /// Every reconciled row is converted with
    /// [`KWayMerger::merge_entry_to_mutation`], and each partition is passed to
    /// `write_partition` in merge order. The returned stats count partitions
    /// and rows written and record the elapsed wall time. `bytes_written` is
    /// not tracked here and is always reported as 0.
    ///
    /// # Errors
    /// Propagates read, reconciliation, conversion and write errors.
    pub fn merge(
        mut self,
        output_writer: &mut crate::storage::sstable::writer::SSTableWriter,
    ) -> Result<MergeStats> {
        let start_time = Instant::now();
        let mut stats = MergeStats {
            input_files: self.runs.len(),
            output_partitions: 0,
            output_rows: 0,
            bytes_written: 0,
            elapsed: Duration::from_secs(0),
        };
        while let MergeStep::Partition { key, rows } = self.step()? {
            stats.output_partitions += 1;
            stats.output_rows += rows.len() as u64;
            let mutations = rows
                .into_iter()
                .map(|entry| Self::merge_entry_to_mutation(entry, &self.schema))
                .collect::<Result<Vec<_>>>()?;
            output_writer.write_partition(key, mutations)?;
        }
        stats.elapsed = start_time.elapsed();
        Ok(stats)
    }

    /// Produces the next merged partition, or `MergeStep::Complete` once every
    /// run is drained.
    ///
    /// The first call seeds the heap with the head of every run. Each call then
    /// pops all heap entries that share the smallest partition key. After each
    /// pop it refills from that entry's run, so later rows of the same partition
    /// from that run are collected in the same call. The collected rows are
    /// reconciled with [`KWayMerger::merge_partition_rows`].
    ///
    /// # Errors
    /// Propagates run read errors and reconciliation errors.
    pub fn step(&mut self) -> Result<MergeStep> {
        // `current_partition` stays `None` until the first partition is
        // emitted, so the heap is seeded exactly once.
        if self.heap.is_empty() && self.current_partition.is_none() {
            self.initialize_heap()?;
        }

        // The smallest pending key defines the partition emitted by this call.
        let partition_key = match self.heap.peek() {
            Some(Reverse(head)) => head.key.clone(),
            None => return Ok(MergeStep::Complete),
        };

        let mut partition_rows = Vec::new();
        while let Some(Reverse(head)) = self.heap.peek() {
            if head.key != partition_key {
                break;
            }
            let Reverse(entry) = self
                .heap
                .pop()
                .ok_or_else(|| Error::InvalidInput("Merge heap unexpectedly empty".to_string()))?;
            // Read the run index before moving the entry so no clone is needed.
            let run_index = entry.run_index;
            partition_rows.push(entry);
            self.refill_heap(run_index)?;
        }

        let rows = self.merge_partition_rows(partition_rows)?;
        self.current_partition = Some(partition_key.clone());
        Ok(MergeStep::Partition {
            key: partition_key,
            rows,
        })
    }

    /// Pushes the first pending entry of every run onto the heap.
    fn initialize_heap(&mut self) -> Result<()> {
        for run_index in 0..self.runs.len() {
            self.refill_heap(run_index)?;
        }
        Ok(())
    }

    /// Moves the next entry of run `run_index` (if any) onto the heap.
    ///
    /// An out-of-range index is ignored. This keeps at most one entry per run
    /// on the heap.
    fn refill_heap(&mut self, run_index: usize) -> Result<()> {
        if run_index >= self.runs.len() {
            return Ok(());
        }
        let run = &mut self.runs[run_index];
        if !run.is_exhausted() {
            if let Some(entry) = run.peek()? {
                let entry = entry.clone();
                self.heap.push(Reverse(entry));
            }
            run.advance()?;
        }
        Ok(())
    }

    /// Reconciles all versions of the rows of one partition.
    ///
    /// Rows are grouped by clustering key (via the key's derived `Ord`), and
    /// each group is reduced by [`KWayMerger::reconcile_cluster`]. The result is
    /// sorted schema-aware with `ClusteringKey::compare`; when that comparison
    /// fails, a warning is logged and the derived order is used instead.
    fn merge_partition_rows(&self, rows: Vec<MergeEntry>) -> Result<Vec<MergeEntry>> {
        use std::collections::BTreeMap;
        let mut clustered_rows: BTreeMap<Option<ClusteringKey>, Vec<MergeEntry>> = BTreeMap::new();
        for row in rows {
            clustered_rows
                .entry(row.clustering_key.clone())
                .or_default()
                .push(row);
        }
        let mut merged = Vec::new();
        for (ck, cluster_rows) in clustered_rows {
            if let Some(entry) = Self::reconcile_cluster(ck, cluster_rows) {
                merged.push(entry);
            }
        }
        merged.sort_by(|a, b| match (&a.clustering_key, &b.clustering_key) {
            (None, None) => Ordering::Equal,
            (None, Some(_)) => Ordering::Less,
            (Some(_), None) => Ordering::Greater,
            (Some(ck_a), Some(ck_b)) => {
                ck_a.compare(ck_b, &self.schema).unwrap_or_else(|e| {
                    log::warn!(
                        "Schema-aware clustering key comparison failed, using fallback: {}",
                        e
                    );
                    ck_a.cmp(ck_b)
                })
            }
        });
        Ok(merged)
    }

    /// True when `cell` holds a `Value::Tombstone` of type `CellTombstone`.
    fn is_cell_tombstone(cell: &CellData) -> bool {
        matches!(
            cell.value,
            crate::types::Value::Tombstone(ref info)
                if info.tombstone_type == crate::types::TombstoneType::CellTombstone
        )
    }

    /// Reconciles every version of one row (same partition and clustering key).
    ///
    /// * Row tombstones: the greatest `deletion_time` wins.
    /// * Cells: per column, the higher timestamp wins. At equal timestamps a
    ///   cell tombstone beats a live value; otherwise the cell earlier in
    ///   `cluster_rows` is kept.
    /// * A cell survives only if its timestamp is strictly greater than the row
    ///   deletion time.
    /// * If cells survive, a `Live` row stamped with the newest surviving cell
    ///   timestamp is returned.
    /// * Otherwise, a `Live` version with no cells (presumably a row that only
    ///   has primary-key columns) survives as an empty `Live` row if its
    ///   timestamp beats the row deletion. Previously such rows were silently
    ///   dropped.
    /// * Otherwise, the row tombstone is returned if there is one, else `None`.
    ///
    /// The result carries the smallest `run_index` in the group.
    fn reconcile_cluster(
        clustering_key: Option<ClusteringKey>,
        cluster_rows: Vec<MergeEntry>,
    ) -> Option<MergeEntry> {
        use std::collections::HashMap;
        let mut key = None;
        let mut run_index = usize::MAX;
        let mut row_del: Option<i64> = None;
        // Newest timestamp among `Live` versions that carry no cells at all.
        let mut marker_ts: Option<i64> = None;
        // First-seen column order, so output cells are deterministic.
        let mut order: Vec<String> = Vec::new();
        let mut winners: HashMap<String, CellData> = HashMap::new();
        for entry in &cluster_rows {
            if key.is_none() {
                key = Some(entry.key.clone());
            }
            run_index = run_index.min(entry.run_index);
            match &entry.row_data {
                RowData::Tombstone { deletion_time, .. } => {
                    // `None < Some`, so `max` keeps the newest deletion.
                    row_del = row_del.max(Some(*deletion_time));
                }
                RowData::Live { cells } if cells.is_empty() => {
                    marker_ts = marker_ts.max(Some(entry.timestamp));
                }
                RowData::Live { cells } => {
                    for cell in cells {
                        match winners.get(&cell.column) {
                            None => {
                                order.push(cell.column.clone());
                                winners.insert(cell.column.clone(), cell.clone());
                            }
                            Some(existing) => {
                                let replace = cell.timestamp > existing.timestamp
                                    || (cell.timestamp == existing.timestamp
                                        && Self::is_cell_tombstone(cell)
                                        && !Self::is_cell_tombstone(existing));
                                if replace {
                                    winners.insert(cell.column.clone(), cell.clone());
                                }
                            }
                        }
                    }
                }
            }
        }
        let key = key?;

        // Data written at or before the row deletion is shadowed by it.
        let survives = |ts: i64| match row_del {
            Some(deletion) => ts > deletion,
            None => true,
        };

        let surviving: Vec<CellData> = order
            .into_iter()
            .filter_map(|col| winners.remove(&col))
            .filter(|cell| survives(cell.timestamp))
            .collect();

        let newest_cell_ts = surviving.iter().map(|cell| cell.timestamp).max();
        if let Some(row_ts) = newest_cell_ts {
            return Some(MergeEntry::new(
                run_index,
                key,
                clustering_key,
                row_ts,
                RowData::Live { cells: surviving },
            ));
        }

        if let Some(marker) = marker_ts.filter(|&ts| survives(ts)) {
            return Some(MergeEntry::new(
                run_index,
                key,
                clustering_key,
                marker,
                RowData::Live { cells: Vec::new() },
            ));
        }

        row_del.map(|deletion_time| {
            MergeEntry::new(
                run_index,
                key,
                clustering_key,
                deletion_time,
                RowData::Tombstone {
                    deletion_time,
                    local_deletion_time: 0,
                },
            )
        })
    }

    /// Converts a reconciled entry into a write-engine `Mutation`.
    ///
    /// * Cell tombstones become `CellOperation::Delete`.
    /// * Cells with a TTL become `WriteWithTtl`; all other cells become `Write`.
    /// * A row tombstone becomes a single `DeleteRow`.
    ///
    /// NOTE(review): every operation is emitted under the single
    /// `entry.timestamp`; individual `CellData::timestamp`s are not carried into
    /// the `Mutation` here. Older cells may therefore be rewritten with a newer
    /// timestamp. Confirm whether `Mutation` can carry per-cell timestamps.
    ///
    /// # Errors
    /// Propagates partition-key decoding errors from `PartitionKey::from_bytes`.
    pub(crate) fn merge_entry_to_mutation(
        entry: MergeEntry,
        schema: &TableSchema,
    ) -> Result<crate::storage::write_engine::mutation::Mutation> {
        use crate::storage::write_engine::mutation::{
            CellOperation, Mutation, PartitionKey, TableId,
        };
        let partition_key = PartitionKey::from_bytes(&entry.key.key, schema)?;
        let table_id = TableId::new(&schema.keyspace, &schema.table);
        let operations = match entry.row_data {
            RowData::Live { cells } => cells
                .into_iter()
                .map(|cell| {
                    if Self::is_cell_tombstone(&cell) {
                        return CellOperation::Delete {
                            column: cell.column,
                        };
                    }
                    match cell.ttl {
                        Some(ttl) => CellOperation::WriteWithTtl {
                            column: cell.column,
                            value: cell.value,
                            ttl_seconds: ttl,
                        },
                        None => CellOperation::Write {
                            column: cell.column,
                            value: cell.value,
                        },
                    }
                })
                .collect(),
            RowData::Tombstone { .. } => vec![CellOperation::DeleteRow],
        };
        Ok(Mutation::new(
            table_id,
            partition_key,
            entry.clustering_key,
            operations,
            entry.timestamp,
            None,
        ))
    }
}
#[cfg(all(test, feature = "write-support"))]
mod tests {
use super::*;
use crate::storage::write_engine::mutation::DecoratedKey;
#[test]
fn test_merge_entry_ordering_by_token() {
let entry1 = MergeEntry::new(
0,
DecoratedKey::new(100, vec![1, 2, 3]),
None,
1000,
RowData::Live { cells: vec![] },
);
let entry2 = MergeEntry::new(
0,
DecoratedKey::new(200, vec![1, 2, 3]),
None,
1000,
RowData::Live { cells: vec![] },
);
assert!(entry1 < entry2);
assert!(entry2 > entry1);
}
#[test]
fn test_merge_entry_ordering_by_key_bytes() {
let entry1 = MergeEntry::new(
0,
DecoratedKey::new(100, vec![1, 2, 3]),
None,
1000,
RowData::Live { cells: vec![] },
);
let entry2 = MergeEntry::new(
0,
DecoratedKey::new(100, vec![1, 2, 4]),
None,
1000,
RowData::Live { cells: vec![] },
);
assert!(entry1 < entry2);
assert!(entry2 > entry1);
}
#[test]
fn test_merge_entry_ordering_by_run_index() {
let entry1 = MergeEntry::new(
0,
DecoratedKey::new(100, vec![1, 2, 3]),
None,
1000,
RowData::Live { cells: vec![] },
);
let entry2 = MergeEntry::new(
1,
DecoratedKey::new(100, vec![1, 2, 3]),
None,
1000,
RowData::Live { cells: vec![] },
);
assert!(entry1 < entry2);
assert!(entry2 > entry1);
}
#[test]
fn test_merge_entry_min_heap() {
use std::cmp::Reverse;
use std::collections::BinaryHeap;
let mut heap: BinaryHeap<Reverse<MergeEntry>> = BinaryHeap::new();
let entry3 = MergeEntry::new(
0,
DecoratedKey::new(300, vec![3]),
None,
1000,
RowData::Live { cells: vec![] },
);
let entry1 = MergeEntry::new(
0,
DecoratedKey::new(100, vec![1]),
None,
1000,
RowData::Live { cells: vec![] },
);
let entry2 = MergeEntry::new(
0,
DecoratedKey::new(200, vec![2]),
None,
1000,
RowData::Live { cells: vec![] },
);
heap.push(Reverse(entry3.clone()));
heap.push(Reverse(entry1.clone()));
heap.push(Reverse(entry2.clone()));
assert_eq!(heap.pop().unwrap().0.key.token, 100);
assert_eq!(heap.pop().unwrap().0.key.token, 200);
assert_eq!(heap.pop().unwrap().0.key.token, 300);
}
#[test]
fn test_row_data_variants() {
let live = RowData::Live {
cells: vec![CellData {
column: "name".to_string(),
value: Value::Text("Alice".to_string()),
timestamp: 1000,
ttl: None,
}],
};
match live {
RowData::Live { cells } => {
assert_eq!(cells.len(), 1);
assert_eq!(cells[0].column, "name");
}
_ => panic!("Expected Live variant"),
}
let tombstone = RowData::Tombstone {
deletion_time: 2000,
local_deletion_time: 1000,
};
match tombstone {
RowData::Tombstone {
deletion_time,
local_deletion_time,
} => {
assert_eq!(deletion_time, 2000);
assert_eq!(local_deletion_time, 1000);
}
_ => panic!("Expected Tombstone variant"),
}
}
#[test]
fn test_cell_data_creation() {
let cell = CellData {
column: "age".to_string(),
value: Value::Integer(30),
timestamp: 1234567890,
ttl: Some(3600),
};
assert_eq!(cell.column, "age");
assert_eq!(cell.value, Value::Integer(30));
assert_eq!(cell.timestamp, 1234567890);
assert_eq!(cell.ttl, Some(3600));
}
#[test]
fn test_merge_stats_creation() {
let stats = MergeStats {
input_files: 5,
output_partitions: 1000,
output_rows: 5000,
bytes_written: 1024 * 1024,
elapsed: Duration::from_secs(10),
};
assert_eq!(stats.input_files, 5);
assert_eq!(stats.output_partitions, 1000);
assert_eq!(stats.output_rows, 5000);
assert_eq!(stats.bytes_written, 1024 * 1024);
assert_eq!(stats.elapsed.as_secs(), 10);
}
#[test]
fn test_run_reader_estimate_entry_size() {
let entry = MergeEntry::new(
0,
DecoratedKey::new(100, vec![1, 2, 3, 4]),
None,
1000,
RowData::Live {
cells: vec![CellData {
column: "name".to_string(),
value: Value::Text("Alice".to_string()),
timestamp: 1000,
ttl: None,
}],
},
);
let size = RunReader::estimate_entry_size(&entry);
let expected_min_size = std::mem::size_of::<MergeEntry>() + 4;
assert!(size >= expected_min_size);
}
#[test]
fn test_kway_merger_empty_input() {
use crate::schema::{KeyColumn, TableSchema};
use std::collections::HashMap;
let schema = TableSchema {
keyspace: "test_ks".to_string(),
table: "test_table".to_string(),
partition_keys: vec![KeyColumn {
name: "id".to_string(),
data_type: "int".to_string(),
position: 0,
}],
clustering_keys: vec![],
columns: vec![],
comments: HashMap::new(),
};
let result = KWayMerger::new(vec![], &schema);
assert!(result.is_err());
if let Err(Error::InvalidInput(msg)) = result {
assert!(msg.contains("at least one input file"));
} else {
panic!("Expected InvalidInput error");
}
}
#[test]
fn test_merge_entry_equal_timestamps_prefer_lower_run_index() {
let entry_run0 = MergeEntry::new(
0, DecoratedKey::new(100, vec![1, 2, 3]),
None,
1000, RowData::Live {
cells: vec![CellData {
column: "name".to_string(),
value: Value::Text("Newer".to_string()),
timestamp: 1000,
ttl: None,
}],
},
);
let entry_run1 = MergeEntry::new(
1, DecoratedKey::new(100, vec![1, 2, 3]),
None,
1000, RowData::Live {
cells: vec![CellData {
column: "name".to_string(),
value: Value::Text("Older".to_string()),
timestamp: 1000,
ttl: None,
}],
},
);
assert!(entry_run0 < entry_run1);
}
#[test]
fn test_merge_entry_tombstone() {
let tombstone_entry = MergeEntry::new(
0,
DecoratedKey::new(100, vec![1, 2, 3]),
None,
2000,
RowData::Tombstone {
deletion_time: 2000,
local_deletion_time: 1000,
},
);
match tombstone_entry.row_data {
RowData::Tombstone {
deletion_time,
local_deletion_time,
} => {
assert_eq!(deletion_time, 2000);
assert_eq!(local_deletion_time, 1000);
}
_ => panic!("Expected Tombstone"),
}
}
#[test]
fn test_real_merger_delete_wins_at_equal_timestamp() {
use crate::schema::{Column, KeyColumn, TableSchema};
use std::collections::HashMap;
let schema = TableSchema {
keyspace: "reconcile_ks".to_string(),
table: "reconcile_tbl".to_string(),
partition_keys: vec![KeyColumn {
name: "id".to_string(),
data_type: "int".to_string(),
position: 0,
}],
clustering_keys: vec![],
columns: vec![Column {
name: "value".to_string(),
data_type: "text".to_string(),
nullable: true,
default: None,
is_static: false,
}],
comments: HashMap::new(),
};
const EQUAL_TS: i64 = 1_700_000_000_000_000;
let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
let live_entry = MergeEntry::new(
0,
partition_key.clone(),
None,
EQUAL_TS,
RowData::Live {
cells: vec![CellData {
column: "value".to_string(),
value: Value::Text("survivor-if-buggy".to_string()),
timestamp: EQUAL_TS,
ttl: None,
}],
},
);
let tombstone_entry = MergeEntry::new(
1,
partition_key.clone(),
None,
EQUAL_TS,
RowData::Tombstone {
deletion_time: EQUAL_TS,
local_deletion_time: 2_000_000,
},
);
let merger = KWayMerger {
runs: vec![],
heap: BinaryHeap::new(),
current_partition: None,
schema,
};
let merged = merger
.merge_partition_rows(vec![live_entry, tombstone_entry])
.expect("merge_partition_rows must not fail");
assert_eq!(merged.len(), 1, "one clustering key => one merged winner");
assert!(
matches!(merged[0].row_data, RowData::Tombstone { .. }),
"At equal timestamp the tombstone must win even though the live row is in \
the newer file (run_index 0). Got a live row => the equal-ts tiebreak \
reverted to run_index (Issue #498 regression)."
);
}
#[test]
fn test_real_merger_disjoint_columns_survive_compaction() {
use crate::schema::{ClusteringColumn, Column, KeyColumn, TableSchema};
use std::collections::HashMap;
let schema = TableSchema {
keyspace: "disjoint_ks".to_string(),
table: "disjoint_tbl".to_string(),
partition_keys: vec![KeyColumn {
name: "pk".to_string(),
data_type: "int".to_string(),
position: 0,
}],
clustering_keys: vec![ClusteringColumn {
name: "ck".to_string(),
data_type: "int".to_string(),
position: 0,
order: Default::default(),
}],
columns: vec![
Column {
name: "name".to_string(),
data_type: "text".to_string(),
nullable: true,
default: None,
is_static: false,
},
Column {
name: "score".to_string(),
data_type: "int".to_string(),
nullable: true,
default: None,
is_static: false,
},
],
comments: HashMap::new(),
};
let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
let ck = ClusteringKey {
columns: vec![("ck".to_string(), Value::Integer(1))],
};
let entry_a = MergeEntry::new(
1,
partition_key.clone(),
Some(ck.clone()),
100,
RowData::Live {
cells: vec![CellData {
column: "name".to_string(),
value: Value::Text("alice".to_string()),
timestamp: 100,
ttl: None,
}],
},
);
let entry_b = MergeEntry::new(
0,
partition_key.clone(),
Some(ck.clone()),
200,
RowData::Live {
cells: vec![CellData {
column: "score".to_string(),
value: Value::Integer(42),
timestamp: 200,
ttl: None,
}],
},
);
let merger = KWayMerger {
runs: vec![],
heap: BinaryHeap::new(),
current_partition: None,
schema,
};
let merged = merger
.merge_partition_rows(vec![entry_b, entry_a])
.expect("merge_partition_rows must not fail");
assert_eq!(merged.len(), 1, "one clustering key => one merged row");
let cells = match &merged[0].row_data {
RowData::Live { cells } => cells,
other => panic!("expected a Live merged row, got {:?}", other),
};
let name = cells.iter().find(|c| c.column == "name");
let score = cells.iter().find(|c| c.column == "score");
assert!(
name.is_some(),
"disjoint column `name` from the older file was DROPPED — per-cell \
reconcile regression (Issue #533). Old whole-row-wins code fails here."
);
assert!(
score.is_some(),
"disjoint column `score` from the newer file is missing"
);
assert_eq!(
name.unwrap().value,
Value::Text("alice".to_string()),
"`name` must carry A's value"
);
assert_eq!(
score.unwrap().value,
Value::Integer(42),
"`score` must carry B's value"
);
assert_eq!(
merged[0].timestamp, 200,
"merged row timestamp must be the max surviving cell timestamp"
);
}
#[test]
fn test_real_merger_cell_tombstone_beats_live_at_equal_timestamp() {
use crate::schema::{ClusteringColumn, Column, KeyColumn, TableSchema};
use crate::types::{TombstoneInfo, TombstoneType};
use std::collections::HashMap;
let schema = TableSchema {
keyspace: "ct_ks".to_string(),
table: "ct_tbl".to_string(),
partition_keys: vec![KeyColumn {
name: "pk".to_string(),
data_type: "int".to_string(),
position: 0,
}],
clustering_keys: vec![ClusteringColumn {
name: "ck".to_string(),
data_type: "int".to_string(),
position: 0,
order: Default::default(),
}],
columns: vec![Column {
name: "score".to_string(),
data_type: "int".to_string(),
nullable: true,
default: None,
is_static: false,
}],
comments: HashMap::new(),
};
let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
let ck = ClusteringKey {
columns: vec![("ck".to_string(), Value::Integer(1))],
};
let entry_a = MergeEntry::new(
0,
partition_key.clone(),
Some(ck.clone()),
100,
RowData::Live {
cells: vec![CellData {
column: "score".to_string(),
value: Value::Integer(42),
timestamp: 100,
ttl: None,
}],
},
);
let entry_b = MergeEntry::new(
1,
partition_key.clone(),
Some(ck.clone()),
100,
RowData::Live {
cells: vec![CellData {
column: "score".to_string(),
value: Value::Tombstone(TombstoneInfo {
deletion_time: 100,
tombstone_type: TombstoneType::CellTombstone,
ttl: None,
range_start: None,
range_end: None,
}),
timestamp: 100,
ttl: None,
}],
},
);
let merger = KWayMerger {
runs: vec![],
heap: BinaryHeap::new(),
current_partition: None,
schema,
};
let merged = merger
.merge_partition_rows(vec![entry_a, entry_b])
.expect("merge_partition_rows must not fail");
assert_eq!(merged.len(), 1, "one clustering key => one merged row");
let cells = match &merged[0].row_data {
RowData::Live { cells } => cells,
other => panic!("expected a Live merged row, got {:?}", other),
};
let score = cells
.iter()
.find(|c| c.column == "score")
.expect("score cell must be present (as a tombstone)");
assert!(
matches!(
score.value,
Value::Tombstone(ref info) if info.tombstone_type == TombstoneType::CellTombstone
),
"at equal ts the cell tombstone must win over the live value (got {:?}) — \
a recency-only tiebreak would have kept the newer file's live 42 (#498 per cell)",
score.value
);
}
#[test]
fn test_real_merger_same_column_conflict_resolves_by_timestamp() {
use crate::schema::{Column, KeyColumn, TableSchema};
use std::collections::HashMap;
let schema = TableSchema {
keyspace: "conflict_ks".to_string(),
table: "conflict_tbl".to_string(),
partition_keys: vec![KeyColumn {
name: "pk".to_string(),
data_type: "int".to_string(),
position: 0,
}],
clustering_keys: vec![],
columns: vec![
Column {
name: "name".to_string(),
data_type: "text".to_string(),
nullable: true,
default: None,
is_static: false,
},
Column {
name: "extra".to_string(),
data_type: "text".to_string(),
nullable: true,
default: None,
is_static: false,
},
],
comments: HashMap::new(),
};
let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
let entry_a = MergeEntry::new(
1,
partition_key.clone(),
None,
100,
RowData::Live {
cells: vec![
CellData {
column: "name".to_string(),
value: Value::Text("old".to_string()),
timestamp: 100,
ttl: None,
},
CellData {
column: "extra".to_string(),
value: Value::Text("a-only".to_string()),
timestamp: 100,
ttl: None,
},
],
},
);
let entry_b = MergeEntry::new(
0,
partition_key.clone(),
None,
200,
RowData::Live {
cells: vec![CellData {
column: "name".to_string(),
value: Value::Text("new".to_string()),
timestamp: 200,
ttl: None,
}],
},
);
let merger = KWayMerger {
runs: vec![],
heap: BinaryHeap::new(),
current_partition: None,
schema,
};
let merged = merger
.merge_partition_rows(vec![entry_b, entry_a])
.expect("merge_partition_rows must not fail");
assert_eq!(merged.len(), 1);
let cells = match &merged[0].row_data {
RowData::Live { cells } => cells,
other => panic!("expected Live, got {:?}", other),
};
let name = cells
.iter()
.find(|c| c.column == "name")
.expect("name present");
let extra = cells
.iter()
.find(|c| c.column == "extra")
.expect("extra (disjoint) must survive");
assert_eq!(
name.value,
Value::Text("new".to_string()),
"same-column conflict must resolve to the higher-timestamp value"
);
assert_eq!(
extra.value,
Value::Text("a-only".to_string()),
"disjoint column from the older file must survive the conflict merge"
);
}
#[test]
fn test_real_merger_row_tombstone_shadows_old_cells_keeps_new() {
use crate::schema::{Column, KeyColumn, TableSchema};
use std::collections::HashMap;
let schema = TableSchema {
keyspace: "shadow_ks".to_string(),
table: "shadow_tbl".to_string(),
partition_keys: vec![KeyColumn {
name: "pk".to_string(),
data_type: "int".to_string(),
position: 0,
}],
clustering_keys: vec![],
columns: vec![
Column {
name: "name".to_string(),
data_type: "text".to_string(),
nullable: true,
default: None,
is_static: false,
},
Column {
name: "score".to_string(),
data_type: "int".to_string(),
nullable: true,
default: None,
is_static: false,
},
],
comments: HashMap::new(),
};
let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);
let entry_a = MergeEntry::new(
2,
pk.clone(),
None,
100,
RowData::Live {
cells: vec![CellData {
column: "name".to_string(),
value: Value::Text("old".to_string()),
timestamp: 100,
ttl: None,
}],
},
);
let entry_b = MergeEntry::new(
1,
pk.clone(),
None,
200,
RowData::Tombstone {
deletion_time: 200,
local_deletion_time: 0,
},
);
let entry_c = MergeEntry::new(
0,
pk.clone(),
None,
300,
RowData::Live {
cells: vec![CellData {
column: "score".to_string(),
value: Value::Integer(7),
timestamp: 300,
ttl: None,
}],
},
);
let merger = KWayMerger {
runs: vec![],
heap: BinaryHeap::new(),
current_partition: None,
schema,
};
let merged = merger
.merge_partition_rows(vec![entry_c, entry_b, entry_a])
.expect("merge must not fail");
assert_eq!(merged.len(), 1);
let cells = match &merged[0].row_data {
RowData::Live { cells } => cells,
other => panic!(
"expected Live (score survives the tombstone), got {:?}",
other
),
};
assert!(
cells.iter().all(|c| c.column != "name"),
"`name` (ts=100 <= row_del=200) must be shadowed by the row tombstone"
);
let score = cells
.iter()
.find(|c| c.column == "score")
.expect("`score` (ts=300 > row_del=200) must survive the row tombstone");
assert_eq!(score.value, Value::Integer(7));
}
#[test]
fn test_real_merger_row_tombstone_only_emits_tombstone() {
use crate::schema::{Column, KeyColumn, TableSchema};
use std::collections::HashMap;
let schema = TableSchema {
keyspace: "ts_only_ks".to_string(),
table: "ts_only_tbl".to_string(),
partition_keys: vec![KeyColumn {
name: "pk".to_string(),
data_type: "int".to_string(),
position: 0,
}],
clustering_keys: vec![],
columns: vec![Column {
name: "name".to_string(),
data_type: "text".to_string(),
nullable: true,
default: None,
is_static: false,
}],
comments: HashMap::new(),
};
let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);
let live = MergeEntry::new(
1,
pk.clone(),
None,
100,
RowData::Live {
cells: vec![CellData {
column: "name".to_string(),
value: Value::Text("doomed".to_string()),
timestamp: 100,
ttl: None,
}],
},
);
let tomb = MergeEntry::new(
0,
pk.clone(),
None,
300,
RowData::Tombstone {
deletion_time: 300,
local_deletion_time: 0,
},
);
let merger = KWayMerger {
runs: vec![],
heap: BinaryHeap::new(),
current_partition: None,
schema,
};
let merged = merger
.merge_partition_rows(vec![tomb, live])
.expect("merge must not fail");
assert_eq!(merged.len(), 1);
match &merged[0].row_data {
RowData::Tombstone { deletion_time, .. } => {
assert_eq!(*deletion_time, 300, "tombstone deletion_time preserved");
}
other => panic!("expected a Tombstone entry, got {:?}", other),
}
}
/// `MergeStep` variants expose exactly the payload they were built with.
#[test]
fn test_merge_step_variants() {
    let step = MergeStep::Partition {
        key: DecoratedKey::new(100, vec![1, 2, 3]),
        rows: vec![],
    };
    if let MergeStep::Partition { key, rows } = step {
        assert_eq!(key.token, 100);
        assert_eq!(rows.len(), 0);
    } else {
        panic!("Expected Partition variant");
    }

    let done = MergeStep::Complete;
    assert!(matches!(done, MergeStep::Complete), "Expected Complete variant");
}
/// Two live versions of the same row (same partition, no clustering key) come
/// from different runs. The merged row must carry the cell with the higher write
/// timestamp.
///
/// The older write is placed in run 0, so a merger that simply preferred the
/// lowest run index would keep "Old" and fail here. The previous version of this
/// test only compared the two literal timestamps and never exercised the merger.
#[test]
fn test_cell_merge_last_write_wins_higher_timestamp() {
    use crate::schema::{Column, KeyColumn, TableSchema};
    use std::collections::HashMap;

    let schema = TableSchema {
        keyspace: "lww_ks".to_string(),
        table: "lww_tbl".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![],
        columns: vec![Column {
            name: "name".to_string(),
            data_type: "text".to_string(),
            nullable: true,
            default: None,
            is_static: false,
        }],
        comments: HashMap::new(),
    };

    let cell1 = CellData {
        column: "name".to_string(),
        value: Value::Text("Old".to_string()),
        timestamp: 1000,
        ttl: None,
    };
    let cell2 = CellData {
        column: "name".to_string(),
        value: Value::Text("New".to_string()),
        timestamp: 2000,
        ttl: None,
    };
    // Fixture sanity check: cell2 really is the later write.
    assert!(cell2.timestamp > cell1.timestamp);

    let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);
    let older = MergeEntry::new(0, pk.clone(), None, 1000, RowData::Live { cells: vec![cell1] });
    let newer = MergeEntry::new(1, pk, None, 2000, RowData::Live { cells: vec![cell2] });

    let merger = KWayMerger {
        runs: vec![],
        heap: BinaryHeap::new(),
        current_partition: None,
        schema,
    };
    let merged = merger
        .merge_partition_rows(vec![older, newer])
        .expect("merge must not fail");

    assert_eq!(merged.len(), 1, "both versions share one row key => one merged row");
    let cells = match &merged[0].row_data {
        RowData::Live { cells } => cells,
        other => panic!("expected a Live row, got {:?}", other),
    };
    let name = cells
        .iter()
        .find(|c| c.column == "name")
        .expect("`name` must survive the merge");
    assert_eq!(name.value, Value::Text("New".to_string()));
    assert_eq!(name.timestamp, 2000);
}
/// Pins `RunReader::DEFAULT_BUFFER_SIZE` at 8 KiB and checks that a budget of
/// one buffer per run comes to 80 KiB for 10 runs.
#[test]
fn test_memory_budget_calculation() {
    let per_run = RunReader::DEFAULT_BUFFER_SIZE;
    let run_count = 10;
    let budget = run_count * per_run;

    // 8 KiB per run reader.
    assert_eq!(per_run, 8 * 1024);
    // 10 runs x 8 KiB.
    assert_eq!(budget, 80 * 1024);
}
/// `merge_entry_to_mutation` on a live entry produces:
/// - a partition key built from the schema's single `id` key column;
/// - one operation per cell: `Write` for the cell without TTL and
///   `WriteWithTtl` for the cell with one;
/// - the entry timestamp carried through as `timestamp_micros`.
#[test]
fn test_merge_entry_to_mutation_live_cells() {
    use crate::schema::{KeyColumn, TableSchema};
    use crate::storage::write_engine::mutation::{CellOperation, DecoratedKey};
    use std::collections::HashMap;

    let id_column = KeyColumn {
        name: "id".to_string(),
        data_type: "int".to_string(),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![id_column],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    let name_cell = CellData {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
        timestamp: 999_000_000,
        ttl: None,
    };
    let age_cell = CellData {
        column: "age".to_string(),
        value: Value::Integer(30),
        timestamp: 999_000_000,
        ttl: Some(3600),
    };
    let partition = DecoratedKey::new(1000, 42i32.to_be_bytes().to_vec());
    let entry = MergeEntry::new(
        0,
        partition,
        None,
        999_000_000,
        RowData::Live {
            cells: vec![name_cell, age_cell],
        },
    );

    let mutation =
        KWayMerger::merge_entry_to_mutation(entry, &schema).expect("conversion should succeed");

    let pk_columns = &mutation.partition_key.columns;
    assert_eq!(pk_columns.len(), 1);
    assert_eq!(pk_columns[0].0, "id");
    let ops = &mutation.operations;
    assert_eq!(ops.len(), 2);
    assert_eq!(mutation.timestamp_micros, 999_000_000);

    let wrote_name = ops.iter().any(|op| match op {
        CellOperation::Write { column, .. } => column == "name",
        _ => false,
    });
    let wrote_age_with_ttl = ops.iter().any(|op| match op {
        CellOperation::WriteWithTtl {
            column, ttl_seconds, ..
        } => column == "age" && *ttl_seconds == 3600,
        _ => false,
    });
    assert!(wrote_name, "Expected Write operation for 'name'");
    assert!(wrote_age_with_ttl, "Expected WriteWithTtl operation for 'age'");
}
/// `merge_entry_to_mutation` on a row-tombstone entry yields exactly one
/// `DeleteRow` operation.
#[test]
fn test_merge_entry_to_mutation_tombstone() {
    use crate::schema::{KeyColumn, TableSchema};
    use crate::storage::write_engine::mutation::{CellOperation, DecoratedKey};
    use std::collections::HashMap;

    let id_column = KeyColumn {
        name: "id".to_string(),
        data_type: "int".to_string(),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![id_column],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    let deletion = RowData::Tombstone {
        deletion_time: 888_000_000,
        local_deletion_time: 1_700_000_000,
    };
    let partition = DecoratedKey::new(500, 7i32.to_be_bytes().to_vec());
    let entry = MergeEntry::new(0, partition, None, 888_000_000, deletion);

    let mutation =
        KWayMerger::merge_entry_to_mutation(entry, &schema).expect("conversion should succeed");

    let ops = &mutation.operations;
    assert_eq!(ops.len(), 1);
    let only_op = &ops[0];
    assert!(
        matches!(only_op, CellOperation::DeleteRow),
        "Expected DeleteRow operation for tombstone entry"
    );
}
}
/// Property tests for the merge path.
///
/// Most properties check `reference_merge`, a small executable model of the
/// reconcile rules used by these tests:
/// - the higher timestamp wins, and tombstones win timestamp ties;
/// - expired TTL writes vanish;
/// - range tombstones shadow covered live cells.
///
/// The last two properties drive the real `KWayMerger::merge_partition_rows`
/// on hand-built entries.
#[cfg(all(test, feature = "write-support"))]
mod merge_property_tests {
    use super::*;
    use proptest::prelude::*;
    use std::collections::HashMap;

    /// "Now" (seconds) assumed by the reference model: a write whose
    /// `local_deletion_time` is strictly below this value counts as expired.
    const MERGE_TIME_SECS: i32 = 1_000;

    /// One generated operation against a cell slot.
    #[derive(Debug, Clone)]
    enum CellOp {
        /// A write; `local_deletion_time` is `Some` when the cell carries a TTL.
        Write {
            timestamp: i64,
            local_deletion_time: Option<i32>,
        },
        /// A cell tombstone.
        Delete { timestamp: i64 },
        /// Shadows live cells of the same partition whose clustering value lies
        /// in `start_ck..=end_ck` and whose timestamp is `<= marked_for_delete_at`.
        /// The `clustering`/`column` fields of the carrying `CellInput` are ignored.
        RangeTombstone {
            start_ck: u8,
            end_ck: u8,
            marked_for_delete_at: i64,
        },
    }

    /// A `CellOp` addressed to slot `(partition, clustering, column)`.
    #[derive(Debug, Clone)]
    struct CellInput {
        partition: u8,
        clustering: u8,
        column: u8,
        op: CellOp,
    }

    /// `(partition, clustering, column)` slot identifier.
    type CellKey = (u8, u8, u8);

    /// Surviving state of one slot after reconciliation.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum MergedCell {
        Live { timestamp: i64 },
        Dead { timestamp: i64 },
    }

    /// Shorthand for a `CellInput` addressed to `(partition, clustering, column)`.
    fn input_at(partition: u8, clustering: u8, column: u8, op: CellOp) -> CellInput {
        CellInput {
            partition,
            clustering,
            column,
            op,
        }
    }

    /// Whether `candidate` replaces `incumbent` in a slot: the higher timestamp
    /// wins, and on a timestamp tie a tombstone (`Dead`) beats a live cell.
    fn supersedes(candidate: &MergedCell, incumbent: &MergedCell) -> bool {
        match (candidate, incumbent) {
            (MergedCell::Dead { timestamp: new_ts }, MergedCell::Live { timestamp: old_ts }) => {
                new_ts >= old_ts
            }
            (MergedCell::Live { timestamp: new_ts }, MergedCell::Live { timestamp: old_ts })
            | (MergedCell::Live { timestamp: new_ts }, MergedCell::Dead { timestamp: old_ts })
            | (MergedCell::Dead { timestamp: new_ts }, MergedCell::Dead { timestamp: old_ts }) => {
                new_ts > old_ts
            }
        }
    }

    /// Executable model of the merge: folds `inputs` into the surviving state
    /// of every `(partition, clustering, column)` slot.
    ///
    /// 1. Writes whose `local_deletion_time` is below `MERGE_TIME_SECS` are
    ///    dropped before reconciliation.
    /// 2. Point writes and deletes are reconciled per slot with `supersedes`.
    /// 3. A live cell is removed when some range tombstone in the same partition
    ///    covers its clustering value with `marked_for_delete_at >= timestamp`.
    ///    Dead cells are always kept.
    ///
    /// A candidate only replaces the incumbent when it is strictly greater under
    /// (timestamp, Dead > Live), so the result does not depend on input order.
    fn reference_merge(inputs: &[CellInput]) -> HashMap<CellKey, MergedCell> {
        let mut per_slot: HashMap<CellKey, MergedCell> = HashMap::new();
        // (partition, start_ck, end_ck, marked_for_delete_at)
        let mut range_tombstones: Vec<(u8, u8, u8, i64)> = Vec::new();

        for ci in inputs {
            let candidate = match ci.op {
                CellOp::RangeTombstone {
                    start_ck,
                    end_ck,
                    marked_for_delete_at,
                } => {
                    // Applied only after all point operations are reconciled.
                    range_tombstones.push((ci.partition, start_ck, end_ck, marked_for_delete_at));
                    continue;
                }
                CellOp::Write {
                    timestamp,
                    local_deletion_time,
                } => {
                    // NOTE(review): Cassandra treats a TTL'd cell as expired once
                    // nowInSec >= localDeletionTime, so ldt == MERGE_TIME_SECS
                    // would already be expired there. This model keeps it live.
                    // Confirm which boundary the real merger uses.
                    if matches!(local_deletion_time, Some(ldt) if ldt < MERGE_TIME_SECS) {
                        continue;
                    }
                    MergedCell::Live { timestamp }
                }
                CellOp::Delete { timestamp } => MergedCell::Dead { timestamp },
            };
            per_slot
                .entry((ci.partition, ci.clustering, ci.column))
                .and_modify(|existing| {
                    if supersedes(&candidate, existing) {
                        *existing = candidate.clone();
                    }
                })
                .or_insert(candidate);
        }

        per_slot.retain(|&(pk, ck, _col), cell| match *cell {
            MergedCell::Dead { .. } => true,
            MergedCell::Live { timestamp } => {
                !range_tombstones
                    .iter()
                    .any(|&(rt_pk, start_ck, end_ck, mfda)| {
                        rt_pk == pk && (start_ck..=end_ck).contains(&ck) && mfda >= timestamp
                    })
            }
        });
        per_slot
    }

    /// Iterates the operations addressed to one `(partition, clustering, column)` slot.
    fn ops_at(inputs: &[CellInput], key: CellKey) -> impl Iterator<Item = &CellOp> + '_ {
        inputs
            .iter()
            .filter(move |ci| (ci.partition, ci.clustering, ci.column) == key)
            .map(|ci| &ci.op)
    }

    /// Schema shared by the real-merger properties: partition key `id int`, no
    /// clustering columns, and one nullable regular column `value`.
    ///
    /// `value` is declared `text`, but the properties write `Value::Integer`
    /// cells. This is kept as-is; presumably the merger does not type-check
    /// cells (TODO confirm).
    fn single_value_schema() -> TableSchema {
        use crate::schema::{Column, KeyColumn};
        TableSchema {
            keyspace: "prop_test_ks".to_string(),
            table: "prop_test_table".to_string(),
            partition_keys: vec![KeyColumn {
                name: "id".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![Column {
                name: "value".to_string(),
                data_type: "text".to_string(),
                nullable: true,
                default: None,
                is_static: false,
            }],
            comments: HashMap::new(),
        }
    }

    fn arb_timestamp() -> impl Strategy<Value = i64> {
        1i64..=20i64
    }

    /// Mostly no TTL; otherwise an even split between values already expired at
    /// `MERGE_TIME_SECS` and values still live at it.
    fn arb_local_deletion_time() -> impl Strategy<Value = Option<i32>> {
        prop_oneof![
            3 => Just(None),
            1 => (990i32..=999i32).prop_map(Some),   // expired: ldt < MERGE_TIME_SECS
            1 => (1000i32..=1010i32).prop_map(Some), // live: ldt >= MERGE_TIME_SECS
        ]
    }

    /// Weighted mix of writes (5), deletes (3) and range tombstones (2). Range
    /// bounds are normalised so that `start_ck <= end_ck`.
    fn arb_cell_op() -> impl Strategy<Value = CellOp> {
        prop_oneof![
            5 => (arb_timestamp(), arb_local_deletion_time())
                .prop_map(|(ts, ldt)| CellOp::Write {
                    timestamp: ts,
                    local_deletion_time: ldt,
                }),
            3 => arb_timestamp().prop_map(|ts| CellOp::Delete { timestamp: ts }),
            2 => (0u8..=3u8, 0u8..=3u8, arb_timestamp()).prop_map(|(s, e, ts)| {
                let (start_ck, end_ck) = if s <= e { (s, e) } else { (e, s) };
                CellOp::RangeTombstone {
                    start_ck,
                    end_ck,
                    marked_for_delete_at: ts,
                }
            }),
        ]
    }

    /// Small slot space (4 partitions x 4 clustering values x 3 columns) so that
    /// generated operations collide often.
    fn arb_cell_input() -> impl Strategy<Value = CellInput> {
        (0u8..4u8, 0u8..4u8, 0u8..3u8, arb_cell_op())
            .prop_map(|(partition, clustering, column, op)| input_at(partition, clustering, column, op))
    }

    fn arb_cell_stream() -> impl Strategy<Value = Vec<CellInput>> {
        prop::collection::vec(arb_cell_input(), 4..=32)
    }

    /// Snapshot of `m` ordered by slot key, for order-insensitive comparison.
    fn sorted_keys(m: &HashMap<CellKey, MergedCell>) -> Vec<(CellKey, MergedCell)> {
        let mut v: Vec<_> = m.iter().map(|(&k, v)| (k, v.clone())).collect();
        v.sort_by_key(|(k, _)| *k);
        v
    }

    #[test]
    fn ref_tombstone_shadows_earlier_write() {
        let inputs = vec![
            input_at(0, 0, 0, CellOp::Write { timestamp: 5, local_deletion_time: None }),
            input_at(0, 0, 0, CellOp::Delete { timestamp: 10 }),
        ];
        let result = reference_merge(&inputs);
        assert_eq!(
            result.get(&(0, 0, 0)),
            Some(&MergedCell::Dead { timestamp: 10 }),
            "Delete(ts=10) must shadow Write(ts=5)"
        );
    }

    #[test]
    fn ref_write_not_shadowed_by_older_tombstone() {
        let inputs = vec![
            input_at(0, 0, 0, CellOp::Write { timestamp: 10, local_deletion_time: None }),
            input_at(0, 0, 0, CellOp::Delete { timestamp: 5 }),
        ];
        let result = reference_merge(&inputs);
        assert_eq!(
            result.get(&(0, 0, 0)),
            Some(&MergedCell::Live { timestamp: 10 }),
            "Write(ts=10) must win over Delete(ts=5)"
        );
    }

    #[test]
    fn ref_delete_wins_at_equal_timestamp() {
        let inputs = vec![
            input_at(0, 0, 0, CellOp::Write { timestamp: 5, local_deletion_time: None }),
            input_at(0, 0, 0, CellOp::Delete { timestamp: 5 }),
        ];
        let result = reference_merge(&inputs);
        assert_eq!(
            result.get(&(0, 0, 0)),
            Some(&MergedCell::Dead { timestamp: 5 }),
            "Delete must win at equal timestamp (Cassandra reconcile rule)"
        );
    }

    #[test]
    fn ref_expired_ttl_drops_cell() {
        // ldt=500 < MERGE_TIME_SECS: already expired.
        let inputs = vec![input_at(
            0,
            0,
            0,
            CellOp::Write { timestamp: 5, local_deletion_time: Some(500) },
        )];
        let result = reference_merge(&inputs);
        assert!(
            !result.contains_key(&(0, 0, 0)),
            "Expired TTL cell must be absent from merged output"
        );
    }

    #[test]
    fn ref_live_ttl_keeps_cell() {
        // ldt=1500 >= MERGE_TIME_SECS: still live.
        let inputs = vec![input_at(
            0,
            0,
            0,
            CellOp::Write { timestamp: 5, local_deletion_time: Some(1500) },
        )];
        let result = reference_merge(&inputs);
        assert_eq!(
            result.get(&(0, 0, 0)),
            Some(&MergedCell::Live { timestamp: 5 }),
            "Non-expired TTL cell must be present"
        );
    }

    #[test]
    fn ref_range_tombstone_suppresses_row_in_range() {
        let inputs = vec![
            input_at(0, 2, 0, CellOp::Write { timestamp: 5, local_deletion_time: None }),
            // clustering/column of a range tombstone input are ignored.
            input_at(
                0,
                0,
                0,
                CellOp::RangeTombstone { start_ck: 0, end_ck: 5, marked_for_delete_at: 10 },
            ),
        ];
        let result = reference_merge(&inputs);
        assert!(
            !result.contains_key(&(0, 2, 0)),
            "Cell with ts=5 at clustering=2 must be suppressed by RangeTombstone(mfda=10, [0,5])"
        );
    }

    #[test]
    fn ref_range_tombstone_does_not_suppress_newer_write() {
        let inputs = vec![
            input_at(0, 2, 0, CellOp::Write { timestamp: 15, local_deletion_time: None }),
            input_at(
                0,
                0,
                0,
                CellOp::RangeTombstone { start_ck: 0, end_ck: 5, marked_for_delete_at: 10 },
            ),
        ];
        let result = reference_merge(&inputs);
        assert_eq!(
            result.get(&(0, 2, 0)),
            Some(&MergedCell::Live { timestamp: 15 }),
            "Write(ts=15) must NOT be suppressed by RangeTombstone(mfda=10)"
        );
    }

    #[test]
    fn ref_range_tombstone_only_applies_within_partition() {
        let inputs = vec![
            input_at(1, 2, 0, CellOp::Write { timestamp: 5, local_deletion_time: None }),
            input_at(
                0,
                0,
                0,
                CellOp::RangeTombstone { start_ck: 0, end_ck: 5, marked_for_delete_at: 10 },
            ),
        ];
        let result = reference_merge(&inputs);
        assert_eq!(
            result.get(&(1, 2, 0)),
            Some(&MergedCell::Live { timestamp: 5 }),
            "RangeTombstone in partition 0 must not affect partition 1"
        );
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(64))]

        /// A dead slot carries exactly the highest Delete timestamp seen for it.
        #[test]
        fn prop_tombstone_shadowing_consistent(inputs in arb_cell_stream()) {
            let merged = reference_merge(&inputs);
            for (&(pk, ck, col), cell) in &merged {
                if let MergedCell::Dead { timestamp: dead_ts } = cell {
                    let best_delete = ops_at(&inputs, (pk, ck, col))
                        .filter_map(|op| match *op {
                            CellOp::Delete { timestamp } => Some(timestamp),
                            _ => None,
                        })
                        .max();
                    prop_assert_eq!(
                        best_delete,
                        Some(*dead_ts),
                        "Dead cell at ({},{},{}) must carry the best Delete timestamp",
                        pk, ck, col
                    );
                }
            }
        }

        /// A live slot is backed by at least one write that has not expired.
        /// The expiry rule is restated here rather than shared with the model.
        #[test]
        fn prop_ttl_expiry_no_expired_live_cells(inputs in arb_cell_stream()) {
            let merged = reference_merge(&inputs);
            for (&(pk, ck, col), cell) in &merged {
                if let MergedCell::Live { .. } = cell {
                    let has_live_write = ops_at(&inputs, (pk, ck, col)).any(|op| {
                        matches!(
                            *op,
                            CellOp::Write { local_deletion_time, .. }
                                if local_deletion_time.map_or(true, |ldt| ldt >= MERGE_TIME_SECS)
                        )
                    });
                    prop_assert!(
                        has_live_write,
                        "Live cell at ({},{},{}) but all writes are expired",
                        pk, ck, col
                    );
                }
            }
        }

        /// No surviving live cell is covered by a range tombstone at least as new.
        #[test]
        fn prop_range_tombstone_suppresses_covered_live_cells(inputs in arb_cell_stream()) {
            let merged = reference_merge(&inputs);
            let range_tombstones: Vec<(u8, u8, u8, i64)> = inputs
                .iter()
                .filter_map(|ci| match ci.op {
                    CellOp::RangeTombstone { start_ck, end_ck, marked_for_delete_at } => {
                        Some((ci.partition, start_ck, end_ck, marked_for_delete_at))
                    }
                    _ => None,
                })
                .collect();
            for (&(pk, ck, _col), cell) in &merged {
                if let MergedCell::Live { timestamp } = cell {
                    for &(rt_pk, start_ck, end_ck, mfda) in &range_tombstones {
                        let covered =
                            rt_pk == pk && ck >= start_ck && ck <= end_ck && mfda >= *timestamp;
                        prop_assert!(
                            !covered,
                            "Live cell at ({},{}) ts={} should be suppressed by \
                             RangeTombstone(part={}, [{},{}], mfda={})",
                            pk, ck, timestamp, rt_pk, start_ck, end_ck, mfda
                        );
                    }
                }
            }
        }

        /// A live slot carries the maximum timestamp among its non-expired writes.
        #[test]
        fn prop_live_cell_has_max_write_timestamp(inputs in arb_cell_stream()) {
            let merged = reference_merge(&inputs);
            for (&(pk, ck, col), cell) in &merged {
                if let MergedCell::Live { timestamp: live_ts } = cell {
                    let max_ts = ops_at(&inputs, (pk, ck, col))
                        .filter_map(|op| match *op {
                            CellOp::Write { timestamp, local_deletion_time }
                                if local_deletion_time.map_or(true, |ldt| ldt >= MERGE_TIME_SECS) =>
                            {
                                Some(timestamp)
                            }
                            _ => None,
                        })
                        .max();
                    prop_assert_eq!(
                        max_ts,
                        Some(*live_ts),
                        "Live cell at ({},{},{}) must have max non-expired write timestamp",
                        pk, ck, col
                    );
                }
            }
        }

        #[test]
        fn prop_reference_merge_is_deterministic(inputs in arb_cell_stream()) {
            let result_a = reference_merge(&inputs);
            let result_b = reference_merge(&inputs);
            prop_assert_eq!(
                sorted_keys(&result_a),
                sorted_keys(&result_b),
                "reference_merge must be deterministic"
            );
        }

        /// For live-only input, the real merger keeps one row per clustering key,
        /// and that row has the highest timestamp written to it.
        #[test]
        fn prop_real_merger_lww_agrees_with_reference(
            entries in prop::collection::vec(
                // (clustering_key 0..4, run_index 0..2, timestamp 1..=20)
                (0u8..4u8, 0usize..2usize, 1i64..=20i64),
                2..=12usize,
            )
        ) {
            let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
            let merge_entries: Vec<MergeEntry> = entries
                .iter()
                .map(|&(ck, run_index, ts)| {
                    let ck_key = ClusteringKey {
                        columns: vec![("ck".to_string(), Value::TinyInt(ck as i8))],
                    };
                    MergeEntry::new(
                        run_index,
                        partition_key.clone(),
                        Some(ck_key),
                        ts,
                        RowData::Live {
                            cells: vec![CellData {
                                column: "value".to_string(),
                                value: Value::Integer(ts as i32),
                                timestamp: ts,
                                ttl: None,
                            }],
                        },
                    )
                })
                .collect();

            let merger = KWayMerger {
                runs: vec![],
                heap: std::collections::BinaryHeap::new(),
                current_partition: None,
                schema: single_value_schema(),
            };
            let real_merged = merger
                .merge_partition_rows(merge_entries)
                .expect("merge_partition_rows must not fail");

            // Oracle: highest timestamp per clustering key.
            let mut expected_ts: HashMap<u8, i64> = HashMap::new();
            for &(ck, _run_index, ts) in &entries {
                let best = expected_ts.entry(ck).or_insert(ts);
                *best = (*best).max(ts);
            }

            prop_assert_eq!(
                real_merged.len(),
                expected_ts.len(),
                "real merger output row count must match reference"
            );
            for entry in &real_merged {
                let ck_value = entry
                    .clustering_key
                    .as_ref()
                    .and_then(|ck| ck.columns.first())
                    .map(|(_, v)| v);
                let ck_byte = match ck_value {
                    Some(Value::TinyInt(b)) => *b as u8,
                    other => {
                        return Err(TestCaseError::fail(format!(
                            "unexpected clustering key value: {:?}",
                            other
                        )));
                    }
                };
                prop_assert_eq!(
                    entry.timestamp,
                    expected_ts[&ck_byte],
                    "real merger winner timestamp must match reference for ck={}",
                    ck_byte
                );
            }
        }

        /// Live row vs row tombstone on the same clustering key: the newer one
        /// wins, and the tombstone wins ties.
        #[test]
        fn prop_real_merger_tombstone_vs_live(
            ts_write in 1i64..=10i64,
            ts_delete in 1i64..=20i64,
        ) {
            let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
            let ck = ClusteringKey {
                columns: vec![("ck".to_string(), Value::TinyInt(0))],
            };
            let live_entry = MergeEntry::new(
                0, // run 0
                partition_key.clone(),
                Some(ck.clone()),
                ts_write,
                RowData::Live {
                    cells: vec![CellData {
                        column: "value".to_string(),
                        value: Value::Integer(42),
                        timestamp: ts_write,
                        ttl: None,
                    }],
                },
            );
            let tombstone_entry = MergeEntry::new(
                1, // run 1
                partition_key,
                Some(ck),
                ts_delete,
                RowData::Tombstone {
                    deletion_time: ts_delete,
                    local_deletion_time: 2000,
                },
            );

            let merger = KWayMerger {
                runs: vec![],
                heap: std::collections::BinaryHeap::new(),
                current_partition: None,
                schema: single_value_schema(),
            };
            let merged = merger
                .merge_partition_rows(vec![live_entry, tombstone_entry])
                .expect("merge_partition_rows must not fail");

            prop_assert_eq!(merged.len(), 1, "one clustering key => one merged row");
            let winner = &merged[0];
            if ts_delete > ts_write {
                prop_assert!(
                    matches!(winner.row_data, RowData::Tombstone { .. }),
                    "Tombstone(ts={}) must win over Live(ts={})",
                    ts_delete, ts_write
                );
            } else if ts_write > ts_delete {
                prop_assert!(
                    matches!(winner.row_data, RowData::Live { .. }),
                    "Live(ts={}) must win over Tombstone(ts={})",
                    ts_write, ts_delete
                );
            } else {
                prop_assert!(
                    matches!(winner.row_data, RowData::Tombstone { .. }),
                    "At equal ts={}, Tombstone must win over Live (Cassandra reconcile rule)",
                    ts_delete
                );
            }
        }
    }
}