cqlite_core/storage/write_engine/merge.rs
1//! K-way merge for combining multiple L0 SSTables
2//!
3//! Implements efficient k-way merge using a binary heap for producing
4//! compacted SSTables from multiple runs.
5//!
6//! ## Architecture
7//!
8//! The K-way merger uses a min-heap to efficiently merge k sorted SSTable
9//! runs into a single output SSTable. Each run maintains a peek buffer for
10//! efficient lookahead.
11//!
12//! ## Ordering
13//!
14//! The `Ord`/`PartialOrd` impl on `MergeEntry` governs **heap routing only**
15//! (which partition/clustering bucket an entry belongs to) — NOT winner
16//! selection. Winner selection among entries with the same clustering key is
17//! done by `merge_partition_rows` (see "Cell Merge Rule" below), which layers a
18//! timestamp + liveness comparison on top.
19//!
20//! Heap-routing order:
21//! 1. Token (ascending) - Primary partitioning
22//! 2. Key bytes (ascending) - Hash collision resolution
//! 3. Clustering key (fallback `Ord` in the heap; the schema-aware order is
//!    applied later by `merge_partition_rows`) - Within partition ordering
24//! 4. Run index (ascending) - Stable tiebreak for routing (NOT the LWW rule)
25//!
26//! ## Memory Budget
27//!
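//! Peek buffers: k × 8KB (where k = number of input SSTables), i.e. ~80KB for
//! 10 SSTables. This figure covers the peek buffers only: the current
//! `SSTableRowIteratorAdapter` pre-loads every entry of each input SSTable into
//! memory (see TODO #447), so the real footprint grows with the input size.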
30//!
31//! ## Cell Merge Rule
32//!
33//! Last-write-wins by timestamp, following Cassandra `Cells#reconcile`:
34//! - Keep the entry with the highest timestamp.
35//! - If timestamps are equal, the tombstone (Delete) wins over a live entry,
36//! independent of which file it came from (Issue #498).
37//! - If timestamp AND liveness are equal, prefer the lower run_index (newer file).
38//!
39//! Implementation for M5.2 (Issue #382)
40
41#[cfg(feature = "write-support")]
42use crate::error::{Error, Result};
43#[cfg(feature = "write-support")]
44use crate::schema::TableSchema;
45#[cfg(feature = "write-support")]
46use crate::storage::write_engine::mutation::{ClusteringKey, DecoratedKey};
47#[cfg(feature = "write-support")]
48use crate::types::Value;
49
50#[cfg(feature = "write-support")]
51use std::cmp::{Ordering, Reverse};
52#[cfg(feature = "write-support")]
53use std::collections::{BinaryHeap, VecDeque};
54#[cfg(feature = "write-support")]
55use std::path::{Path, PathBuf};
56#[cfg(feature = "write-support")]
57use std::time::{Duration, Instant};
58
/// One row travelling through the merge pipeline.
///
/// Each input SSTable contributes a stream of these values; they are pushed
/// onto the merge heap, grouped per partition and then reconciled per
/// clustering key.
///
/// NOTE(review): the derived `PartialEq`/`Eq` compare every field (including
/// `timestamp` and `row_data`), whereas the hand-written `Ord` impl looks only
/// at token, key bytes, clustering key and `run_index`. Two entries can
/// therefore compare as `Ordering::Equal` without being `==`.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeEntry {
    /// Index of the source SSTable; 0 is the newest input.
    pub run_index: usize,
    /// Decorated partition key (token plus raw key bytes).
    pub key: DecoratedKey,
    /// Clustering key, or `None` for rows without one.
    pub clustering_key: Option<ClusteringKey>,
    /// Row timestamp, in microseconds since the Unix epoch.
    pub timestamp: i64,
    /// Payload of the row: live cells or a row tombstone.
    pub row_data: RowData,
}
77
// Gated like the struct it extends: without this attribute the impl would name
// a type that does not exist when the `write-support` feature is disabled.
#[cfg(feature = "write-support")]
impl MergeEntry {
    /// Assemble a merge entry from its parts.
    ///
    /// # Arguments
    ///
    /// * `run_index` - Index of the source SSTable (0 = newest)
    /// * `key` - Partition key with token
    /// * `clustering_key` - Clustering key, `None` for rows without one
    /// * `timestamp` - Row timestamp in microseconds since the Unix epoch
    /// * `row_data` - Live cells or a row tombstone
    ///
    /// No validation is performed; the arguments are stored as given.
    pub fn new(
        run_index: usize,
        key: DecoratedKey,
        clustering_key: Option<ClusteringKey>,
        timestamp: i64,
        row_data: RowData,
    ) -> Self {
        Self {
            run_index,
            key,
            clustering_key,
            timestamp,
            row_data,
        }
    }
}
96
/// Heap-routing order for `MergeEntry` — this is NOT the last-write-wins rule.
///
/// The ordering only makes the min-heap release entries grouped by partition
/// and clustering key. Which entry wins inside such a group (timestamp, then
/// liveness, then run index) is decided by `merge_partition_rows`.
///
/// Comparison keys, in priority order:
/// 1. Token (ascending)
/// 2. Partition key bytes (ascending; separates keys whose tokens collide)
/// 3. Clustering key (ascending; `None` sorts before `Some`, and two `Some`
///    values use `ClusteringKey`'s plain `Ord`, which is not schema-aware —
///    the schema-aware order is applied during the partition merge)
/// 4. Run index (ascending; a stable routing tiebreak only, lower = newer)
///
/// NOTE(review): `timestamp` and `row_data` take no part in this ordering even
/// though the derived `Eq` on `MergeEntry` compares them.
#[cfg(feature = "write-support")]
impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key
            .token
            .cmp(&other.key.token)
            .then_with(|| self.key.key.cmp(&other.key.key))
            .then_with(
                || match (&self.clustering_key, &other.clustering_key) {
                    (None, None) => Ordering::Equal,
                    (None, Some(_)) => Ordering::Less,
                    (Some(_), None) => Ordering::Greater,
                    // Fallback `Ord`, deliberately not schema-aware at this level.
                    (Some(a), Some(b)) => a.cmp(b),
                },
            )
            // Same partition and clustering key: the lower run index goes first.
            .then_with(|| self.run_index.cmp(&other.run_index))
    }
}
145
/// `PartialOrd` defers to the total order defined by `Ord`, so the two can
/// never disagree.
#[cfg(feature = "write-support")]
impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
152
/// Payload of a merge entry: either the row's live cells or a row-level
/// tombstone.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RowData {
    /// The row is live and carries cells.
    Live {
        /// Cells that belong to the row.
        cells: Vec<CellData>,
    },
    /// The whole row has been deleted.
    Tombstone {
        /// Timestamp of the deletion, in microseconds.
        deletion_time: i64,
        /// Local deletion time, in seconds since the epoch.
        local_deletion_time: i32,
    },
}
170
/// A single column value together with its write timestamp and optional TTL.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CellData {
    /// Name of the column this cell belongs to.
    pub column: String,
    /// Value stored in the cell.
    pub value: Value,
    /// Write timestamp of the cell, in microseconds.
    pub timestamp: i64,
    /// Time-to-live in seconds; `None` means the cell never expires.
    pub ttl: Option<u32>,
}
184
/// Outcome of one incremental merge step (see `KWayMerger::step`).
#[cfg(feature = "write-support")]
#[derive(Debug)]
pub enum MergeStep {
    /// One fully merged partition.
    Partition {
        /// Key of the partition.
        key: DecoratedKey,
        /// The partition's rows after merging.
        rows: Vec<MergeEntry>,
    },
    /// Nothing is left to merge.
    Complete,
}
199
/// Counters gathered while a merge runs.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone)]
pub struct MergeStats {
    /// How many input files took part in the merge.
    pub input_files: usize,
    /// How many partitions were emitted.
    pub output_partitions: u64,
    /// How many rows were emitted.
    pub output_rows: u64,
    /// Bytes written to the output.
    ///
    /// NOTE(review): `KWayMerger::merge` initialises this to 0 and does not
    /// appear to update it — confirm whether a caller fills it in.
    pub bytes_written: u64,
    /// Wall-clock time the merge took.
    pub elapsed: Duration,
}
215
/// Buffered view over one input SSTable run.
///
/// Rows pulled from the underlying iterator are parked in a FIFO peek buffer so
/// that lookahead does not hit the iterator again. A refill stops once the
/// estimated size of the rows it read reaches `buffer_size` (about 8KB by
/// default), which keeps memory use per run predictable.
#[cfg(feature = "write-support")]
struct RunReader {
    /// Row source for this run (boxed trait object, hence no derived `Debug`).
    reader: Box<dyn SSTableRowIterator>,
    /// Rows read ahead but not yet consumed, oldest first.
    buffer: VecDeque<MergeEntry>,
    /// Refill target in estimated bytes (~8KB).
    buffer_size: usize,
    /// Set once the underlying iterator has reported its end.
    exhausted: bool,
}
231
/// Manual `Debug`: the boxed reader is not `Debug`, so only the buffer length,
/// the buffer size target and the exhaustion flag are shown.
#[cfg(feature = "write-support")]
impl std::fmt::Debug for RunReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffer_len = self.buffer.len();
        let mut out = f.debug_struct("RunReader");
        out.field("buffer_len", &buffer_len);
        out.field("buffer_size", &self.buffer_size);
        out.field("exhausted", &self.exhausted);
        out.finish()
    }
}
242
#[cfg(feature = "write-support")]
impl RunReader {
    /// Refill target used by [`RunReader::new`]: 8KB of estimated entry size.
    const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;

    /// Wrap `reader` with an empty peek buffer and the default buffer size.
    fn new(reader: Box<dyn SSTableRowIterator>) -> Self {
        Self {
            reader,
            buffer: VecDeque::new(),
            buffer_size: Self::DEFAULT_BUFFER_SIZE,
            exhausted: false,
        }
    }

    /// Borrow the next entry without consuming it.
    ///
    /// Triggers a refill when the buffer is empty and the run has not ended.
    /// Yields `Ok(None)` when no entry is available.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying reader during a refill.
    fn peek(&mut self) -> Result<Option<&MergeEntry>> {
        let needs_refill = self.buffer.is_empty() && !self.exhausted;
        if needs_refill {
            self.refill_buffer()?;
        }

        Ok(self.buffer.front())
    }

    /// Remove and return the next entry.
    ///
    /// Yields `Ok(None)` once the buffer is empty and the run has ended.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying reader during a refill.
    fn advance(&mut self) -> Result<Option<MergeEntry>> {
        if self.buffer.is_empty() {
            if self.exhausted {
                return Ok(None);
            }
            // Nothing buffered yet: pull the next batch before popping.
            self.refill_buffer()?;
        }

        Ok(self.buffer.pop_front())
    }

    /// True once the underlying reader has ended AND every buffered entry has
    /// been consumed.
    fn is_exhausted(&self) -> bool {
        self.buffer.is_empty() && self.exhausted
    }

    /// Pull entries from the underlying reader until roughly `buffer_size`
    /// estimated bytes have been buffered or the reader ends.
    ///
    /// Entries read before a failing one stay in the buffer.
    fn refill_buffer(&mut self) -> Result<()> {
        let mut budget_used: usize = 0;

        while budget_used < self.buffer_size {
            let entry = match self.reader.next() {
                None => {
                    self.exhausted = true;
                    return Ok(());
                }
                Some(item) => item?,
            };

            // Sizes are estimates; they only bound how far ahead we read.
            budget_used += Self::estimate_entry_size(&entry);
            self.buffer.push_back(entry);
        }

        Ok(())
    }

    /// Rough in-memory size of `entry`: the struct itself, the key bytes, the
    /// clustering columns and the row payload.
    ///
    /// Approximate on purpose — it is used for buffer management only.
    fn estimate_entry_size(entry: &MergeEntry) -> usize {
        let mut total: usize = std::mem::size_of::<MergeEntry>() + entry.key.key.len();

        if let Some(ck) = entry.clustering_key.as_ref() {
            for (name, value) in ck.columns.iter() {
                total += name.len() + Self::estimate_value_size(value);
            }
        }

        match &entry.row_data {
            RowData::Live { cells } => {
                for cell in cells {
                    total += std::mem::size_of::<CellData>()
                        + cell.column.len()
                        + Self::estimate_value_size(&cell.value);
                }
            }
            // Flat allowance for a row tombstone.
            RowData::Tombstone { .. } => total += 16,
        }

        total
    }

    /// Rough in-memory size of a single `Value`.
    fn estimate_value_size(value: &Value) -> usize {
        match value {
            Value::Null => 0,
            Value::Boolean(_) | Value::TinyInt(_) => 1,
            Value::SmallInt(_) => 2,
            Value::Integer(_) | Value::Float32(_) | Value::Date(_) => 4,
            Value::BigInt(_)
            | Value::Counter(_)
            | Value::Timestamp(_)
            | Value::Time(_)
            | Value::Float(_) => 8,
            Value::Uuid(_) => 16,
            Value::Duration { .. } => 20,
            // Heap-backed values: payload length plus the container header.
            Value::Text(s) => s.len() + std::mem::size_of::<String>(),
            Value::Blob(b) => b.len() + std::mem::size_of::<Vec<u8>>(),
            Value::Inet(b) => b.len() + std::mem::size_of::<Vec<u8>>(),
            Value::Varint(b) => b.len() + std::mem::size_of::<Vec<u8>>(),
            Value::Decimal { unscaled, .. } => unscaled.len() + 4 + std::mem::size_of::<Vec<u8>>(),
            // Flat guess for collections and other complex types.
            _ => 32,
        }
    }
}
369
/// Source of rows for one merge run.
///
/// The k-way merger pulls rows through this trait so that it is not tied to a
/// particular SSTable reader implementation. Implementors must be `Send`.
#[cfg(feature = "write-support")]
pub trait SSTableRowIterator: Send {
    /// Yield the next row of the SSTable: `Some(Err(_))` when reading fails,
    /// `None` once there are no more rows.
    fn next(&mut self) -> Option<Result<MergeEntry>>;
}
379
/// Drive `future` to completion from synchronous code, whether or not the
/// calling thread is already inside a Tokio runtime.
///
/// The write engine's blocking helpers share this async-to-sync bridge:
/// [`SSTableRowIteratorAdapter`] (the k-way merge readers),
/// `WriteEngine::flush_internal`, and `WriteEngine::finalize_merge_blocking`.
///
/// ## Why `Handle::block_on` is not used
///
/// Calling `Handle::current().block_on()` on a thread that is already driving a
/// Tokio runtime panics with *"Cannot start a runtime from within a runtime"*
/// (Issue #587). That is the situation for anything under `#[tokio::main]` or
/// `#[tokio::test]` — which is how the CLI (`maintenance`,
/// `export-sstable --compact`) and any async caller reach compaction. The
/// bridge is only reached once a merge has input SSTables to read, which is why
/// STCS worked in isolation but failed from async callers.
///
/// `tokio::task::block_in_place` is no general fix either: it panics on a
/// current-thread runtime (e.g. the default `#[tokio::test]` flavor).
///
/// ## How the future is run
///
/// - **`Handle::try_current()` is `Err`** (no runtime on this thread): a
///   temporary runtime is created and blocked on directly.
/// - **`Handle::try_current()` is `Ok`** (already inside a runtime): the future
///   is moved to a dedicated scoped thread that owns a fresh runtime, and that
///   thread is joined. The thread may block freely because it is not driving
///   the caller's runtime, so both the multi-thread and the current-thread
///   flavors work. [`std::thread::scope`] is used instead of
///   [`std::thread::spawn`] so the future may borrow from the caller's stack
///   (`flush_internal`/`finalize_merge_blocking` pass futures borrowing
///   `&mut self`) and need not be `'static`.
///
/// Both the future and its output must be `Send`, because in the in-runtime
/// case they cross a thread boundary.
///
/// # Errors
///
/// Returns `Error::Storage` when the temporary runtime cannot be created or
/// when the bridge thread panics; otherwise returns whatever the future
/// resolves to.
#[cfg(feature = "write-support")]
pub(crate) fn block_on_async<F, T>(future: F) -> Result<T>
where
    F: std::future::Future<Output = Result<T>> + Send,
    T: Send,
{
    // Builds a throwaway runtime on whichever thread calls it and blocks that
    // thread until `future` resolves.
    let drive = move || -> Result<T> {
        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| Error::Storage(format!("Failed to create tokio runtime: {}", e)))?;
        runtime.block_on(future)
    };

    // No runtime on this thread: blocking right here is safe.
    if tokio::runtime::Handle::try_current().is_err() {
        return drive();
    }

    // Already inside a runtime: a nested `block_on` on this thread would panic,
    // so the work is handed to a scoped thread and its result is awaited.
    std::thread::scope(|scope| match scope.spawn(drive).join() {
        Ok(outcome) => outcome,
        Err(_) => Err(Error::Storage(
            "async-to-sync bridge thread panicked".to_string(),
        )),
    })
}
442
/// Synchronous [`SSTableRowIterator`] facade over the async `SSTableReader`.
///
/// All entries of the SSTable are read into memory when the adapter is opened;
/// the reader's `(RowKey, Value)` pairs are converted to `MergeEntry` values at
/// that point.
///
/// TODO(#447): Implement true streaming iteration to stay within the 128MB
/// memory budget. Currently loads all entries upfront.
#[cfg(feature = "write-support")]
struct SSTableRowIteratorAdapter {
    /// Entries loaded at open time, handed out one by one.
    entries: std::vec::IntoIter<MergeEntry>,
}
455
#[cfg(feature = "write-support")]
impl SSTableRowIteratorAdapter {
    /// Read the SSTable at `path` in full and wrap its rows as `MergeEntry`
    /// values tagged with `run_index`.
    ///
    /// Rows come from [`SSTableReader::iterate_all_partitions_for_compaction`],
    /// which yields the actual per-row timestamps decoded from the on-disk row
    /// headers. The k-way merger needs those for timestamp-accurate
    /// last-write-wins ordering and thus for tombstone shadowing (Issue #505).
    ///
    /// # Errors
    ///
    /// Fails when the platform or reader cannot be opened, when iteration
    /// fails, or when a row cannot be converted into a `MergeEntry`.
    fn open(path: &Path, run_index: usize) -> Result<Self> {
        use crate::platform::Platform;
        use crate::storage::sstable::reader::SSTableReader;
        use crate::Config;
        use std::sync::Arc;

        // Issue #591: compaction inputs MUST be read through buffered I/O and
        // never through a memory map. `finalize_merge_async` deletes the input
        // files once the merged output is published; a live mmap over a file
        // that is then truncated or removed can fault with SIGBUS on Unix
        // (unrecoverable as an `io::Error`) and can block deletion on Windows.
        // Buffered reads, fully drained into memory inside this constructor
        // before finalize removes the inputs, guarantee no mapping outlives the
        // file. The flag is pinned here instead of trusting the (currently
        // `false`) global default so the invariant cannot silently regress.
        let mut config = Config::default();
        config.storage.use_mmap = false;
        let sstable_path = path.to_path_buf();

        // The compaction-specific read path returns
        // (RowKey, Value, row_timestamp_micros). Row and cell tombstones arrive
        // as Value::Tombstone with their real deletion timestamps, so the
        // merger can apply shadowing semantics (Issue #505).
        let raw_rows = block_on_async(async move {
            let platform = Arc::new(Platform::new(&config).await?);
            let reader = SSTableReader::open(&sstable_path, &config, platform).await?;
            reader.iterate_all_partitions_for_compaction(None).await
        })?;

        // The reader already yields rows in token order, so the converted
        // entries are not re-sorted. Clustering key extraction is deferred,
        // hence `None`.
        let entries = raw_rows
            .into_iter()
            .map(|(row_key, value, timestamp)| -> Result<MergeEntry> {
                let decorated_key = DecoratedKey::from_key_bytes(row_key.0)?;
                let row_data = Self::value_to_row_data(&value, timestamp)?;
                Ok(MergeEntry::new(
                    run_index,
                    decorated_key,
                    None,
                    timestamp,
                    row_data,
                ))
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Self {
            entries: entries.into_iter(),
        })
    }

    /// Translate a reader `Value` into the merger's `RowData`.
    ///
    /// `row_timestamp` is the per-row timestamp decoded from the on-disk row
    /// header (see [`SSTableReader::iterate_all_partitions_for_compaction`]).
    /// The reader exposes no per-cell timestamps for live cells, so every live
    /// cell takes over the row timestamp. Per-cell reconcile and row-tombstone
    /// shadowing depend on that (Issue #533): with a default of 0, any row
    /// tombstone would wrongly shadow the live cells.
    ///
    /// Mapping:
    /// - `Value::Tombstone` becomes `RowData::Tombstone` carrying the
    ///   tombstone's `deletion_time`; `local_deletion_time` is 0 because
    ///   `TombstoneInfo` does not carry it.
    /// - `Value::Map` becomes `RowData::Live` with one cell per map entry. A
    ///   `Text` key is used as the column name as-is, any other key through its
    ///   `Debug` rendering. A tombstone value (deleted cell, Issue #505) keeps
    ///   its own `deletion_time` as cell timestamp so equal-timestamp reconcile
    ///   still resolves correctly.
    /// - Any other value becomes `RowData::Live` with a single cell in a column
    ///   named `"value"`.
    ///
    /// The current implementation always returns `Ok`.
    fn value_to_row_data(value: &crate::types::Value, row_timestamp: i64) -> Result<RowData> {
        use crate::types::Value;

        let row_data = match value {
            Value::Tombstone(info) => RowData::Tombstone {
                deletion_time: info.deletion_time,
                local_deletion_time: 0,
            },
            Value::Map(map_entries) => {
                let cells = map_entries
                    .iter()
                    .map(|(name, cell_value)| {
                        let column = if let Value::Text(text) = name {
                            text.clone()
                        } else {
                            format!("{:?}", name)
                        };
                        // Deleted cells keep their own deletion time; live
                        // cells inherit the row timestamp.
                        let timestamp = if let Value::Tombstone(info) = cell_value {
                            info.deletion_time
                        } else {
                            row_timestamp
                        };
                        CellData {
                            column,
                            value: cell_value.clone(),
                            timestamp,
                            ttl: None,
                        }
                    })
                    .collect();
                RowData::Live { cells }
            }
            // A bare value (or any other shape) is wrapped as one cell.
            scalar => RowData::Live {
                cells: vec![CellData {
                    column: "value".to_string(),
                    value: scalar.clone(),
                    timestamp: row_timestamp,
                    ttl: None,
                }],
            },
        };

        Ok(row_data)
    }
}
574
#[cfg(feature = "write-support")]
impl SSTableRowIterator for SSTableRowIteratorAdapter {
    /// Hand out the next pre-loaded entry. Entries are already in memory, so
    /// this never yields an error.
    fn next(&mut self) -> Option<Result<MergeEntry>> {
        let entry = self.entries.next()?;
        Some(Ok(entry))
    }
}
581
/// Merges k sorted SSTable runs into one output stream.
///
/// A min-heap always exposes the smallest pending entry across all runs; every
/// run sits behind a small peek buffer for cheap lookahead.
///
/// ## Usage
///
/// ```rust,ignore
/// // Build the merger from the input SSTable paths
/// let mut merger = KWayMerger::new(input_paths, &schema)?;
///
/// // Option 1: merge everything into an output writer (consumes the merger)
/// let stats = merger.merge(&mut output_writer)?;
///
/// // Option 2: merge one partition at a time
/// loop {
///     match merger.step()? {
///         MergeStep::Partition { key, rows } => {
///             // Process the partition
///         }
///         MergeStep::Complete => break,
///     }
/// }
/// ```
#[cfg(feature = "write-support")]
#[derive(Debug)]
pub struct KWayMerger {
    /// One buffered reader per input SSTable.
    runs: Vec<RunReader>,
    /// Min-heap over the pending entries (`Reverse` flips the max-heap).
    heap: BinaryHeap<Reverse<MergeEntry>>,
    /// Partition currently being merged (for partition boundary detection).
    current_partition: Option<DecoratedKey>,
    /// Table schema used for schema-aware merging.
    schema: TableSchema,
}
618
619#[cfg(feature = "write-support")]
620impl KWayMerger {
621 /// Create a new k-way merger from input SSTable paths
622 ///
623 /// # Arguments
624 ///
625 /// * `input_paths` - Paths to input SSTable Data.db files (ordered newest to oldest)
626 /// * `schema` - Table schema for schema-aware merging
627 ///
628 /// # Returns
629 ///
630 /// A new KWayMerger ready to merge the input SSTables.
631 ///
632 /// # Errors
633 ///
634 /// Returns an error if any input SSTable cannot be opened.
635 ///
636 /// # Example
637 ///
638 /// ```rust,ignore
639 /// let input_paths = vec![
640 /// PathBuf::from("data/nb-1-big-Data.db"),
641 /// PathBuf::from("data/nb-2-big-Data.db"),
642 /// ];
643 /// let merger = KWayMerger::new(input_paths, &schema)?;
644 /// ```
645 pub fn new(input_paths: Vec<PathBuf>, schema: &TableSchema) -> Result<Self> {
646 if input_paths.is_empty() {
647 return Err(Error::InvalidInput(
648 "K-way merge requires at least one input file".to_string(),
649 ));
650 }
651
652 // Create run readers for each input SSTable (ordered newest to oldest)
653 let mut runs = Vec::with_capacity(input_paths.len());
654 for (run_index, path) in input_paths.iter().enumerate() {
655 let adapter = SSTableRowIteratorAdapter::open(path, run_index)?;
656 runs.push(RunReader::new(Box::new(adapter)));
657 }
658
659 // Initialize heap (will be populated on first step)
660 let heap = BinaryHeap::new();
661
662 Ok(Self {
663 runs,
664 heap,
665 current_partition: None,
666 schema: schema.clone(),
667 })
668 }
669
670 /// Perform a full merge to the output writer
671 ///
672 /// This is a convenience method that repeatedly calls `step()` until
673 /// the merge is complete, writing each partition to the output writer.
674 ///
675 /// # Arguments
676 ///
677 /// * `output_writer` - SSTableWriter to write merged output
678 ///
679 /// # Returns
680 ///
681 /// Statistics about the merge operation.
682 ///
683 /// # Errors
684 ///
685 /// Returns an error if reading or writing fails.
686 pub fn merge(
687 mut self,
688 output_writer: &mut crate::storage::sstable::writer::SSTableWriter,
689 ) -> Result<MergeStats> {
690 let start_time = Instant::now();
691 let mut stats = MergeStats {
692 input_files: self.runs.len(),
693 output_partitions: 0,
694 output_rows: 0,
695 bytes_written: 0,
696 elapsed: Duration::from_secs(0), // Will be updated at the end
697 };
698
699 while let MergeStep::Partition { key, rows } = self.step()? {
700 stats.output_partitions += 1;
701 stats.output_rows += rows.len() as u64;
702
703 // Convert MergeEntry rows back to Mutation format for writer
704 let mutations = rows
705 .into_iter()
706 .map(|entry| Self::merge_entry_to_mutation(entry, &self.schema))
707 .collect::<Result<Vec<_>>>()?;
708
709 output_writer.write_partition(key, mutations)?;
710 }
711
712 stats.elapsed = start_time.elapsed();
713 Ok(stats)
714 }
715
716 /// Perform one merge step (one partition)
717 ///
718 /// Returns the next merged partition, or Complete if the merge is done.
719 /// This allows incremental merging for better memory control.
720 ///
721 /// # Returns
722 ///
723 /// - `MergeStep::Partition` - Next merged partition with all its rows
724 /// - `MergeStep::Complete` - Merge is complete
725 ///
726 /// # Errors
727 ///
728 /// Returns an error if reading fails.
729 pub fn step(&mut self) -> Result<MergeStep> {
730 // Initialize heap on first call
731 if self.heap.is_empty() && self.current_partition.is_none() {
732 self.initialize_heap()?;
733 }
734
735 // If heap is empty, merge is complete
736 if self.heap.is_empty() {
737 return Ok(MergeStep::Complete);
738 }
739
740 // Collect all rows for the next partition
741 let mut partition_rows = Vec::new();
742 let mut partition_key: Option<DecoratedKey> = None;
743
744 while let Some(Reverse(entry)) = self.heap.peek() {
745 // Check if we've moved to a new partition
746 if let Some(ref current_key) = partition_key {
747 if &entry.key != current_key {
748 // Partition boundary - stop here
749 break;
750 }
751 } else {
752 // First entry of new partition
753 partition_key = Some(entry.key.clone());
754 }
755
756 // Pop entry from heap
757 let Reverse(entry) = self
758 .heap
759 .pop()
760 .ok_or_else(|| Error::InvalidInput("Merge heap unexpectedly empty".to_string()))?;
761
762 // Add to partition rows
763 partition_rows.push(entry.clone());
764
765 // Refill heap from the run we just consumed from
766 self.refill_heap(entry.run_index)?;
767 }
768
769 if let Some(key) = partition_key {
770 // Merge cells within this partition (last-write-wins)
771 let merged_rows = self.merge_partition_rows(partition_rows)?;
772 Ok(MergeStep::Partition {
773 key,
774 rows: merged_rows,
775 })
776 } else {
777 Ok(MergeStep::Complete)
778 }
779 }
780
781 /// Initialize the heap with the first entry from each run
782 fn initialize_heap(&mut self) -> Result<()> {
783 for run_index in 0..self.runs.len() {
784 self.refill_heap(run_index)?;
785 }
786 Ok(())
787 }
788
789 /// Refill the heap from a specific run
790 fn refill_heap(&mut self, run_index: usize) -> Result<()> {
791 if run_index >= self.runs.len() {
792 return Ok(());
793 }
794
795 let run = &mut self.runs[run_index];
796 if !run.is_exhausted() {
797 if let Some(entry) = run.peek()? {
798 // Clone and push to heap
799 let entry = entry.clone();
800 self.heap.push(Reverse(entry));
801 }
802
803 // Advance the run reader
804 run.advance()?;
805 }
806
807 Ok(())
808 }
809
810 /// Merge rows within a single partition using **per-cell reconcile**
811 /// (Cassandra `org.apache.cassandra.db.rows.Cells#reconcile`).
812 ///
813 /// The pre-#533 implementation selected a single whole winning `MergeEntry`
814 /// per clustering key, which DROPPED columns when two SSTables shared the same
815 /// `(pk, ck)` but carried DISJOINT columns (e.g. A→{name}, B→{score} merged to
816 /// only B's column). This now reconciles cell-by-cell so disjoint columns from
817 /// every input survive (Issue #533).
818 ///
819 /// Algorithm per clustering-key group:
820 /// 1. **Effective row deletion** — among `RowData::Tombstone` entries take the
821 /// max `deletion_time` (`row_del`). A row tombstone shadows any cell whose
822 /// `timestamp <= row_del`.
823 /// 2. **Per-column cell reconcile** — across all `RowData::Live` entries, for
824 /// each column name pick the winning cell by:
825 /// - higher `timestamp` wins (last-write-wins);
826 /// - at EQUAL timestamp a cell tombstone (`Value::Tombstone(CellTombstone)`)
827 /// beats a live value (same rule as #498, applied per cell);
828 /// - otherwise the existing winner is kept (stable; heap routing already
829 /// ordered inputs by run_index so the first-seen at a tie is the newer
830 /// file).
831 /// 3. **Row-tombstone shadowing per cell** — drop any reconciled cell whose
832 /// `timestamp <= row_del`. The `<=` makes the tombstone win at equal ts,
833 /// consistent with #498. Cells written strictly AFTER `row_del` survive.
834 /// 4. **Build the merged result** — if any cells survive, emit a `Live`
835 /// entry whose row timestamp is the max surviving cell timestamp; else if a
836 /// row tombstone was present, emit a `Tombstone` entry at `row_del` so the
837 /// row stays shadowed downstream; else emit nothing.
838 fn merge_partition_rows(&self, rows: Vec<MergeEntry>) -> Result<Vec<MergeEntry>> {
839 use std::collections::BTreeMap;
840
841 // Group by clustering key using BTreeMap (ClusteringKey implements Ord).
842 // Preserve heap-routing order within each group so the per-cell tiebreak
843 // (first-seen wins at equal timestamp+liveness) follows run_index.
844 let mut clustered_rows: BTreeMap<Option<ClusteringKey>, Vec<MergeEntry>> = BTreeMap::new();
845
846 for row in rows {
847 clustered_rows
848 .entry(row.clustering_key.clone())
849 .or_default()
850 .push(row);
851 }
852
853 let mut merged = Vec::new();
854 for (ck, cluster_rows) in clustered_rows {
855 if let Some(entry) = Self::reconcile_cluster(ck, cluster_rows) {
856 merged.push(entry);
857 }
858 }
859
860 // Sort merged rows by clustering key for output order
861 merged.sort_by(|a, b| match (&a.clustering_key, &b.clustering_key) {
862 (None, None) => Ordering::Equal,
863 (None, Some(_)) => Ordering::Less,
864 (Some(_), None) => Ordering::Greater,
865 (Some(ck_a), Some(ck_b)) => {
866 // Use schema-aware comparison if available
867 ck_a.compare(ck_b, &self.schema).unwrap_or_else(|e| {
868 log::warn!(
869 "Schema-aware clustering key comparison failed, using fallback: {}",
870 e
871 );
872 ck_a.cmp(ck_b)
873 })
874 }
875 });
876
877 Ok(merged)
878 }
879
880 /// Returns true when a cell carries a cell-level tombstone
881 /// (`Value::Tombstone(CellTombstone)`), the representation produced by #505.
882 ///
883 /// Cell tombstones participate in per-cell reconcile like any other cell, but
884 /// at EQUAL timestamp a cell tombstone beats a live value (Cassandra
885 /// `Cells#reconcile`, same rule as #498 applied per cell).
886 fn is_cell_tombstone(cell: &CellData) -> bool {
887 matches!(
888 cell.value,
889 crate::types::Value::Tombstone(ref info)
890 if info.tombstone_type == crate::types::TombstoneType::CellTombstone
891 )
892 }
893
894 /// Reconcile all entries for a single clustering-key group into at most one
895 /// merged `MergeEntry`, applying per-cell last-write-wins plus row-tombstone
896 /// shadowing (Issue #533). See [`Self::merge_partition_rows`] for the rules.
897 ///
898 /// `cluster_rows` is in heap-routing order (run_index ascending within equal
899 /// keys), so when two cells tie on both timestamp and liveness the first-seen
900 /// (newer file) is kept.
901 fn reconcile_cluster(
902 clustering_key: Option<ClusteringKey>,
903 cluster_rows: Vec<MergeEntry>,
904 ) -> Option<MergeEntry> {
905 use std::collections::HashMap;
906
907 // Carry-through key fields: every entry in this group shares the same
908 // partition key and clustering key. Use the lowest run_index seen (newest
909 // file) so downstream ordering is stable.
910 let mut key = None;
911 let mut run_index = usize::MAX;
912
913 // Step 1: effective row deletion — max deletion_time across row tombstones.
914 let mut row_del: Option<i64> = None;
915
916 // Step 2: per-column cell reconcile. Preserve first-seen column order for
917 // deterministic output while resolving winners in a side map.
918 let mut order: Vec<String> = Vec::new();
919 let mut winners: HashMap<String, CellData> = HashMap::new();
920
921 for entry in &cluster_rows {
922 if key.is_none() {
923 key = Some(entry.key.clone());
924 }
925 run_index = run_index.min(entry.run_index);
926
927 match &entry.row_data {
928 RowData::Tombstone { deletion_time, .. } => {
929 row_del = Some(row_del.map_or(*deletion_time, |d| d.max(*deletion_time)));
930 }
931 RowData::Live { cells } => {
932 for cell in cells {
933 match winners.get(&cell.column) {
934 None => {
935 order.push(cell.column.clone());
936 winners.insert(cell.column.clone(), cell.clone());
937 }
938 Some(existing) => {
939 // Higher timestamp wins. At EQUAL timestamp a cell
940 // tombstone beats a live value (Issue #498 per cell).
941 // Otherwise keep the existing (first-seen = newer
942 // file) winner.
943 let replace = cell.timestamp > existing.timestamp
944 || (cell.timestamp == existing.timestamp
945 && Self::is_cell_tombstone(cell)
946 && !Self::is_cell_tombstone(existing));
947 if replace {
948 winners.insert(cell.column.clone(), cell.clone());
949 }
950 }
951 }
952 }
953 }
954 }
955 }
956
957 let key = key?; // empty group => nothing to emit
958
959 // Step 3: apply row-tombstone shadowing per cell. A cell whose timestamp is
960 // <= row_del is shadowed (`<=` lets the tombstone win at equal ts, #498).
961 // Cells written strictly after row_del survive. This shadowing applies to
962 // cell tombstones too: a row tombstone at ts=T supersedes a cell tombstone at
963 // ts<=T (real Cassandra semantics). Note this is INTENTIONALLY stricter than
964 // the `reference_merge` model, whose range-tombstone path only suppresses
965 // live cells — `reconcile_cluster` is the authoritative behavior here.
966 let surviving: Vec<CellData> = order
967 .into_iter()
968 .filter_map(|col| winners.remove(&col))
969 .filter(|cell| match row_del {
970 Some(d) => cell.timestamp > d,
971 None => true,
972 })
973 .collect();
974
975 // Step 4: build the merged result. `max()` is `Some` exactly when `surviving`
976 // is non-empty, so this match needs no unreachable fallback timestamp.
977 match surviving.iter().map(|c| c.timestamp).max() {
978 Some(row_ts) => Some(MergeEntry::new(
979 run_index,
980 key,
981 clustering_key,
982 row_ts,
983 RowData::Live { cells: surviving },
984 )),
985 // No surviving cells. If a row tombstone exists, keep the row shadowed
986 // so downstream still emits the deletion (preserves #505/#498 absence).
987 // Otherwise the row is empty/absent.
988 None => row_del.map(|deletion_time| {
989 MergeEntry::new(
990 run_index,
991 key,
992 clustering_key,
993 deletion_time,
994 RowData::Tombstone {
995 deletion_time,
996 local_deletion_time: 0,
997 },
998 )
999 }),
1000 }
1001 }
1002
1003 /// Convert a MergeEntry back to Mutation for writing
1004 pub(crate) fn merge_entry_to_mutation(
1005 entry: MergeEntry,
1006 schema: &TableSchema,
1007 ) -> Result<crate::storage::write_engine::mutation::Mutation> {
1008 use crate::storage::write_engine::mutation::{
1009 CellOperation, Mutation, PartitionKey, TableId,
1010 };
1011
1012 let partition_key = PartitionKey::from_bytes(&entry.key.key, schema)?;
1013 let table_id = TableId::new(&schema.keyspace, &schema.table);
1014
1015 let operations = match entry.row_data {
1016 RowData::Live { cells } => cells
1017 .into_iter()
1018 .map(|cell| {
1019 // Issue #505: cell-level tombstones are represented as
1020 // Value::Tombstone(CellTombstone) inside the Map. Translate
1021 // them to CellOperation::Delete so the SSTableWriter writes a
1022 // proper cell tombstone rather than a live cell with a null value.
1023 if matches!(
1024 cell.value,
1025 crate::types::Value::Tombstone(ref info)
1026 if info.tombstone_type == crate::types::TombstoneType::CellTombstone
1027 ) {
1028 return CellOperation::Delete {
1029 column: cell.column,
1030 };
1031 }
1032 if let Some(ttl) = cell.ttl {
1033 CellOperation::WriteWithTtl {
1034 column: cell.column,
1035 value: cell.value,
1036 ttl_seconds: ttl,
1037 }
1038 } else {
1039 CellOperation::Write {
1040 column: cell.column,
1041 value: cell.value,
1042 }
1043 }
1044 })
1045 .collect(),
1046 RowData::Tombstone { .. } => vec![CellOperation::DeleteRow],
1047 };
1048
1049 Ok(Mutation::new(
1050 table_id,
1051 partition_key,
1052 entry.clustering_key,
1053 operations,
1054 entry.timestamp,
1055 None,
1056 ))
1057 }
1058}
1059
1060#[cfg(all(test, feature = "write-support"))]
1061mod tests {
1062 use super::*;
1063 use crate::storage::write_engine::mutation::DecoratedKey;
1064
#[test]
fn test_merge_entry_ordering_by_token() {
    // Two entries that are identical except for the partition token.
    let with_token = |token| {
        MergeEntry::new(
            0,
            DecoratedKey::new(token, vec![1, 2, 3]),
            None,
            1000,
            RowData::Live { cells: vec![] },
        )
    };
    let low = with_token(100);
    let high = with_token(200);

    // The smaller token sorts first; check both comparison directions.
    assert!(low < high);
    assert!(high > low);
}
1087
#[test]
fn test_merge_entry_ordering_by_key_bytes() {
    // Hash-collision case: the token is shared, only the raw key bytes differ.
    let with_key_bytes = |bytes: Vec<u8>| {
        MergeEntry::new(
            0,
            DecoratedKey::new(100, bytes),
            None,
            1000,
            RowData::Live { cells: vec![] },
        )
    };
    let smaller = with_key_bytes(vec![1, 2, 3]);
    let larger = with_key_bytes(vec![1, 2, 4]);

    // The entry with the smaller key bytes sorts first.
    assert!(smaller < larger);
    assert!(larger > smaller);
}
1111
#[test]
fn test_merge_entry_ordering_by_run_index() {
    // Token and key bytes are shared; only the originating run differs.
    let from_run = |run: usize| {
        MergeEntry::new(
            run,
            DecoratedKey::new(100, vec![1, 2, 3]),
            None,
            1000,
            RowData::Live { cells: vec![] },
        )
    };
    let newer_file = from_run(0);
    let older_file = from_run(1);

    // The lower run_index (newer file) sorts first.
    assert!(newer_file < older_file);
    assert!(older_file > newer_file);
}
1135
#[test]
fn test_merge_entry_min_heap() {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    // `Reverse` flips the comparison so the heap pops the smallest entry first.
    let entry_for = |token, key_byte: u8| {
        MergeEntry::new(
            0,
            DecoratedKey::new(token, vec![key_byte]),
            None,
            1000,
            RowData::Live { cells: vec![] },
        )
    };

    let mut heap: BinaryHeap<Reverse<MergeEntry>> = BinaryHeap::new();

    // Deliberately pushed out of order: 300, 100, 200.
    heap.push(Reverse(entry_for(300, 3)));
    heap.push(Reverse(entry_for(100, 1)));
    heap.push(Reverse(entry_for(200, 2)));

    // Entries must come back in ascending token order.
    for expected_token in [100, 200, 300] {
        assert_eq!(heap.pop().unwrap().0.key.token, expected_token);
    }
}
1175
#[test]
fn test_row_data_variants() {
    // Live variant: carries the list of cells it was built with.
    let name_cell = CellData {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
        timestamp: 1000,
        ttl: None,
    };
    let live = RowData::Live {
        cells: vec![name_cell],
    };

    if let RowData::Live { cells } = live {
        assert_eq!(cells.len(), 1);
        assert_eq!(cells[0].column, "name");
    } else {
        panic!("Expected Live variant");
    }

    // Tombstone variant: carries both deletion timestamps.
    let tombstone = RowData::Tombstone {
        deletion_time: 2000,
        local_deletion_time: 1000,
    };

    if let RowData::Tombstone {
        deletion_time,
        local_deletion_time,
    } = tombstone
    {
        assert_eq!(deletion_time, 2000);
        assert_eq!(local_deletion_time, 1000);
    } else {
        panic!("Expected Tombstone variant");
    }
}
1211
#[test]
fn test_cell_data_creation() {
    // Build a cell, then take it apart again: every field must round-trip.
    let CellData {
        column,
        value,
        timestamp,
        ttl,
    } = CellData {
        column: "age".to_string(),
        value: Value::Integer(30),
        timestamp: 1234567890,
        ttl: Some(3600),
    };

    assert_eq!(column, "age");
    assert_eq!(value, Value::Integer(30));
    assert_eq!(timestamp, 1234567890);
    assert_eq!(ttl, Some(3600));
}
1226
#[test]
fn test_merge_stats_creation() {
    // Build the stats record, then destructure it to check each field.
    let MergeStats {
        input_files,
        output_partitions,
        output_rows,
        bytes_written,
        elapsed,
    } = MergeStats {
        input_files: 5,
        output_partitions: 1000,
        output_rows: 5000,
        bytes_written: 1024 * 1024,
        elapsed: Duration::from_secs(10),
    };

    assert_eq!(input_files, 5);
    assert_eq!(output_partitions, 1000);
    assert_eq!(output_rows, 5000);
    assert_eq!(bytes_written, 1024 * 1024);
    assert_eq!(elapsed.as_secs(), 10);
}
1243
#[test]
fn test_run_reader_estimate_entry_size() {
    let name_cell = CellData {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
        timestamp: 1000,
        ttl: None,
    };
    // Four key bytes; the lower bound asserted below accounts for exactly these.
    let key_bytes: Vec<u8> = vec![1, 2, 3, 4];
    let entry = MergeEntry::new(
        0,
        DecoratedKey::new(100, key_bytes),
        None,
        1000,
        RowData::Live {
            cells: vec![name_cell],
        },
    );

    let estimated = RunReader::estimate_entry_size(&entry);

    // The estimate must cover at least the base struct plus the key bytes.
    let lower_bound = std::mem::size_of::<MergeEntry>() + 4;
    assert!(estimated >= lower_bound);
}
1267
#[test]
fn test_kway_merger_empty_input() {
    use crate::schema::{KeyColumn, TableSchema};
    use std::collections::HashMap;

    let id_key = KeyColumn {
        name: "id".to_string(),
        data_type: "int".to_string(),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![id_key],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    // Constructing a merger over zero input files must be rejected.
    let result = KWayMerger::new(vec![], &schema);
    assert!(result.is_err());

    // ...and specifically with an `InvalidInput` error naming the cause.
    match result {
        Err(Error::InvalidInput(msg)) => {
            assert!(msg.contains("at least one input file"));
        }
        _ => panic!("Expected InvalidInput error"),
    }
}
1295
#[test]
fn test_merge_entry_equal_timestamps_prefer_lower_run_index() {
    // Both entries share partition, clustering (none) and timestamp; they differ
    // only in the run they came from and in the cell payload.
    let entry_from_run = |run: usize, label: &str| {
        MergeEntry::new(
            run,
            DecoratedKey::new(100, vec![1, 2, 3]),
            None,
            1000,
            RowData::Live {
                cells: vec![CellData {
                    column: "name".to_string(),
                    value: Value::Text(label.to_string()),
                    timestamp: 1000,
                    ttl: None,
                }],
            },
        )
    };

    let newer = entry_from_run(0, "Newer"); // run_index 0 = newer file
    let older = entry_from_run(1, "Older"); // run_index 1 = older file

    // The entry from run 0 must order ahead of the one from run 1.
    assert!(newer < older);
}
1333
#[test]
fn test_merge_entry_tombstone() {
    let row_data = RowData::Tombstone {
        deletion_time: 2000,
        local_deletion_time: 1000,
    };
    let entry = MergeEntry::new(
        0,
        DecoratedKey::new(100, vec![1, 2, 3]),
        None,
        2000,
        row_data,
    );

    // The entry must hand back the tombstone payload it was built with.
    if let RowData::Tombstone {
        deletion_time,
        local_deletion_time,
    } = entry.row_data
    {
        assert_eq!(deletion_time, 2000);
        assert_eq!(local_deletion_time, 1000);
    } else {
        panic!("Expected Tombstone");
    }
}
1358
#[test]
fn test_real_merger_delete_wins_at_equal_timestamp() {
    // Regression guard for Issue #498 (Cassandra `Cells#reconcile`): when a
    // Delete and a Live row carry the SAME timestamp, the Delete wins no matter
    // which file is more recent.
    //
    // The real entry point (`merge_partition_rows`) is fed two entries with the
    // same clustering key and the same timestamp:
    //   - Live,   run_index 0 (NEWER file — would win a run_index-only tiebreak)
    //   - Delete, run_index 1 (OLDER file)
    //
    // A merger that breaks the tie on run_index alone keeps the live row, so
    // this test fails if the tiebreak ever reverts to that rule.
    use crate::schema::{Column, KeyColumn, TableSchema};
    use std::collections::HashMap;

    const EQUAL_TS: i64 = 1_700_000_000_000_000;

    let id_key = KeyColumn {
        name: "id".to_string(),
        data_type: "int".to_string(),
        position: 0,
    };
    let value_column = Column {
        name: "value".to_string(),
        data_type: "text".to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };
    let merger = KWayMerger {
        runs: vec![],
        heap: BinaryHeap::new(),
        current_partition: None,
        schema: TableSchema {
            keyspace: "reconcile_ks".to_string(),
            table: "reconcile_tbl".to_string(),
            partition_keys: vec![id_key],
            clustering_keys: vec![],
            columns: vec![value_column],
            comments: HashMap::new(),
        },
    };

    let shared_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);

    // Live row in the NEWER file (run_index 0).
    let newer_live = MergeEntry::new(
        0,
        shared_key.clone(),
        None,
        EQUAL_TS,
        RowData::Live {
            cells: vec![CellData {
                column: "value".to_string(),
                value: Value::Text("survivor-if-buggy".to_string()),
                timestamp: EQUAL_TS,
                ttl: None,
            }],
        },
    );

    // Row tombstone in the OLDER file (run_index 1).
    let older_delete = MergeEntry::new(
        1,
        shared_key,
        None,
        EQUAL_TS,
        RowData::Tombstone {
            deletion_time: EQUAL_TS,
            local_deletion_time: 2_000_000,
        },
    );

    // The live (newer-file) entry goes first: exactly the one a run_index-only
    // tiebreak would select.
    let merged = merger
        .merge_partition_rows(vec![newer_live, older_delete])
        .expect("merge_partition_rows must not fail");

    assert_eq!(merged.len(), 1, "one clustering key => one merged winner");

    let tombstone_won = matches!(merged[0].row_data, RowData::Tombstone { .. });
    assert!(
        tombstone_won,
        "At equal timestamp the tombstone must win even though the live row is in \
         the newer file (run_index 0). Got a live row => the equal-ts tiebreak \
         reverted to run_index (Issue #498 regression)."
    );
}
1448
#[test]
fn test_real_merger_disjoint_columns_survive_compaction() {
    // Issue #533: two SSTables hold the same (pk, ck) but DISJOINT columns.
    // Per-cell reconcile has to keep the cells of BOTH; a whole-row-wins merge
    // would keep one row and silently drop the other row's columns.
    //
    //   A (run_index 1, ts=100): {name: "alice"}
    //   B (run_index 0, ts=200): {score: 42}
    //
    // Expected (Cassandra `Cells#reconcile`): {name: "alice", score: 42}.
    // A whole-row-wins merge would yield only {score: 42}.
    use crate::schema::{ClusteringColumn, Column, KeyColumn, TableSchema};
    use std::collections::HashMap;

    let nullable_column = |name: &str, data_type: &str| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };

    let merger = KWayMerger {
        runs: vec![],
        heap: BinaryHeap::new(),
        current_partition: None,
        schema: TableSchema {
            keyspace: "disjoint_ks".to_string(),
            table: "disjoint_tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "pk".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![ClusteringColumn {
                name: "ck".to_string(),
                data_type: "int".to_string(),
                position: 0,
                order: Default::default(),
            }],
            columns: vec![
                nullable_column("name", "text"),
                nullable_column("score", "int"),
            ],
            comments: HashMap::new(),
        },
    };

    let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);
    let ck = ClusteringKey {
        columns: vec![("ck".to_string(), Value::Integer(1))],
    };

    // A: older file (run_index 1), only `name` at ts=100.
    let older_name_only = MergeEntry::new(
        1,
        pk.clone(),
        Some(ck.clone()),
        100,
        RowData::Live {
            cells: vec![CellData {
                column: "name".to_string(),
                value: Value::Text("alice".to_string()),
                timestamp: 100,
                ttl: None,
            }],
        },
    );

    // B: newer file (run_index 0), only `score` at ts=200.
    let newer_score_only = MergeEntry::new(
        0,
        pk,
        Some(ck),
        200,
        RowData::Live {
            cells: vec![CellData {
                column: "score".to_string(),
                value: Value::Integer(42),
                timestamp: 200,
                ttl: None,
            }],
        },
    );

    // Input is given in heap-routing order (run_index ascending): B, then A.
    let merged = merger
        .merge_partition_rows(vec![newer_score_only, older_name_only])
        .expect("merge_partition_rows must not fail");

    assert_eq!(merged.len(), 1, "one clustering key => one merged row");

    let merged_row = &merged[0];
    let cells = match &merged_row.row_data {
        RowData::Live { cells } => cells,
        other => panic!("expected a Live merged row, got {:?}", other),
    };

    let name = cells.iter().find(|cell| cell.column == "name");
    let score = cells.iter().find(|cell| cell.column == "score");

    assert!(
        name.is_some(),
        "disjoint column `name` from the older file was DROPPED — per-cell \
         reconcile regression (Issue #533). Old whole-row-wins code fails here."
    );
    assert!(
        score.is_some(),
        "disjoint column `score` from the newer file is missing"
    );

    let (name, score) = (name.unwrap(), score.unwrap());
    assert_eq!(
        name.value,
        Value::Text("alice".to_string()),
        "`name` must carry A's value"
    );
    assert_eq!(
        score.value,
        Value::Integer(42),
        "`score` must carry B's value"
    );

    // The merged row is stamped with the newest surviving cell timestamp.
    assert_eq!(
        merged_row.timestamp, 200,
        "merged row timestamp must be the max surviving cell timestamp"
    );
}
1581
#[test]
fn test_real_merger_cell_tombstone_beats_live_at_equal_timestamp() {
    // Issue #533/#498, per cell: the SAME column is written by two SSTables at
    // the SAME timestamp, once live and once as a cell tombstone. The tombstone
    // must win regardless of which file is newer.
    //
    //   A (run_index 0, NEWER file, ts=100): {score: 42}   (live)
    //   B (run_index 1, OLDER file, ts=100): {score: <cell tombstone>}
    //
    // Expected (Cassandra `Cells#reconcile`): score is deleted. The trap is that
    // A sits in the newer file, so a recency-only tiebreak keeps the live 42.
    use crate::schema::{ClusteringColumn, Column, KeyColumn, TableSchema};
    use crate::types::{TombstoneInfo, TombstoneType};
    use std::collections::HashMap;

    let merger = KWayMerger {
        runs: vec![],
        heap: BinaryHeap::new(),
        current_partition: None,
        schema: TableSchema {
            keyspace: "ct_ks".to_string(),
            table: "ct_tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "pk".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![ClusteringColumn {
                name: "ck".to_string(),
                data_type: "int".to_string(),
                position: 0,
                order: Default::default(),
            }],
            columns: vec![Column {
                name: "score".to_string(),
                data_type: "int".to_string(),
                nullable: true,
                default: None,
                is_static: false,
            }],
            comments: HashMap::new(),
        },
    };

    let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);
    let ck = ClusteringKey {
        columns: vec![("ck".to_string(), Value::Integer(1))],
    };

    // Both entries write `score` at ts=100; only the payload differs.
    let score_cell = |value: Value| CellData {
        column: "score".to_string(),
        value,
        timestamp: 100,
        ttl: None,
    };

    // A: newer file (run_index 0), live `score` = 42.
    let newer_live = MergeEntry::new(
        0,
        pk.clone(),
        Some(ck.clone()),
        100,
        RowData::Live {
            cells: vec![score_cell(Value::Integer(42))],
        },
    );

    // B: older file (run_index 1), cell tombstone on `score`.
    let older_cell_delete = MergeEntry::new(
        1,
        pk,
        Some(ck),
        100,
        RowData::Live {
            cells: vec![score_cell(Value::Tombstone(TombstoneInfo {
                deletion_time: 100,
                tombstone_type: TombstoneType::CellTombstone,
                ttl: None,
                range_start: None,
                range_end: None,
            }))],
        },
    );

    // Heap-routing order (run_index ascending): A, then B.
    let merged = merger
        .merge_partition_rows(vec![newer_live, older_cell_delete])
        .expect("merge_partition_rows must not fail");

    assert_eq!(merged.len(), 1, "one clustering key => one merged row");

    let cells = match &merged[0].row_data {
        RowData::Live { cells } => cells,
        other => panic!("expected a Live merged row, got {:?}", other),
    };
    let score = cells
        .iter()
        .find(|cell| cell.column == "score")
        .expect("score cell must be present (as a tombstone)");

    let is_cell_tombstone = matches!(
        score.value,
        Value::Tombstone(ref info) if info.tombstone_type == TombstoneType::CellTombstone
    );
    assert!(
        is_cell_tombstone,
        "at equal ts the cell tombstone must win over the live value (got {:?}) — \
         a recency-only tiebreak would have kept the newer file's live 42 (#498 per cell)",
        score.value
    );
}
1697
#[test]
fn test_real_merger_same_column_conflict_resolves_by_timestamp() {
    // Issue #533: a column written by both SSTables resolves to the value with
    // the higher timestamp (last-write-wins), while a column present in only one
    // of them still survives.
    //
    //   A (run_index 1, ts=100): {name: "old", extra: "a-only"}
    //   B (run_index 0, ts=200): {name: "new"}
    //   => {name: "new" (ts=200 wins), extra: "a-only" (survives)}
    use crate::schema::{Column, KeyColumn, TableSchema};
    use std::collections::HashMap;

    let text_column = |name: &str| Column {
        name: name.to_string(),
        data_type: "text".to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };
    let text_cell = |column: &str, text: &str, timestamp: i64| CellData {
        column: column.to_string(),
        value: Value::Text(text.to_string()),
        timestamp,
        ttl: None,
    };

    let merger = KWayMerger {
        runs: vec![],
        heap: BinaryHeap::new(),
        current_partition: None,
        schema: TableSchema {
            keyspace: "conflict_ks".to_string(),
            table: "conflict_tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "pk".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![text_column("name"), text_column("extra")],
            comments: HashMap::new(),
        },
    };

    let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);

    // A: older file, writes both columns at ts=100.
    let older_both = MergeEntry::new(
        1,
        pk.clone(),
        None,
        100,
        RowData::Live {
            cells: vec![
                text_cell("name", "old", 100),
                text_cell("extra", "a-only", 100),
            ],
        },
    );

    // B: newer file, overwrites `name` at ts=200.
    let newer_name = MergeEntry::new(
        0,
        pk,
        None,
        200,
        RowData::Live {
            cells: vec![text_cell("name", "new", 200)],
        },
    );

    let merged = merger
        .merge_partition_rows(vec![newer_name, older_both])
        .expect("merge_partition_rows must not fail");

    assert_eq!(merged.len(), 1);
    let cells = match &merged[0].row_data {
        RowData::Live { cells } => cells,
        other => panic!("expected Live, got {:?}", other),
    };

    let name = cells
        .iter()
        .find(|cell| cell.column == "name")
        .expect("name present");
    let extra = cells
        .iter()
        .find(|cell| cell.column == "extra")
        .expect("extra (disjoint) must survive");

    assert_eq!(
        name.value,
        Value::Text("new".to_string()),
        "same-column conflict must resolve to the higher-timestamp value"
    );
    assert_eq!(
        extra.value,
        Value::Text("a-only".to_string()),
        "disjoint column from the older file must survive the conflict merge"
    );
}
1814
#[test]
fn test_real_merger_row_tombstone_shadows_old_cells_keeps_new() {
    // Issue #533 / #505: a row tombstone hides every cell with ts <= row_del,
    // while a cell written strictly AFTER the tombstone stays visible.
    //
    //   A (ts=100): {name: "old"}   -> 100 <= 200 row_del => shadowed
    //   B (ts=200, row tombstone)
    //   C (ts=300): {score: 7}      -> 300 > 200          => survives
    use crate::schema::{Column, KeyColumn, TableSchema};
    use std::collections::HashMap;

    let nullable_column = |name: &str, data_type: &str| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };

    let merger = KWayMerger {
        runs: vec![],
        heap: BinaryHeap::new(),
        current_partition: None,
        schema: TableSchema {
            keyspace: "shadow_ks".to_string(),
            table: "shadow_tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "pk".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![
                nullable_column("name", "text"),
                nullable_column("score", "int"),
            ],
            comments: HashMap::new(),
        },
    };

    let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);

    // A: oldest file (run_index 2), `name` written before the deletion.
    let before_delete = MergeEntry::new(
        2,
        pk.clone(),
        None,
        100,
        RowData::Live {
            cells: vec![CellData {
                column: "name".to_string(),
                value: Value::Text("old".to_string()),
                timestamp: 100,
                ttl: None,
            }],
        },
    );
    // B: the row tombstone at ts=200 (run_index 1).
    let row_delete = MergeEntry::new(
        1,
        pk.clone(),
        None,
        200,
        RowData::Tombstone {
            deletion_time: 200,
            local_deletion_time: 0,
        },
    );
    // C: newest file (run_index 0), `score` written after the deletion.
    let after_delete = MergeEntry::new(
        0,
        pk,
        None,
        300,
        RowData::Live {
            cells: vec![CellData {
                column: "score".to_string(),
                value: Value::Integer(7),
                timestamp: 300,
                ttl: None,
            }],
        },
    );

    let merged = merger
        .merge_partition_rows(vec![after_delete, row_delete, before_delete])
        .expect("merge must not fail");

    assert_eq!(merged.len(), 1);
    let cells = match &merged[0].row_data {
        RowData::Live { cells } => cells,
        other => panic!(
            "expected Live (score survives the tombstone), got {:?}",
            other
        ),
    };

    let name_survived = cells.iter().any(|cell| cell.column == "name");
    assert!(
        !name_survived,
        "`name` (ts=100 <= row_del=200) must be shadowed by the row tombstone"
    );

    let score = cells
        .iter()
        .find(|cell| cell.column == "score")
        .expect("`score` (ts=300 > row_del=200) must survive the row tombstone");
    assert_eq!(score.value, Value::Integer(7));
}
1925
#[test]
fn test_real_merger_row_tombstone_only_emits_tombstone() {
    // With no write newer than the row tombstone, every cell is shadowed. The
    // merger must then emit a Tombstone entry so the row stays deleted
    // downstream (preserves #505/#498 absence semantics).
    use crate::schema::{Column, KeyColumn, TableSchema};
    use std::collections::HashMap;

    let merger = KWayMerger {
        runs: vec![],
        heap: BinaryHeap::new(),
        current_partition: None,
        schema: TableSchema {
            keyspace: "ts_only_ks".to_string(),
            table: "ts_only_tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "pk".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![Column {
                name: "name".to_string(),
                data_type: "text".to_string(),
                nullable: true,
                default: None,
                is_static: false,
            }],
            comments: HashMap::new(),
        },
    };

    let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);

    // Older file (run_index 1): a live cell at ts=100, older than the deletion.
    let shadowed_live = MergeEntry::new(
        1,
        pk.clone(),
        None,
        100,
        RowData::Live {
            cells: vec![CellData {
                column: "name".to_string(),
                value: Value::Text("doomed".to_string()),
                timestamp: 100,
                ttl: None,
            }],
        },
    );
    // Newer file (run_index 0): the row tombstone at ts=300.
    let row_delete = MergeEntry::new(
        0,
        pk,
        None,
        300,
        RowData::Tombstone {
            deletion_time: 300,
            local_deletion_time: 0,
        },
    );

    let merged = merger
        .merge_partition_rows(vec![row_delete, shadowed_live])
        .expect("merge must not fail");

    assert_eq!(merged.len(), 1);
    if let RowData::Tombstone { deletion_time, .. } = &merged[0].row_data {
        assert_eq!(*deletion_time, 300, "tombstone deletion_time preserved");
    } else {
        panic!("expected a Tombstone entry, got {:?}", &merged[0].row_data);
    }
}
1999
#[test]
fn test_merge_step_variants() {
    // `Partition` carries a key together with that partition's rows.
    let partition_step = MergeStep::Partition {
        key: DecoratedKey::new(100, vec![1, 2, 3]),
        rows: vec![],
    };

    if let MergeStep::Partition { key, rows } = partition_step {
        assert_eq!(key.token, 100);
        assert_eq!(rows.len(), 0);
    } else {
        panic!("Expected Partition variant");
    }

    // `Complete` carries no payload.
    let complete_step = MergeStep::Complete;
    assert!(
        matches!(complete_step, MergeStep::Complete),
        "Expected Complete variant"
    );
}
2021
#[test]
fn test_cell_merge_last_write_wins_higher_timestamp() {
    // Last-write-wins for one column written at two different timestamps. The
    // cells are pushed through the real per-cell reconcile so the assertion
    // covers merge behavior rather than comparing two literals.
    //
    // The higher timestamp is deliberately placed in the OLDER file
    // (run_index 1): a recency-only rule would pick the wrong cell.
    let name_entry = |run: usize, text: &str, timestamp: i64| {
        MergeEntry::new(
            run,
            DecoratedKey::new(100, vec![1, 2, 3]),
            None,
            timestamp,
            RowData::Live {
                cells: vec![CellData {
                    column: "name".to_string(),
                    value: Value::Text(text.to_string()),
                    timestamp,
                    ttl: None,
                }],
            },
        )
    };

    // Heap-routing order: run_index ascending.
    let merged = KWayMerger::reconcile_cluster(
        None,
        vec![name_entry(0, "Old", 1000), name_entry(1, "New", 2000)],
    )
    .expect("a live cell must survive the reconcile");

    let cells = match &merged.row_data {
        RowData::Live { cells } => cells,
        other => panic!("expected a Live merged row, got {:?}", other),
    };

    assert_eq!(cells.len(), 1, "one column => one winning cell");
    assert_eq!(
        cells[0].value,
        Value::Text("New".to_string()),
        "the cell with the higher timestamp must win"
    );
    assert_eq!(cells[0].timestamp, 2000);
    assert_eq!(
        merged.timestamp, 2000,
        "merged row timestamp must follow the winning cell"
    );
}
2042
#[test]
fn test_memory_budget_calculation() {
    // Each run owns one peek buffer of the default size (8KB).
    let per_run = RunReader::DEFAULT_BUFFER_SIZE;
    assert_eq!(per_run, 8 * 1024); // 8KB

    // With k=10 input SSTables the total budget is k buffers, i.e. ~80KB.
    let run_count = 10;
    assert_eq!(run_count * per_run, 80 * 1024); // 80KB total
}
2053
#[test]
fn test_merge_entry_to_mutation_live_cells() {
    use crate::schema::{KeyColumn, TableSchema};
    use crate::storage::write_engine::mutation::{CellOperation, DecoratedKey};
    use std::collections::HashMap;

    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    // One cell without a TTL and one with a TTL, so both write flavours appear.
    let plain_cell = CellData {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
        timestamp: 999_000_000,
        ttl: None,
    };
    let expiring_cell = CellData {
        column: "age".to_string(),
        value: Value::Integer(30),
        timestamp: 999_000_000,
        ttl: Some(3600),
    };

    // The partition key is the int 42 encoded as 4 big-endian bytes.
    let entry = MergeEntry::new(
        0,
        DecoratedKey::new(1000, 42i32.to_be_bytes().to_vec()),
        None,
        999_000_000,
        RowData::Live {
            cells: vec![plain_cell, expiring_cell],
        },
    );

    let mutation =
        KWayMerger::merge_entry_to_mutation(entry, &schema).expect("conversion should succeed");

    // The partition key decodes to a single column named "id".
    let key_columns = &mutation.partition_key.columns;
    assert_eq!(key_columns.len(), 1);
    assert_eq!(key_columns[0].0, "id");

    // Two operations (one Write, one WriteWithTtl) under the entry timestamp.
    let operations = &mutation.operations;
    assert_eq!(operations.len(), 2);
    assert_eq!(mutation.timestamp_micros, 999_000_000);

    let is_name_write =
        |op: &CellOperation| matches!(op, CellOperation::Write { column, .. } if column == "name");
    let is_age_ttl_write = |op: &CellOperation| {
        matches!(op, CellOperation::WriteWithTtl { column, ttl_seconds, .. }
            if column == "age" && *ttl_seconds == 3600)
    };

    assert!(
        operations.iter().any(is_name_write),
        "Expected Write operation for 'name'"
    );
    assert!(
        operations.iter().any(is_age_ttl_write),
        "Expected WriteWithTtl operation for 'age'"
    );
}
2121
#[test]
fn test_merge_entry_to_mutation_tombstone() {
    use crate::schema::{KeyColumn, TableSchema};
    use crate::storage::write_engine::mutation::{CellOperation, DecoratedKey};
    use std::collections::HashMap;

    // Schema with a single `int` partition key column and nothing else.
    let id_column = KeyColumn {
        name: "id".to_string(),
        data_type: "int".to_string(),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![id_column],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    // A row-level tombstone for partition key 7 (4-byte big-endian int).
    let partition = DecoratedKey::new(500, 7i32.to_be_bytes().to_vec());
    let tombstone = RowData::Tombstone {
        deletion_time: 888_000_000,
        local_deletion_time: 1_700_000_000,
    };
    let entry = MergeEntry::new(0, partition, None, 888_000_000, tombstone);

    let mutation =
        KWayMerger::merge_entry_to_mutation(entry, &schema).expect("conversion should succeed");

    // The tombstone must translate into exactly one DeleteRow operation.
    assert_eq!(mutation.operations.len(), 1);
    assert!(
        matches!(
            mutation.operations.first(),
            Some(CellOperation::DeleteRow)
        ),
        "Expected DeleteRow operation for tombstone entry"
    );
}
2163}
2164
2165// ─────────────────────────────────────────────────────────────────────────────
2166// Property tests for compaction merge semantics (Issue #475, Epic #469)
2167// ─────────────────────────────────────────────────────────────────────────────
2168//
// Strategy: define a small in-memory `reference_merge` that applies the
// Cassandra-style per-key merge rules (timestamp LWW, tombstone shadowing,
// TTL expiry, range tombstone application), generate randomised cell streams
// with proptest, and assert that the reference and the real KWayMerger agree.
2173//
2174// Three coverage areas required by the issue:
2175// A. Tombstone shadowing – delete-ts > write-ts => cell suppressed
2176// B. TTL expiry – write with TTL whose local_deletion_time < merge_time => dropped
2177// C. Range tombstone – row in range with marked_for_delete_at >= cell-ts => dropped
2178//
2179// The reference implementation is tested directly via proptest.
2180// The real merger (merge_partition_rows) is also exercised for the cases it
2181// handles today (LWW by timestamp for live rows and tombstones).
2182
2183#[cfg(all(test, feature = "write-support"))]
2184mod merge_property_tests {
2185 use super::*;
2186 use proptest::prelude::*;
2187 use std::collections::HashMap;
2188
// ─── Fixed "wall clock" used in all TTL expiry tests ─────────────────────
// Unix seconds; cells with local_deletion_time < MERGE_TIME_SECS are expired.
// The comparison is strict: a cell whose local_deletion_time equals
// MERGE_TIME_SECS is still treated as live by `reference_merge`.
const MERGE_TIME_SECS: i32 = 1_000;
2192
2193 // ─── Cell operation model ─────────────────────────────────────────────────
2194
/// The three kinds of cell operations that a compaction must resolve.
///
/// This is a test-only model consumed by `reference_merge`. It records just
/// the timestamps and ranges that decide the merge outcome; cell values are
/// not modelled.
#[derive(Debug, Clone)]
enum CellOp {
    /// A live write: column <- value, recorded at `timestamp`.
    /// When `local_deletion_time` is Some(t) it is an expiring cell; the
    /// cell is considered dead when `t < MERGE_TIME_SECS`.
    /// `None` means the write carries no TTL and never expires.
    Write {
        timestamp: i64,
        local_deletion_time: Option<i32>,
    },
    /// A cell tombstone (DELETE column): column is dead at `timestamp`.
    /// At an equal timestamp a `Delete` beats a `Write` in `reference_merge`.
    Delete { timestamp: i64 },
    /// A range tombstone covering the inclusive integer range
    /// `[start_ck, end_ck]`. Any row whose clustering key integer falls
    /// within the range and whose write-timestamp <= `marked_for_delete_at`
    /// is suppressed.
    /// It applies only within the partition of the `CellInput` carrying it.
    RangeTombstone {
        start_ck: u8,
        end_ck: u8,
        marked_for_delete_at: i64,
    },
}
2217
/// A single entry in the randomised cell stream.
///
/// We work with small integer partition/clustering/column spaces so
/// collisions occur frequently and the interesting merge cases arise.
///
/// For a `CellOp::RangeTombstone` entry, `reference_merge` reads only
/// `partition` plus the range stored in `op`; `clustering` and `column` are
/// not consulted.
#[derive(Debug, Clone)]
struct CellInput {
    /// 0..4 (upper bound exclusive, as generated by `arb_cell_input`)
    partition: u8,
    /// 0..4 (upper bound exclusive, as generated by `arb_cell_input`)
    clustering: u8,
    /// 0..3 (upper bound exclusive, as generated by `arb_cell_input`)
    column: u8,
    /// The operation applied to the slot addressed by the three fields above.
    op: CellOp,
}
2232
2233 // ─── Key type for the merged output map ──────────────────────────────────
2234
/// (partition, clustering, column) triple identifying a unique cell slot.
///
/// Used as the key of the map returned by `reference_merge`.
type CellKey = (u8, u8, u8);
2237
/// What the reference merge produces for a cell slot.
///
/// A slot whose only writes are TTL-expired, or whose live cell is covered by
/// a range tombstone, has no entry at all in the merged map rather than a
/// dedicated variant here.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MergedCell {
    /// The cell is alive with the given write timestamp.
    Live { timestamp: i64 },
    /// The cell is a tombstone (deleted) at the given timestamp.
    Dead { timestamp: i64 },
}
2246
2247 // ─── Reference implementation ─────────────────────────────────────────────
2248
2249 /// Reference merge over a flat cell-stream, applying full Cassandra rules.
2250 ///
2251 /// Rules (applied in order):
2252 /// 1. Per (partition, clustering, column), keep the op with the highest
2253 /// `timestamp`. Ties: Delete wins over Write (Cassandra reconcile).
2254 /// 2. A `RangeTombstone` with `marked_for_delete_at >= cell.timestamp`
2255 /// covering a clustering key suppresses the live cell in that slot.
2256 /// 3. A `Write` whose `local_deletion_time < MERGE_TIME_SECS` (TTL expired)
2257 /// is dropped from the output even if it has the highest timestamp.
2258 fn reference_merge(inputs: &[CellInput]) -> HashMap<CellKey, MergedCell> {
2259 // ── Step 1: per-slot LWW ──────────────────────────────────────────────
2260 let mut per_slot: HashMap<CellKey, MergedCell> = HashMap::new();
2261
2262 // Collect range tombstones grouped by (partition, clustering range).
2263 let mut range_tombstones: Vec<CellInput> = Vec::new();
2264
2265 for ci in inputs {
2266 match &ci.op {
2267 CellOp::RangeTombstone { .. } => {
2268 range_tombstones.push(ci.clone());
2269 }
2270 CellOp::Write {
2271 timestamp,
2272 local_deletion_time,
2273 } => {
2274 // TTL expiry: drop the write if its local_deletion_time has passed.
2275 if local_deletion_time
2276 .map(|ldt| ldt < MERGE_TIME_SECS)
2277 .unwrap_or(false)
2278 {
2279 // Expired: treat as if this write never happened.
2280 continue;
2281 }
2282 let key = (ci.partition, ci.clustering, ci.column);
2283 let candidate = MergedCell::Live {
2284 timestamp: *timestamp,
2285 };
2286 per_slot
2287 .entry(key)
2288 .and_modify(|existing| {
2289 match existing {
2290 MergedCell::Live { timestamp: ex_ts } => {
2291 if *timestamp > *ex_ts {
2292 *existing = candidate.clone();
2293 }
2294 }
2295 MergedCell::Dead { timestamp: ex_ts } => {
2296 // Dead wins over a live cell at the same timestamp;
2297 // only replace if the write is strictly newer.
2298 if *timestamp > *ex_ts {
2299 *existing = candidate.clone();
2300 }
2301 }
2302 }
2303 })
2304 .or_insert(candidate);
2305 }
2306 CellOp::Delete { timestamp } => {
2307 let key = (ci.partition, ci.clustering, ci.column);
2308 let candidate = MergedCell::Dead {
2309 timestamp: *timestamp,
2310 };
2311 per_slot
2312 .entry(key)
2313 .and_modify(|existing| {
2314 match existing {
2315 MergedCell::Live { timestamp: ex_ts } => {
2316 if *timestamp >= *ex_ts {
2317 // Delete wins at equal timestamp (Cassandra rule).
2318 *existing = candidate.clone();
2319 }
2320 }
2321 MergedCell::Dead { timestamp: ex_ts } => {
2322 if *timestamp > *ex_ts {
2323 *existing = candidate.clone();
2324 }
2325 }
2326 }
2327 })
2328 .or_insert(candidate);
2329 }
2330 }
2331 }
2332
2333 // ── Step 2: apply range tombstones ────────────────────────────────────
2334 // A range tombstone suppresses a live cell when:
2335 // - partition matches
2336 // - clustering key is within [start_ck, end_ck]
2337 // - marked_for_delete_at >= cell write timestamp
2338 per_slot.retain(|&(pk, ck, _col), cell| {
2339 for rt in &range_tombstones {
2340 if rt.partition != pk {
2341 continue;
2342 }
2343 if let CellOp::RangeTombstone {
2344 start_ck,
2345 end_ck,
2346 marked_for_delete_at,
2347 } = rt.op
2348 {
2349 if ck >= start_ck && ck <= end_ck {
2350 if let MergedCell::Live { timestamp } = cell {
2351 if marked_for_delete_at >= *timestamp {
2352 return false; // suppressed
2353 }
2354 }
2355 }
2356 }
2357 }
2358 true
2359 });
2360
2361 // Dead cells (tombstones) are kept in the output so callers can verify
2362 // they appear rather than a live cell with a lower timestamp.
2363 per_slot
2364 }
2365
2366 // ─── Proptest strategies ──────────────────────────────────────────────────
2367
/// Strategy for write/delete/range-tombstone timestamps: an integer in the
/// inclusive range 1..=20.
// NOTE(review): the range is presumably kept this narrow so that equal and
// adjacent timestamps on the same slot are generated often — confirm intent.
fn arb_timestamp() -> impl Strategy<Value = i64> {
    1i64..=20i64
}
2371
2372 /// local_deletion_time: sometimes None, sometimes expired (<MERGE_TIME_SECS),
2373 /// sometimes live (>=MERGE_TIME_SECS).
2374 fn arb_local_deletion_time() -> impl Strategy<Value = Option<i32>> {
2375 prop_oneof![
2376 3 => Just(None), // no TTL
2377 1 => (990i32..=999i32).prop_map(Some), // expired TTL
2378 1 => (1000i32..=1010i32).prop_map(Some), // live TTL
2379 ]
2380 }
2381
2382 fn arb_cell_op() -> impl Strategy<Value = CellOp> {
2383 prop_oneof![
2384 5 => (arb_timestamp(), arb_local_deletion_time())
2385 .prop_map(|(ts, ldt)| CellOp::Write {
2386 timestamp: ts,
2387 local_deletion_time: ldt,
2388 }),
2389 3 => arb_timestamp().prop_map(|ts| CellOp::Delete { timestamp: ts }),
2390 2 => (0u8..=3u8, 0u8..=3u8, arb_timestamp()).prop_map(|(s, e, ts)| {
2391 let (start_ck, end_ck) = if s <= e { (s, e) } else { (e, s) };
2392 CellOp::RangeTombstone {
2393 start_ck,
2394 end_ck,
2395 marked_for_delete_at: ts,
2396 }
2397 }),
2398 ]
2399 }
2400
2401 fn arb_cell_input() -> impl Strategy<Value = CellInput> {
2402 (0u8..4u8, 0u8..4u8, 0u8..3u8, arb_cell_op()).prop_map(
2403 |(partition, clustering, column, op)| CellInput {
2404 partition,
2405 clustering,
2406 column,
2407 op,
2408 },
2409 )
2410 }
2411
/// Strategy for a whole cell stream: between 4 and 32 (inclusive) entries,
/// each generated by `arb_cell_input`.
fn arb_cell_stream() -> impl Strategy<Value = Vec<CellInput>> {
    prop::collection::vec(arb_cell_input(), 4..=32)
}
2415
2416 // ─── Helper: sort merged output for stable comparison ─────────────────────
2417 fn sorted_keys(m: &HashMap<CellKey, MergedCell>) -> Vec<(CellKey, MergedCell)> {
2418 let mut v: Vec<_> = m.iter().map(|(&k, v)| (k, v.clone())).collect();
2419 v.sort_by_key(|(k, _)| *k);
2420 v
2421 }
2422
2423 // ─── Deterministic unit tests for reference semantics ────────────────────
2424
#[test]
fn ref_tombstone_shadows_earlier_write() {
    // Same slot receives Write(ts=5) and then Delete(ts=10): the newer
    // delete must be what remains, i.e. Dead(10).
    let slot = (0u8, 0u8, 0u8);
    let on_slot = |op: CellOp| CellInput {
        partition: slot.0,
        clustering: slot.1,
        column: slot.2,
        op,
    };

    let stream = [
        on_slot(CellOp::Write {
            timestamp: 5,
            local_deletion_time: None,
        }),
        on_slot(CellOp::Delete { timestamp: 10 }),
    ];

    let merged = reference_merge(&stream);
    assert_eq!(
        merged.get(&slot),
        Some(&MergedCell::Dead { timestamp: 10 }),
        "Delete(ts=10) must shadow Write(ts=5)"
    );
}
2452
#[test]
fn ref_write_not_shadowed_by_older_tombstone() {
    // Same slot receives Write(ts=10) and Delete(ts=5): the delete is older,
    // so the write survives as Live(10).
    let slot = (0u8, 0u8, 0u8);
    let on_slot = |op: CellOp| CellInput {
        partition: slot.0,
        clustering: slot.1,
        column: slot.2,
        op,
    };

    let stream = [
        on_slot(CellOp::Write {
            timestamp: 10,
            local_deletion_time: None,
        }),
        on_slot(CellOp::Delete { timestamp: 5 }),
    ];

    let merged = reference_merge(&stream);
    assert_eq!(
        merged.get(&slot),
        Some(&MergedCell::Live { timestamp: 10 }),
        "Write(ts=10) must win over Delete(ts=5)"
    );
}
2480
#[test]
fn ref_delete_wins_at_equal_timestamp() {
    // Write and Delete collide at ts=5 on one slot. Following the Cassandra
    // reconcile rule the tombstone takes precedence, giving Dead(5).
    let slot = (0u8, 0u8, 0u8);
    let on_slot = |op: CellOp| CellInput {
        partition: slot.0,
        clustering: slot.1,
        column: slot.2,
        op,
    };

    let stream = [
        on_slot(CellOp::Write {
            timestamp: 5,
            local_deletion_time: None,
        }),
        on_slot(CellOp::Delete { timestamp: 5 }),
    ];

    let merged = reference_merge(&stream);
    assert_eq!(
        merged.get(&slot),
        Some(&MergedCell::Dead { timestamp: 5 }),
        "Delete must win at equal timestamp (Cassandra reconcile rule)"
    );
}
2508
#[test]
fn ref_expired_ttl_drops_cell() {
    // The only op is a write whose local_deletion_time (500) is already
    // behind MERGE_TIME_SECS (1000), so the slot must not appear at all.
    let expired_write = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: Some(500), // expired
        },
    };

    let merged = reference_merge(&[expired_write]);
    assert!(
        merged.get(&(0, 0, 0)).is_none(),
        "Expired TTL cell must be absent from merged output"
    );
}
2528
#[test]
fn ref_live_ttl_keeps_cell() {
    // The only op is a write whose local_deletion_time (1500) is still ahead
    // of MERGE_TIME_SECS (1000), so the cell must come through as Live(5).
    let unexpired_write = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: Some(1500), // not expired
        },
    };

    let merged = reference_merge(&[unexpired_write]);
    assert_eq!(
        merged.get(&(0, 0, 0)),
        Some(&MergedCell::Live { timestamp: 5 }),
        "Non-expired TTL cell must be present"
    );
}
2549
#[test]
fn ref_range_tombstone_suppresses_row_in_range() {
    // Partition 0 holds a write at clustering key 2 (ts=5) and a range
    // tombstone over [0, 5] with marked_for_delete_at=10. The write is both
    // inside the range and older than the tombstone, so it must vanish.
    let covered_write = CellInput {
        partition: 0,
        clustering: 2,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: None,
        },
    };
    let range_tombstone = CellInput {
        partition: 0,
        clustering: 0, // column/clustering fields ignored for RT; range is in op
        column: 0,
        op: CellOp::RangeTombstone {
            start_ck: 0,
            end_ck: 5,
            marked_for_delete_at: 10,
        },
    };

    let merged = reference_merge(&[covered_write, range_tombstone]);
    assert!(
        merged.get(&(0, 2, 0)).is_none(),
        "Cell with ts=5 at clustering=2 must be suppressed by RangeTombstone(mfda=10, [0,5])"
    );
}
2581
#[test]
fn ref_range_tombstone_does_not_suppress_newer_write() {
    // The write (ts=15) lies inside the tombstone's range but is newer than
    // its marked_for_delete_at (10), so it must survive as Live(15).
    let newer_write = CellInput {
        partition: 0,
        clustering: 2,
        column: 0,
        op: CellOp::Write {
            timestamp: 15,
            local_deletion_time: None,
        },
    };
    let range_tombstone = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::RangeTombstone {
            start_ck: 0,
            end_ck: 5,
            marked_for_delete_at: 10,
        },
    };

    let merged = reference_merge(&[newer_write, range_tombstone]);
    assert_eq!(
        merged.get(&(0, 2, 0)),
        Some(&MergedCell::Live { timestamp: 15 }),
        "Write(ts=15) must NOT be suppressed by RangeTombstone(mfda=10)"
    );
}
2613
#[test]
fn ref_range_tombstone_only_applies_within_partition() {
    // The tombstone lives in partition 0 while the write lives in
    // partition 1. Range and timestamp would otherwise match, but the
    // partition mismatch means the write must stay Live(5).
    let other_partition_write = CellInput {
        partition: 1,
        clustering: 2,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: None,
        },
    };
    let range_tombstone = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::RangeTombstone {
            start_ck: 0,
            end_ck: 5,
            marked_for_delete_at: 10,
        },
    };

    let merged = reference_merge(&[other_partition_write, range_tombstone]);
    assert_eq!(
        merged.get(&(1, 2, 0)),
        Some(&MergedCell::Live { timestamp: 5 }),
        "RangeTombstone in partition 0 must not affect partition 1"
    );
}
2645
2646 // ─── Property tests ───────────────────────────────────────────────────────
2647
2648 proptest! {
2649 #![proptest_config(ProptestConfig::with_cases(64))]
2650
2651 // Property A: Tombstone shadowing
2652 // After merging, for every (partition, clustering, column) cell slot, if
2653 // the reference says Dead(ts=T) then the highest-timestamp Delete in the
2654 // input stream for that slot must have timestamp T.
2655 #[test]
2656 fn prop_tombstone_shadowing_consistent(inputs in arb_cell_stream()) {
2657 let merged = reference_merge(&inputs);
2658
2659 for (&(pk, ck, col), cell) in &merged {
2660 if let MergedCell::Dead { timestamp: dead_ts } = cell {
2661 // Find the highest-timestamp Delete for this slot in the input.
2662 let best_delete = inputs.iter()
2663 .filter(|ci| ci.partition == pk && ci.clustering == ck && ci.column == col)
2664 .filter_map(|ci| {
2665 if let CellOp::Delete { timestamp } = ci.op {
2666 Some(timestamp)
2667 } else {
2668 None
2669 }
2670 })
2671 .max();
2672
2673 prop_assert!(
2674 best_delete.is_some(),
2675 "Dead cell at ({},{},{}) but no Delete in inputs",
2676 pk, ck, col
2677 );
2678 prop_assert_eq!(
2679 best_delete.unwrap(),
2680 *dead_ts,
2681 "Dead cell timestamp must equal best Delete timestamp for ({},{},{})",
2682 pk, ck, col
2683 );
2684 }
2685 }
2686 }
2687
2688 // Property B: TTL expiry
2689 // After merging, no cell should be Live if all its Write ops are TTL-expired.
2690 #[test]
2691 fn prop_ttl_expiry_no_expired_live_cells(inputs in arb_cell_stream()) {
2692 let merged = reference_merge(&inputs);
2693
2694 for (&(pk, ck, col), cell) in &merged {
2695 if let MergedCell::Live { .. } = cell {
2696 // There must be at least one non-expired Write for this slot.
2697 let has_live_write = inputs.iter()
2698 .filter(|ci| ci.partition == pk && ci.clustering == ck && ci.column == col)
2699 .any(|ci| {
2700 if let CellOp::Write { local_deletion_time, .. } = &ci.op {
2701 // A live write: no TTL, or TTL not expired.
2702 local_deletion_time
2703 .map(|ldt| ldt >= MERGE_TIME_SECS)
2704 .unwrap_or(true)
2705 } else {
2706 false
2707 }
2708 });
2709
2710 prop_assert!(
2711 has_live_write,
2712 "Live cell at ({},{},{}) but all writes are expired",
2713 pk, ck, col
2714 );
2715 }
2716 }
2717 }
2718
2719 // Property C: Range tombstone application
2720 // After merging, no Live cell should exist that is fully covered by a
2721 // range tombstone whose marked_for_delete_at >= the cell's write timestamp.
2722 #[test]
2723 fn prop_range_tombstone_suppresses_covered_live_cells(inputs in arb_cell_stream()) {
2724 let merged = reference_merge(&inputs);
2725
2726 // Collect all range tombstones from the input stream.
2727 let range_tombstones: Vec<(u8, u8, u8, i64)> = inputs.iter()
2728 .filter_map(|ci| {
2729 if let CellOp::RangeTombstone { start_ck, end_ck, marked_for_delete_at } = ci.op {
2730 Some((ci.partition, start_ck, end_ck, marked_for_delete_at))
2731 } else {
2732 None
2733 }
2734 })
2735 .collect();
2736
2737 for (&(pk, ck, _col), cell) in &merged {
2738 if let MergedCell::Live { timestamp } = cell {
2739 // Verify no range tombstone shadows this cell.
2740 for &(rt_pk, start_ck, end_ck, mfda) in &range_tombstones {
2741 if rt_pk == pk && ck >= start_ck && ck <= end_ck && mfda >= *timestamp {
2742 prop_assert!(
2743 false,
2744 "Live cell at ({},{}) ts={} should be suppressed by \
2745 RangeTombstone(part={}, [{},{}], mfda={})",
2746 pk, ck, timestamp, rt_pk, start_ck, end_ck, mfda
2747 );
2748 }
2749 }
2750 }
2751 }
2752 }
2753
2754 // Property D: LWW correctness
2755 // Every Live cell in the merged output must have a timestamp equal to
2756 // the maximum non-expired Write timestamp for that cell slot.
2757 #[test]
2758 fn prop_live_cell_has_max_write_timestamp(inputs in arb_cell_stream()) {
2759 let merged = reference_merge(&inputs);
2760
2761 for (&(pk, ck, col), cell) in &merged {
2762 if let MergedCell::Live { timestamp: live_ts } = cell {
2763 // Find the maximum non-expired Write timestamp for this slot.
2764 let max_ts = inputs.iter()
2765 .filter(|ci| ci.partition == pk && ci.clustering == ck && ci.column == col)
2766 .filter_map(|ci| {
2767 if let CellOp::Write { timestamp, local_deletion_time } = &ci.op {
2768 // Only include non-expired writes.
2769 let not_expired = local_deletion_time
2770 .map(|ldt| ldt >= MERGE_TIME_SECS)
2771 .unwrap_or(true);
2772 if not_expired { Some(*timestamp) } else { None }
2773 } else {
2774 None
2775 }
2776 })
2777 .max();
2778
2779 prop_assert_eq!(
2780 max_ts,
2781 Some(*live_ts),
2782 "Live cell at ({},{},{}) must have max non-expired write timestamp",
2783 pk, ck, col
2784 );
2785 }
2786 }
2787 }
2788
2789 // Property E: Output is deterministic (idempotent reference)
2790 // Calling reference_merge twice on the same input produces identical output.
2791 #[test]
2792 fn prop_reference_merge_is_deterministic(inputs in arb_cell_stream()) {
2793 let result_a = reference_merge(&inputs);
2794 let result_b = reference_merge(&inputs);
2795 prop_assert_eq!(
2796 sorted_keys(&result_a),
2797 sorted_keys(&result_b),
2798 "reference_merge must be deterministic"
2799 );
2800 }
2801
2802 // Property F: Real merger LWW parity
2803 // For cell streams containing only non-expired Writes (no Deletes,
2804 // no RangeTombstones, no TTL), the real KWayMerger.merge_partition_rows
2805 // must agree with the reference on which row wins per clustering key.
2806 //
2807 // We drive merge_partition_rows directly with synthetic MergeEntry inputs
2808 // that represent the Write ops.
2809 #[test]
2810 fn prop_real_merger_lww_agrees_with_reference(
2811 entries in prop::collection::vec(
2812 // (clustering_key 0..4, run_index 0..2, timestamp 1..20)
2813 (0u8..4u8, 0usize..2usize, 1i64..=20i64),
2814 2..=12usize,
2815 )
2816 ) {
2817 use crate::schema::{Column, KeyColumn};
2818 use std::collections::HashMap as SchemaMap;
2819
2820 let schema = TableSchema {
2821 keyspace: "prop_test_ks".to_string(),
2822 table: "prop_test_table".to_string(),
2823 partition_keys: vec![KeyColumn {
2824 name: "id".to_string(),
2825 data_type: "int".to_string(),
2826 position: 0,
2827 }],
2828 clustering_keys: vec![],
2829 columns: vec![Column {
2830 name: "value".to_string(),
2831 data_type: "text".to_string(),
2832 nullable: true,
2833 default: None,
2834 is_static: false,
2835 }],
2836 comments: SchemaMap::new(),
2837 };
2838
2839 // Build MergeEntry stream — one per (ck, run_index, timestamp) tuple.
2840 // All entries share the same partition.
2841 let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
2842 let merge_entries: Vec<MergeEntry> = entries.iter().map(|&(ck, run_index, ts)| {
2843 let ck_key = ClusteringKey {
2844 columns: vec![("ck".to_string(), Value::TinyInt(ck as i8))],
2845 };
2846 MergeEntry::new(
2847 run_index,
2848 partition_key.clone(),
2849 Some(ck_key),
2850 ts,
2851 RowData::Live {
2852 cells: vec![CellData {
2853 column: "value".to_string(),
2854 value: Value::Integer(ts as i32),
2855 timestamp: ts,
2856 ttl: None,
2857 }],
2858 },
2859 )
2860 }).collect();
2861
2862 // Drive the real merger.
2863 let merger = KWayMerger {
2864 runs: vec![],
2865 heap: std::collections::BinaryHeap::new(),
2866 current_partition: None,
2867 schema: schema.clone(),
2868 };
2869 let real_merged = merger.merge_partition_rows(merge_entries.clone())
2870 .expect("merge_partition_rows must not fail");
2871
2872 // Build the reference result: per clustering-key int, highest timestamp wins.
2873 // (run_index as tie-breaker: lower run_index wins at equal ts — same as merger)
2874 let mut ref_map: HashMap<u8, (i64, usize)> = HashMap::new();
2875 for &(ck, run_index, ts) in &entries {
2876 ref_map.entry(ck)
2877 .and_modify(|(best_ts, best_run)| {
2878 if ts > *best_ts || (ts == *best_ts && run_index < *best_run) {
2879 *best_ts = ts;
2880 *best_run = run_index;
2881 }
2882 })
2883 .or_insert((ts, run_index));
2884 }
2885
2886 // Verify each winner in the real output matches the reference.
2887 prop_assert_eq!(
2888 real_merged.len(),
2889 ref_map.len(),
2890 "real merger output row count must match reference"
2891 );
2892
2893 for entry in &real_merged {
2894 let ck_byte = match entry.clustering_key.as_ref()
2895 .and_then(|ck| ck.columns.first())
2896 .map(|(_, v)| v)
2897 {
2898 Some(Value::TinyInt(b)) => *b as u8,
2899 _ => {
2900 prop_assert!(false, "unexpected clustering key value");
2901 unreachable!()
2902 }
2903 };
2904
2905 let (ref_ts, _ref_run) = ref_map[&ck_byte];
2906 prop_assert_eq!(
2907 entry.timestamp,
2908 ref_ts,
2909 "real merger winner timestamp must match reference for ck={}",
2910 ck_byte
2911 );
2912 }
2913 }
2914
2915 // Property G: Tombstone wins over live row at same clustering key in real merger
2916 // When a Tombstone and a Live row have the same clustering key, the one with
2917 // the higher timestamp must win — and the real merger must reflect this.
2918 #[test]
2919 fn prop_real_merger_tombstone_vs_live(
2920 ts_write in 1i64..=10i64,
2921 ts_delete in 1i64..=20i64,
2922 ) {
2923 use crate::schema::{Column, KeyColumn};
2924 use std::collections::HashMap as SchemaMap;
2925
2926 let schema = TableSchema {
2927 keyspace: "prop_test_ks".to_string(),
2928 table: "prop_test_table".to_string(),
2929 partition_keys: vec![KeyColumn {
2930 name: "id".to_string(),
2931 data_type: "int".to_string(),
2932 position: 0,
2933 }],
2934 clustering_keys: vec![],
2935 columns: vec![Column {
2936 name: "value".to_string(),
2937 data_type: "text".to_string(),
2938 nullable: true,
2939 default: None,
2940 is_static: false,
2941 }],
2942 comments: SchemaMap::new(),
2943 };
2944
2945 let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
2946 let ck = ClusteringKey {
2947 columns: vec![("ck".to_string(), Value::TinyInt(0))],
2948 };
2949
2950 // Deliberately give the LIVE row the NEWER file (run_index 0) and the
2951 // tombstone the OLDER file (run_index 1). A run_index-only tiebreak at
2952 // equal timestamp would wrongly pick the live row; the Cassandra
2953 // liveness rule must pick the tombstone regardless of file recency.
2954 let live_entry = MergeEntry::new(
2955 0, // run_index 0 = newer file
2956 partition_key.clone(),
2957 Some(ck.clone()),
2958 ts_write,
2959 RowData::Live {
2960 cells: vec![CellData {
2961 column: "value".to_string(),
2962 value: Value::Integer(42),
2963 timestamp: ts_write,
2964 ttl: None,
2965 }],
2966 },
2967 );
2968 let tombstone_entry = MergeEntry::new(
2969 1, // run_index 1 = older file
2970 partition_key.clone(),
2971 Some(ck.clone()),
2972 ts_delete,
2973 RowData::Tombstone {
2974 deletion_time: ts_delete,
2975 local_deletion_time: 2000,
2976 },
2977 );
2978
2979 let merger = KWayMerger {
2980 runs: vec![],
2981 heap: std::collections::BinaryHeap::new(),
2982 current_partition: None,
2983 schema: schema.clone(),
2984 };
2985 let merged = merger.merge_partition_rows(vec![live_entry, tombstone_entry])
2986 .expect("merge_partition_rows must not fail");
2987
2988 prop_assert_eq!(merged.len(), 1, "one clustering key => one merged row");
2989
2990 let winner = &merged[0];
2991 if ts_delete > ts_write {
2992 // Tombstone has higher timestamp => should win.
2993 prop_assert!(
2994 matches!(winner.row_data, RowData::Tombstone { .. }),
2995 "Tombstone(ts={}) must win over Live(ts={})",
2996 ts_delete, ts_write
2997 );
2998 } else if ts_write > ts_delete {
2999 // Live write has higher timestamp => should win.
3000 prop_assert!(
3001 matches!(winner.row_data, RowData::Live { .. }),
3002 "Live(ts={}) must win over Tombstone(ts={})",
3003 ts_write, ts_delete
3004 );
3005 } else {
3006 // Equal timestamps: the tombstone (Delete) ALWAYS wins, matching
3007 // Cassandra `Cells#reconcile`. This must hold regardless of file
3008 // recency — the assertion previously carved this case out, hiding
3009 // the run_index-only tiebreak bug (Issue #498).
3010 prop_assert!(
3011 matches!(winner.row_data, RowData::Tombstone { .. }),
3012 "At equal ts={}, Tombstone must win over Live (Cassandra reconcile rule)",
3013 ts_delete
3014 );
3015 }
3016 }
3017 }
3018}