Skip to main content

cqlite_core/storage/write_engine/
merge.rs

1//! K-way merge for combining multiple L0 SSTables
2//!
3//! Implements efficient k-way merge using a binary heap for producing
4//! compacted SSTables from multiple runs.
5//!
6//! ## Architecture
7//!
8//! The K-way merger uses a min-heap to efficiently merge k sorted SSTable
9//! runs into a single output SSTable. Each run maintains a peek buffer for
10//! efficient lookahead.
11//!
12//! ## Ordering
13//!
14//! The `Ord`/`PartialOrd` impl on `MergeEntry` governs **heap routing only**
15//! (which partition/clustering bucket an entry belongs to) — NOT winner
16//! selection. Winner selection among entries with the same clustering key is
17//! done by `merge_partition_rows` (see "Cell Merge Rule" below), which layers a
18//! timestamp + liveness comparison on top.
19//!
20//! Heap-routing order:
21//! 1. Token (ascending) - Primary partitioning
22//! 2. Key bytes (ascending) - Hash collision resolution
//! 3. Clustering key (fallback `Ord`; not schema-aware at this level) - Within partition ordering
24//! 4. Run index (ascending) - Stable tiebreak for routing (NOT the LWW rule)
25//!
26//! ## Memory Budget
27//!
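//! Peek buffers: k × ~8KB (where k = number of input SSTables), i.e. ~80KB for
//! 10 SSTables. Note that `SSTableRowIteratorAdapter` currently pre-loads every
//! entry of each input into memory (see TODO #447), so the real footprint is
//! proportional to the total input size until streaming iteration lands.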
30//!
31//! ## Cell Merge Rule
32//!
33//! Last-write-wins by timestamp, following Cassandra `Cells#reconcile`:
34//! - Keep the entry with the highest timestamp.
35//! - If timestamps are equal, the tombstone (Delete) wins over a live entry,
36//!   independent of which file it came from (Issue #498).
37//! - If timestamp AND liveness are equal, prefer the lower run_index (newer file).
38//!
39//! Implementation for M5.2 (Issue #382)
40
41#[cfg(feature = "write-support")]
42use crate::error::{Error, Result};
43#[cfg(feature = "write-support")]
44use crate::schema::TableSchema;
45#[cfg(feature = "write-support")]
46use crate::storage::write_engine::mutation::{ClusteringKey, DecoratedKey};
47#[cfg(feature = "write-support")]
48use crate::types::Value;
49
50#[cfg(feature = "write-support")]
51use std::cmp::{Ordering, Reverse};
52#[cfg(feature = "write-support")]
53use std::collections::{BinaryHeap, VecDeque};
54#[cfg(feature = "write-support")]
55use std::path::{Path, PathBuf};
56#[cfg(feature = "write-support")]
57use std::time::{Duration, Instant};
58
/// One row pulled from one input SSTable; the unit that flows through the
/// merge heap.
///
/// Equality (`PartialEq`/`Eq`) is derived and therefore compares every field,
/// whereas the hand-written `Ord` impl in this file only looks at the token,
/// key bytes, clustering key and `run_index`.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeEntry {
    /// Index of the input SSTable this row was read from (0 = newest).
    pub run_index: usize,
    /// Partition key together with its token.
    pub key: DecoratedKey,
    /// Clustering key, or `None` for tables without clustering columns.
    pub clustering_key: Option<ClusteringKey>,
    /// Row timestamp in microseconds since the Unix epoch.
    pub timestamp: i64,
    /// Row payload: live cells or a row tombstone.
    pub row_data: RowData,
}
77
// Gated like the `MergeEntry` struct itself: without this attribute the impl
// would reference a type that does not exist when `write-support` is disabled.
#[cfg(feature = "write-support")]
impl MergeEntry {
    /// Build a merge entry from its parts.
    ///
    /// # Arguments
    ///
    /// * `run_index` - index of the source SSTable (0 = newest)
    /// * `key` - partition key with token
    /// * `clustering_key` - clustering key, `None` for tables without clustering
    /// * `timestamp` - row timestamp in microseconds since the Unix epoch
    /// * `row_data` - live cells or a row tombstone
    pub fn new(
        run_index: usize,
        key: DecoratedKey,
        clustering_key: Option<ClusteringKey>,
        timestamp: i64,
        row_data: RowData,
    ) -> Self {
        Self {
            run_index,
            key,
            clustering_key,
            timestamp,
            row_data,
        }
    }
}
96
/// Heap-routing order for `MergeEntry` (NOT last-write-wins winner selection).
///
/// The min-heap uses this ordering to hand out entries grouped by partition
/// and clustering key; the timestamp/liveness winner among equal clustering
/// keys is decided later by `merge_partition_rows`.
///
/// Comparison keys, in order, all ascending:
/// 1. Token
/// 2. Key bytes (separates partitions whose tokens collide)
/// 3. Clustering key, using `ClusteringKey`'s own `Ord` (not schema-aware at
///    this level); an absent clustering key sorts before a present one
/// 4. Run index - stable routing tiebreak only (lower = newer file)
///
/// NOTE(review): `timestamp` and `row_data` are ignored here while the derived
/// `PartialEq` compares them, so `cmp(..) == Equal` does not imply `==`.
#[cfg(feature = "write-support")]
impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key
            .token
            .cmp(&other.key.token)
            .then_with(|| self.key.key.cmp(&other.key.key))
            // `Option`'s ordering puts `None` before `Some(_)` and compares two
            // `Some` values by the inner `ClusteringKey` ordering.
            .then_with(|| self.clustering_key.cmp(&other.clustering_key))
            .then_with(|| self.run_index.cmp(&other.run_index))
    }
}

/// Total order: always delegates to [`Ord::cmp`].
#[cfg(feature = "write-support")]
impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
152
/// Payload of a merge entry: either the row's live cells or a row tombstone.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RowData {
    /// A live row.
    Live {
        /// The cells carried by this row.
        cells: Vec<CellData>,
    },
    /// A row-level deletion marker.
    Tombstone {
        /// Deletion timestamp (microseconds).
        deletion_time: i64,
        /// Local deletion time (seconds since epoch).
        local_deletion_time: i32,
    },
}
170
/// A single cell: column name, value, write timestamp and optional TTL.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CellData {
    /// Name of the column this cell belongs to.
    pub column: String,
    /// The cell's value.
    pub value: Value,
    /// Cell timestamp (microseconds).
    pub timestamp: i64,
    /// Time-to-live in seconds; `None` means the cell never expires.
    pub ttl: Option<u32>,
}
184
/// Outcome of one incremental merge step (see `KWayMerger::step`).
#[cfg(feature = "write-support")]
#[derive(Debug)]
pub enum MergeStep {
    /// One fully merged partition.
    Partition {
        /// Key of the partition.
        key: DecoratedKey,
        /// The partition's rows after merging.
        rows: Vec<MergeEntry>,
    },
    /// No input is left; the merge has finished.
    Complete,
}
199
/// Counters reported by a full merge (`KWayMerger::merge`).
#[cfg(feature = "write-support")]
#[derive(Debug, Clone)]
pub struct MergeStats {
    /// Number of input SSTables that were merged.
    pub input_files: usize,
    /// Number of partitions written to the output.
    pub output_partitions: u64,
    /// Number of rows written to the output.
    pub output_rows: u64,
    /// Bytes written to the output.
    /// NOTE(review): `KWayMerger::merge` in this file initialises this to 0
    /// and never updates it.
    pub bytes_written: u64,
    /// Wall-clock duration of the merge.
    pub elapsed: Duration,
}
215
/// Buffered reader over a single SSTable run.
///
/// Rows are pulled from the underlying iterator in batches and parked in a
/// FIFO buffer so that peeking does not hit the iterator again. A refill stops
/// once the estimated size of the rows read in that refill reaches
/// `buffer_size`.
#[cfg(feature = "write-support")]
struct RunReader {
    /// Underlying row source (boxed trait object; it has no `Debug` impl,
    /// hence the manual `Debug` for this struct).
    reader: Box<dyn SSTableRowIterator>,
    /// Rows already read but not yet consumed, oldest first.
    buffer: VecDeque<MergeEntry>,
    /// Per-refill budget in estimated bytes (~8KB by default).
    buffer_size: usize,
    /// Set once the underlying iterator has returned `None`; buffered rows may
    /// still remain.
    exhausted: bool,
}
231
/// Manual `Debug`: the boxed reader is skipped and the buffer is summarised by
/// its length only.
#[cfg(feature = "write-support")]
impl std::fmt::Debug for RunReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RunReader");
        out.field("buffer_len", &self.buffer.len());
        out.field("buffer_size", &self.buffer_size);
        out.field("exhausted", &self.exhausted);
        out.finish()
    }
}
242
#[cfg(feature = "write-support")]
impl RunReader {
    /// Default per-refill budget: 8 KiB of estimated entry size.
    const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;

    /// Wrap `reader` with an empty buffer and the default refill budget.
    fn new(reader: Box<dyn SSTableRowIterator>) -> Self {
        let buffer = VecDeque::new();
        Self {
            reader,
            buffer,
            buffer_size: Self::DEFAULT_BUFFER_SIZE,
            exhausted: false,
        }
    }

    /// Look at the next entry without consuming it.
    ///
    /// Refills the buffer first when it is empty and the run is not yet marked
    /// exhausted. Returns `Ok(None)` when nothing is left.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while refilling.
    fn peek(&mut self) -> Result<Option<&MergeEntry>> {
        let needs_refill = self.buffer.is_empty() && !self.exhausted;
        if needs_refill {
            self.refill_buffer()?;
        }

        Ok(self.buffer.front())
    }

    /// Remove and return the next entry.
    ///
    /// Serves from the buffer when possible; otherwise refills once (unless
    /// the run is exhausted) and serves from the fresh buffer.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while refilling.
    fn advance(&mut self) -> Result<Option<MergeEntry>> {
        match self.buffer.pop_front() {
            Some(entry) => Ok(Some(entry)),
            None if self.exhausted => Ok(None),
            None => {
                self.refill_buffer()?;
                Ok(self.buffer.pop_front())
            }
        }
    }

    /// `true` once the underlying iterator is drained AND no buffered entries
    /// remain.
    fn is_exhausted(&self) -> bool {
        self.buffer.is_empty() && self.exhausted
    }

    /// Pull rows from the underlying reader until the estimated size of the
    /// rows read in this call reaches `buffer_size`, or the reader ends.
    ///
    /// # Errors
    ///
    /// Returns the first error yielded by the reader; rows read before the
    /// error stay in the buffer.
    fn refill_buffer(&mut self) -> Result<()> {
        let mut budget_used: usize = 0;

        while budget_used < self.buffer_size {
            let entry = match self.reader.next() {
                None => {
                    self.exhausted = true;
                    return Ok(());
                }
                Some(row) => row?,
            };
            // Approximate size only; used purely to bound the batch.
            budget_used += Self::estimate_entry_size(&entry);
            self.buffer.push_back(entry);
        }

        Ok(())
    }

    /// Rough in-memory size of `entry`, for buffer budgeting only.
    ///
    /// Sum of the `MergeEntry` struct size, the key bytes, the clustering
    /// columns (name length + value estimate) and the row payload (per cell:
    /// `CellData` struct size + column name length + value estimate; a flat 16
    /// for a tombstone).
    fn estimate_entry_size(entry: &MergeEntry) -> usize {
        let clustering_bytes: usize = match entry.clustering_key.as_ref() {
            Some(ck) => ck
                .columns
                .iter()
                .map(|(name, value)| name.len() + Self::estimate_value_size(value))
                .sum(),
            None => 0,
        };

        let payload_bytes: usize = match &entry.row_data {
            RowData::Tombstone { .. } => 16,
            RowData::Live { cells } => cells
                .iter()
                .map(|cell| {
                    std::mem::size_of::<CellData>()
                        + cell.column.len()
                        + Self::estimate_value_size(&cell.value)
                })
                .sum(),
        };

        std::mem::size_of::<MergeEntry>() + entry.key.key.len() + clustering_bytes + payload_bytes
    }

    /// Rough in-memory size of a `Value`; variants not listed fall back to 32.
    fn estimate_value_size(value: &Value) -> usize {
        match value {
            Value::Null => 0,
            Value::Boolean(_) | Value::TinyInt(_) => 1,
            Value::SmallInt(_) => 2,
            Value::Integer(_) | Value::Float32(_) | Value::Date(_) => 4,
            Value::BigInt(_)
            | Value::Counter(_)
            | Value::Timestamp(_)
            | Value::Time(_)
            | Value::Float(_) => 8,
            Value::Uuid(_) => 16,
            Value::Duration { .. } => 20,
            Value::Text(s) => s.len() + std::mem::size_of::<String>(),
            Value::Blob(b) => b.len() + std::mem::size_of::<Vec<u8>>(),
            Value::Inet(b) => b.len() + std::mem::size_of::<Vec<u8>>(),
            Value::Varint(b) => b.len() + std::mem::size_of::<Vec<u8>>(),
            Value::Decimal { unscaled, .. } => unscaled.len() + 4 + std::mem::size_of::<Vec<u8>>(),
            _ => 32, // Default estimate for complex types
        }
    }
}
369
/// Row source abstraction consumed by the k-way merger.
///
/// Keeps the merger independent of any concrete SSTable reader type.
/// Implementors must be `Send`.
#[cfg(feature = "write-support")]
pub trait SSTableRowIterator: Send {
    /// Yield the next row: `Some(Ok(entry))` for a row, `Some(Err(_))` for a
    /// read failure, `None` when the SSTable has no more rows.
    fn next(&mut self) -> Option<Result<MergeEntry>>;
}
379
/// Drive an async future to completion from synchronous code, whether or not a
/// Tokio runtime is already running on the calling thread.
///
/// Shared async-to-sync bridge for the write engine's blocking helpers:
/// [`SSTableRowIteratorAdapter`] (the k-way merge readers),
/// `WriteEngine::flush_internal`, and `WriteEngine::finalize_merge_blocking`.
///
/// ## Why not `Handle::block_on`?
///
/// From a thread that is already driving a Tokio runtime (anything under
/// `#[tokio::main]` or `#[tokio::test]`, which is how the CLI and other async
/// callers reach compaction), `Handle::current().block_on()` panics with
/// *"Cannot start a runtime from within a runtime"* (Issue #587).
/// `tokio::task::block_in_place` is not a general fix either, because it
/// panics on a current-thread runtime (e.g. the default `#[tokio::test]`
/// flavor).
///
/// ## Strategy
///
/// - **No runtime on this thread** (`Handle::try_current()` is `Err`): build a
///   temporary runtime and block on it right here.
/// - **Inside a runtime** (`Ok`): run the same "fresh runtime + `block_on`"
///   routine on a dedicated scoped thread and join it. That thread is not
///   driving the caller's runtime, so it may block regardless of runtime
///   flavor. [`std::thread::scope`] (rather than [`std::thread::spawn`]) lets
///   the future borrow from the caller's stack, so it need not be `'static`.
///
/// The future and its output must be `Send` because they cross a thread
/// boundary in the in-runtime case.
///
/// # Errors
///
/// Returns `Error::Storage` if the runtime cannot be created or the bridge
/// thread panics; otherwise returns whatever the future resolves to.
#[cfg(feature = "write-support")]
pub(crate) fn block_on_async<F, T>(future: F) -> Result<T>
where
    F: std::future::Future<Output = Result<T>> + Send,
    T: Send,
{
    // Builds a fresh runtime and blocks the *executing* thread on `future`.
    let drive = move || -> Result<T> {
        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| Error::Storage(format!("Failed to create tokio runtime: {}", e)))?;
        runtime.block_on(future)
    };

    // No runtime on this thread: safe to block directly.
    if tokio::runtime::Handle::try_current().is_err() {
        return drive();
    }

    // Already inside a runtime: a nested `block_on` here would panic, so hand
    // the work to a scoped thread that owns its own runtime.
    std::thread::scope(|scope| match scope.spawn(drive).join() {
        Ok(outcome) => outcome,
        Err(_) => Err(Error::Storage(
            "async-to-sync bridge thread panicked".to_string(),
        )),
    })
}
442
/// Synchronous [`SSTableRowIterator`] over an async `SSTableReader`.
///
/// All entries of the SSTable are loaded into memory up front and converted
/// from `(RowKey, Value)` pairs into `MergeEntry` values; iteration then just
/// drains that vector.
///
/// TODO(#447): Implement true streaming iteration to stay within the 128MB
/// memory budget. Currently loads all entries upfront.
#[cfg(feature = "write-support")]
struct SSTableRowIteratorAdapter {
    /// Owning iterator over the pre-loaded entries.
    entries: std::vec::IntoIter<MergeEntry>,
}
455
#[cfg(feature = "write-support")]
impl SSTableRowIteratorAdapter {
    /// Open the SSTable at `path` and materialise every row as a `MergeEntry`
    /// tagged with `run_index`.
    ///
    /// Rows come from [`SSTableReader::iterate_all_partitions_for_compaction`],
    /// which reports the per-row timestamps decoded from the on-disk row
    /// headers. The k-way merger needs those for timestamp-accurate
    /// last-write-wins and tombstone shadowing (Issue #505).
    ///
    /// # Errors
    ///
    /// Fails if the platform or reader cannot be opened, if reading the
    /// partitions fails, or if a key or value cannot be converted.
    fn open(path: &Path, run_index: usize) -> Result<Self> {
        use crate::platform::Platform;
        use crate::Config;
        use std::sync::Arc;

        // Issue #591: compaction inputs MUST be read through buffered I/O,
        // never a memory map. `finalize_merge_async` deletes the input files
        // once the merged output is published; a live mmap over a truncated or
        // removed file can fault with SIGBUS on Unix (not recoverable as an
        // `io::Error`) and can block deletion on Windows. Buffered reads, plus
        // draining every entry into memory right here before finalize runs,
        // guarantee no mapping outlives the file. The flag is pinned explicitly
        // instead of trusting the (currently `false`) global default so the
        // invariant cannot silently regress.
        let mut config = Config::default();
        config.storage.use_mmap = false;
        let sstable_path = path.to_path_buf();

        // The compaction-specific read path yields
        // (RowKey, Value, row_timestamp_micros); row/cell tombstones arrive as
        // `Value::Tombstone` with their real deletion timestamps (Issue #505).
        let loaded = block_on_async(async move {
            let platform = Arc::new(Platform::new(&config).await?);
            let reader = crate::storage::sstable::reader::SSTableReader::open(
                &sstable_path,
                &config,
                platform,
            )
            .await?;
            reader.iterate_all_partitions_for_compaction(None).await
        })?;

        // The reader already returns rows in token order, so no sort is needed.
        let mut converted = Vec::with_capacity(loaded.len());
        for (row_key, value, timestamp) in loaded {
            let partition_key = DecoratedKey::from_key_bytes(row_key.0)?;
            let payload = Self::value_to_row_data(&value, timestamp)?;
            // Clustering key extraction is deferred, hence `None`.
            let entry = MergeEntry::new(run_index, partition_key, None, timestamp, payload);
            converted.push(entry);
        }

        Ok(Self {
            entries: converted.into_iter(),
        })
    }

    /// Translate a reader `Value` into merge `RowData`.
    ///
    /// `row_timestamp` is the per-row timestamp decoded from the on-disk row
    /// header (see [`SSTableReader::iterate_all_partitions_for_compaction`]).
    /// The reader does not expose per-cell timestamps for live cells, so every
    /// live cell inherits the row timestamp. Without that, live cells would
    /// default to 0 and be wrongly shadowed by any row tombstone (Issue #533).
    ///
    /// Mapping:
    /// - `Value::Tombstone` becomes `RowData::Tombstone` with the tombstone's
    ///   `deletion_time`; `local_deletion_time` is set to 0.
    /// - `Value::Map` becomes `RowData::Live` with one cell per map entry. A
    ///   `Text` key is used verbatim as the column name, any other key is
    ///   rendered with `{:?}`. A cell whose value is a tombstone keeps its own
    ///   `deletion_time` as the cell timestamp (Issue #505).
    /// - Anything else becomes `RowData::Live` with a single cell named
    ///   `"value"`.
    fn value_to_row_data(value: &crate::types::Value, row_timestamp: i64) -> Result<RowData> {
        let row_data = match value {
            crate::types::Value::Tombstone(info) => RowData::Tombstone {
                deletion_time: info.deletion_time,
                // TombstoneInfo does not carry local_deletion_time
                local_deletion_time: 0,
            },
            crate::types::Value::Map(pairs) => {
                let mut cells = Vec::with_capacity(pairs.len());
                for (name, cell_value) in pairs {
                    let column = if let crate::types::Value::Text(text) = name {
                        text.clone()
                    } else {
                        format!("{:?}", name)
                    };
                    // Cell tombstones carry their own deletion time; live cells
                    // take the row timestamp so they order correctly against
                    // row tombstones.
                    let timestamp = if let crate::types::Value::Tombstone(info) = cell_value {
                        info.deletion_time
                    } else {
                        row_timestamp
                    };
                    cells.push(CellData {
                        column,
                        value: cell_value.clone(),
                        timestamp,
                        ttl: None,
                    });
                }
                RowData::Live { cells }
            }
            // Single value or other formats: wrap as one cell.
            single => {
                let only_cell = CellData {
                    column: "value".to_string(),
                    value: single.clone(),
                    timestamp: row_timestamp,
                    ttl: None,
                };
                RowData::Live {
                    cells: vec![only_cell],
                }
            }
        };

        Ok(row_data)
    }
}
574
/// Iteration simply drains the pre-loaded entries; it never yields an error.
#[cfg(feature = "write-support")]
impl SSTableRowIterator for SSTableRowIteratorAdapter {
    fn next(&mut self) -> Option<Result<MergeEntry>> {
        let entry = self.entries.next()?;
        Some(Ok(entry))
    }
}
581
/// K-way merger that combines several sorted SSTable runs into one output.
///
/// A min-heap (a `BinaryHeap` of `Reverse<MergeEntry>`) holds the next
/// candidate row of each run; every run keeps a small peek buffer.
///
/// ## Usage
///
/// `merge` consumes the merger, so the two options below are alternatives.
/// `step` takes `&mut self`, so the binding must be `mut` for option 2.
///
/// ```rust,ignore
/// // Create merger from input SSTable paths
/// let mut merger = KWayMerger::new(input_paths, &schema)?;
///
/// // Option 1: Full merge to output writer
/// let stats = merger.merge(&mut output_writer)?;
///
/// // Option 2: Incremental merge (step-by-step)
/// loop {
///     match merger.step()? {
///         MergeStep::Partition { key, rows } => {
///             // Process partition
///         }
///         MergeStep::Complete => break,
///     }
/// }
/// ```
#[cfg(feature = "write-support")]
#[derive(Debug)]
pub struct KWayMerger {
    /// One buffered reader per input SSTable, indexed by `run_index`.
    runs: Vec<RunReader>,
    /// Min-heap of candidate rows (`Reverse` turns the max-heap into a min-heap).
    heap: BinaryHeap<Reverse<MergeEntry>>,
    /// Current partition being merged (for partition boundary detection).
    current_partition: Option<DecoratedKey>,
    /// Table schema used for schema-aware merging.
    schema: TableSchema,
}
618
619#[cfg(feature = "write-support")]
620impl KWayMerger {
621    /// Create a new k-way merger from input SSTable paths
622    ///
623    /// # Arguments
624    ///
625    /// * `input_paths` - Paths to input SSTable Data.db files (ordered newest to oldest)
626    /// * `schema` - Table schema for schema-aware merging
627    ///
628    /// # Returns
629    ///
630    /// A new KWayMerger ready to merge the input SSTables.
631    ///
632    /// # Errors
633    ///
634    /// Returns an error if any input SSTable cannot be opened.
635    ///
636    /// # Example
637    ///
638    /// ```rust,ignore
639    /// let input_paths = vec![
640    ///     PathBuf::from("data/nb-1-big-Data.db"),
641    ///     PathBuf::from("data/nb-2-big-Data.db"),
642    /// ];
643    /// let merger = KWayMerger::new(input_paths, &schema)?;
644    /// ```
645    pub fn new(input_paths: Vec<PathBuf>, schema: &TableSchema) -> Result<Self> {
646        if input_paths.is_empty() {
647            return Err(Error::InvalidInput(
648                "K-way merge requires at least one input file".to_string(),
649            ));
650        }
651
652        // Create run readers for each input SSTable (ordered newest to oldest)
653        let mut runs = Vec::with_capacity(input_paths.len());
654        for (run_index, path) in input_paths.iter().enumerate() {
655            let adapter = SSTableRowIteratorAdapter::open(path, run_index)?;
656            runs.push(RunReader::new(Box::new(adapter)));
657        }
658
659        // Initialize heap (will be populated on first step)
660        let heap = BinaryHeap::new();
661
662        Ok(Self {
663            runs,
664            heap,
665            current_partition: None,
666            schema: schema.clone(),
667        })
668    }
669
670    /// Perform a full merge to the output writer
671    ///
672    /// This is a convenience method that repeatedly calls `step()` until
673    /// the merge is complete, writing each partition to the output writer.
674    ///
675    /// # Arguments
676    ///
677    /// * `output_writer` - SSTableWriter to write merged output
678    ///
679    /// # Returns
680    ///
681    /// Statistics about the merge operation.
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if reading or writing fails.
686    pub fn merge(
687        mut self,
688        output_writer: &mut crate::storage::sstable::writer::SSTableWriter,
689    ) -> Result<MergeStats> {
690        let start_time = Instant::now();
691        let mut stats = MergeStats {
692            input_files: self.runs.len(),
693            output_partitions: 0,
694            output_rows: 0,
695            bytes_written: 0,
696            elapsed: Duration::from_secs(0), // Will be updated at the end
697        };
698
699        while let MergeStep::Partition { key, rows } = self.step()? {
700            stats.output_partitions += 1;
701            stats.output_rows += rows.len() as u64;
702
703            // Convert MergeEntry rows back to Mutation format for writer
704            let mutations = rows
705                .into_iter()
706                .map(|entry| Self::merge_entry_to_mutation(entry, &self.schema))
707                .collect::<Result<Vec<_>>>()?;
708
709            output_writer.write_partition(key, mutations)?;
710        }
711
712        stats.elapsed = start_time.elapsed();
713        Ok(stats)
714    }
715
716    /// Perform one merge step (one partition)
717    ///
718    /// Returns the next merged partition, or Complete if the merge is done.
719    /// This allows incremental merging for better memory control.
720    ///
721    /// # Returns
722    ///
723    /// - `MergeStep::Partition` - Next merged partition with all its rows
724    /// - `MergeStep::Complete` - Merge is complete
725    ///
726    /// # Errors
727    ///
728    /// Returns an error if reading fails.
729    pub fn step(&mut self) -> Result<MergeStep> {
730        // Initialize heap on first call
731        if self.heap.is_empty() && self.current_partition.is_none() {
732            self.initialize_heap()?;
733        }
734
735        // If heap is empty, merge is complete
736        if self.heap.is_empty() {
737            return Ok(MergeStep::Complete);
738        }
739
740        // Collect all rows for the next partition
741        let mut partition_rows = Vec::new();
742        let mut partition_key: Option<DecoratedKey> = None;
743
744        while let Some(Reverse(entry)) = self.heap.peek() {
745            // Check if we've moved to a new partition
746            if let Some(ref current_key) = partition_key {
747                if &entry.key != current_key {
748                    // Partition boundary - stop here
749                    break;
750                }
751            } else {
752                // First entry of new partition
753                partition_key = Some(entry.key.clone());
754            }
755
756            // Pop entry from heap
757            let Reverse(entry) = self
758                .heap
759                .pop()
760                .ok_or_else(|| Error::InvalidInput("Merge heap unexpectedly empty".to_string()))?;
761
762            // Add to partition rows
763            partition_rows.push(entry.clone());
764
765            // Refill heap from the run we just consumed from
766            self.refill_heap(entry.run_index)?;
767        }
768
769        if let Some(key) = partition_key {
770            // Merge cells within this partition (last-write-wins)
771            let merged_rows = self.merge_partition_rows(partition_rows)?;
772            Ok(MergeStep::Partition {
773                key,
774                rows: merged_rows,
775            })
776        } else {
777            Ok(MergeStep::Complete)
778        }
779    }
780
781    /// Initialize the heap with the first entry from each run
782    fn initialize_heap(&mut self) -> Result<()> {
783        for run_index in 0..self.runs.len() {
784            self.refill_heap(run_index)?;
785        }
786        Ok(())
787    }
788
789    /// Refill the heap from a specific run
790    fn refill_heap(&mut self, run_index: usize) -> Result<()> {
791        if run_index >= self.runs.len() {
792            return Ok(());
793        }
794
795        let run = &mut self.runs[run_index];
796        if !run.is_exhausted() {
797            if let Some(entry) = run.peek()? {
798                // Clone and push to heap
799                let entry = entry.clone();
800                self.heap.push(Reverse(entry));
801            }
802
803            // Advance the run reader
804            run.advance()?;
805        }
806
807        Ok(())
808    }
809
810    /// Merge rows within a single partition using **per-cell reconcile**
811    /// (Cassandra `org.apache.cassandra.db.rows.Cells#reconcile`).
812    ///
813    /// The pre-#533 implementation selected a single whole winning `MergeEntry`
814    /// per clustering key, which DROPPED columns when two SSTables shared the same
815    /// `(pk, ck)` but carried DISJOINT columns (e.g. A→{name}, B→{score} merged to
816    /// only B's column). This now reconciles cell-by-cell so disjoint columns from
817    /// every input survive (Issue #533).
818    ///
819    /// Algorithm per clustering-key group:
820    ///   1. **Effective row deletion** — among `RowData::Tombstone` entries take the
821    ///      max `deletion_time` (`row_del`). A row tombstone shadows any cell whose
822    ///      `timestamp <= row_del`.
823    ///   2. **Per-column cell reconcile** — across all `RowData::Live` entries, for
824    ///      each column name pick the winning cell by:
825    ///        - higher `timestamp` wins (last-write-wins);
826    ///        - at EQUAL timestamp a cell tombstone (`Value::Tombstone(CellTombstone)`)
827    ///          beats a live value (same rule as #498, applied per cell);
828    ///        - otherwise the existing winner is kept (stable; heap routing already
829    ///          ordered inputs by run_index so the first-seen at a tie is the newer
830    ///          file).
831    ///   3. **Row-tombstone shadowing per cell** — drop any reconciled cell whose
832    ///      `timestamp <= row_del`. The `<=` makes the tombstone win at equal ts,
833    ///      consistent with #498. Cells written strictly AFTER `row_del` survive.
834    ///   4. **Build the merged result** — if any cells survive, emit a `Live`
835    ///      entry whose row timestamp is the max surviving cell timestamp; else if a
836    ///      row tombstone was present, emit a `Tombstone` entry at `row_del` so the
837    ///      row stays shadowed downstream; else emit nothing.
838    fn merge_partition_rows(&self, rows: Vec<MergeEntry>) -> Result<Vec<MergeEntry>> {
839        use std::collections::BTreeMap;
840
841        // Group by clustering key using BTreeMap (ClusteringKey implements Ord).
842        // Preserve heap-routing order within each group so the per-cell tiebreak
843        // (first-seen wins at equal timestamp+liveness) follows run_index.
844        let mut clustered_rows: BTreeMap<Option<ClusteringKey>, Vec<MergeEntry>> = BTreeMap::new();
845
846        for row in rows {
847            clustered_rows
848                .entry(row.clustering_key.clone())
849                .or_default()
850                .push(row);
851        }
852
853        let mut merged = Vec::new();
854        for (ck, cluster_rows) in clustered_rows {
855            if let Some(entry) = Self::reconcile_cluster(ck, cluster_rows) {
856                merged.push(entry);
857            }
858        }
859
860        // Sort merged rows by clustering key for output order
861        merged.sort_by(|a, b| match (&a.clustering_key, &b.clustering_key) {
862            (None, None) => Ordering::Equal,
863            (None, Some(_)) => Ordering::Less,
864            (Some(_), None) => Ordering::Greater,
865            (Some(ck_a), Some(ck_b)) => {
866                // Use schema-aware comparison if available
867                ck_a.compare(ck_b, &self.schema).unwrap_or_else(|e| {
868                    log::warn!(
869                        "Schema-aware clustering key comparison failed, using fallback: {}",
870                        e
871                    );
872                    ck_a.cmp(ck_b)
873                })
874            }
875        });
876
877        Ok(merged)
878    }
879
880    /// Returns true when a cell carries a cell-level tombstone
881    /// (`Value::Tombstone(CellTombstone)`), the representation produced by #505.
882    ///
883    /// Cell tombstones participate in per-cell reconcile like any other cell, but
884    /// at EQUAL timestamp a cell tombstone beats a live value (Cassandra
885    /// `Cells#reconcile`, same rule as #498 applied per cell).
886    fn is_cell_tombstone(cell: &CellData) -> bool {
887        matches!(
888            cell.value,
889            crate::types::Value::Tombstone(ref info)
890                if info.tombstone_type == crate::types::TombstoneType::CellTombstone
891        )
892    }
893
894    /// Reconcile all entries for a single clustering-key group into at most one
895    /// merged `MergeEntry`, applying per-cell last-write-wins plus row-tombstone
896    /// shadowing (Issue #533). See [`Self::merge_partition_rows`] for the rules.
897    ///
898    /// `cluster_rows` is in heap-routing order (run_index ascending within equal
899    /// keys), so when two cells tie on both timestamp and liveness the first-seen
900    /// (newer file) is kept.
901    fn reconcile_cluster(
902        clustering_key: Option<ClusteringKey>,
903        cluster_rows: Vec<MergeEntry>,
904    ) -> Option<MergeEntry> {
905        use std::collections::HashMap;
906
907        // Carry-through key fields: every entry in this group shares the same
908        // partition key and clustering key. Use the lowest run_index seen (newest
909        // file) so downstream ordering is stable.
910        let mut key = None;
911        let mut run_index = usize::MAX;
912
913        // Step 1: effective row deletion — max deletion_time across row tombstones.
914        let mut row_del: Option<i64> = None;
915
916        // Step 2: per-column cell reconcile. Preserve first-seen column order for
917        // deterministic output while resolving winners in a side map.
918        let mut order: Vec<String> = Vec::new();
919        let mut winners: HashMap<String, CellData> = HashMap::new();
920
921        for entry in &cluster_rows {
922            if key.is_none() {
923                key = Some(entry.key.clone());
924            }
925            run_index = run_index.min(entry.run_index);
926
927            match &entry.row_data {
928                RowData::Tombstone { deletion_time, .. } => {
929                    row_del = Some(row_del.map_or(*deletion_time, |d| d.max(*deletion_time)));
930                }
931                RowData::Live { cells } => {
932                    for cell in cells {
933                        match winners.get(&cell.column) {
934                            None => {
935                                order.push(cell.column.clone());
936                                winners.insert(cell.column.clone(), cell.clone());
937                            }
938                            Some(existing) => {
939                                // Higher timestamp wins. At EQUAL timestamp a cell
940                                // tombstone beats a live value (Issue #498 per cell).
941                                // Otherwise keep the existing (first-seen = newer
942                                // file) winner.
943                                let replace = cell.timestamp > existing.timestamp
944                                    || (cell.timestamp == existing.timestamp
945                                        && Self::is_cell_tombstone(cell)
946                                        && !Self::is_cell_tombstone(existing));
947                                if replace {
948                                    winners.insert(cell.column.clone(), cell.clone());
949                                }
950                            }
951                        }
952                    }
953                }
954            }
955        }
956
957        let key = key?; // empty group => nothing to emit
958
959        // Step 3: apply row-tombstone shadowing per cell. A cell whose timestamp is
960        // <= row_del is shadowed (`<=` lets the tombstone win at equal ts, #498).
961        // Cells written strictly after row_del survive. This shadowing applies to
962        // cell tombstones too: a row tombstone at ts=T supersedes a cell tombstone at
963        // ts<=T (real Cassandra semantics). Note this is INTENTIONALLY stricter than
964        // the `reference_merge` model, whose range-tombstone path only suppresses
965        // live cells — `reconcile_cluster` is the authoritative behavior here.
966        let surviving: Vec<CellData> = order
967            .into_iter()
968            .filter_map(|col| winners.remove(&col))
969            .filter(|cell| match row_del {
970                Some(d) => cell.timestamp > d,
971                None => true,
972            })
973            .collect();
974
975        // Step 4: build the merged result. `max()` is `Some` exactly when `surviving`
976        // is non-empty, so this match needs no unreachable fallback timestamp.
977        match surviving.iter().map(|c| c.timestamp).max() {
978            Some(row_ts) => Some(MergeEntry::new(
979                run_index,
980                key,
981                clustering_key,
982                row_ts,
983                RowData::Live { cells: surviving },
984            )),
985            // No surviving cells. If a row tombstone exists, keep the row shadowed
986            // so downstream still emits the deletion (preserves #505/#498 absence).
987            // Otherwise the row is empty/absent.
988            None => row_del.map(|deletion_time| {
989                MergeEntry::new(
990                    run_index,
991                    key,
992                    clustering_key,
993                    deletion_time,
994                    RowData::Tombstone {
995                        deletion_time,
996                        local_deletion_time: 0,
997                    },
998                )
999            }),
1000        }
1001    }
1002
1003    /// Convert a MergeEntry back to Mutation for writing
1004    pub(crate) fn merge_entry_to_mutation(
1005        entry: MergeEntry,
1006        schema: &TableSchema,
1007    ) -> Result<crate::storage::write_engine::mutation::Mutation> {
1008        use crate::storage::write_engine::mutation::{
1009            CellOperation, Mutation, PartitionKey, TableId,
1010        };
1011
1012        let partition_key = PartitionKey::from_bytes(&entry.key.key, schema)?;
1013        let table_id = TableId::new(&schema.keyspace, &schema.table);
1014
1015        let operations = match entry.row_data {
1016            RowData::Live { cells } => cells
1017                .into_iter()
1018                .map(|cell| {
1019                    // Issue #505: cell-level tombstones are represented as
1020                    // Value::Tombstone(CellTombstone) inside the Map.  Translate
1021                    // them to CellOperation::Delete so the SSTableWriter writes a
1022                    // proper cell tombstone rather than a live cell with a null value.
1023                    if matches!(
1024                        cell.value,
1025                        crate::types::Value::Tombstone(ref info)
1026                            if info.tombstone_type == crate::types::TombstoneType::CellTombstone
1027                    ) {
1028                        return CellOperation::Delete {
1029                            column: cell.column,
1030                        };
1031                    }
1032                    if let Some(ttl) = cell.ttl {
1033                        CellOperation::WriteWithTtl {
1034                            column: cell.column,
1035                            value: cell.value,
1036                            ttl_seconds: ttl,
1037                        }
1038                    } else {
1039                        CellOperation::Write {
1040                            column: cell.column,
1041                            value: cell.value,
1042                        }
1043                    }
1044                })
1045                .collect(),
1046            RowData::Tombstone { .. } => vec![CellOperation::DeleteRow],
1047        };
1048
1049        Ok(Mutation::new(
1050            table_id,
1051            partition_key,
1052            entry.clustering_key,
1053            operations,
1054            entry.timestamp,
1055            None,
1056        ))
1057    }
1058}
1059
1060#[cfg(all(test, feature = "write-support"))]
1061mod tests {
1062    use super::*;
1063    use crate::storage::write_engine::mutation::DecoratedKey;
1064
1065    #[test]
1066    fn test_merge_entry_ordering_by_token() {
1067        let entry1 = MergeEntry::new(
1068            0,
1069            DecoratedKey::new(100, vec![1, 2, 3]),
1070            None,
1071            1000,
1072            RowData::Live { cells: vec![] },
1073        );
1074
1075        let entry2 = MergeEntry::new(
1076            0,
1077            DecoratedKey::new(200, vec![1, 2, 3]),
1078            None,
1079            1000,
1080            RowData::Live { cells: vec![] },
1081        );
1082
1083        // Entry with lower token should come first
1084        assert!(entry1 < entry2);
1085        assert!(entry2 > entry1);
1086    }
1087
1088    #[test]
1089    fn test_merge_entry_ordering_by_key_bytes() {
1090        // Same token, different key bytes (hash collision)
1091        let entry1 = MergeEntry::new(
1092            0,
1093            DecoratedKey::new(100, vec![1, 2, 3]),
1094            None,
1095            1000,
1096            RowData::Live { cells: vec![] },
1097        );
1098
1099        let entry2 = MergeEntry::new(
1100            0,
1101            DecoratedKey::new(100, vec![1, 2, 4]),
1102            None,
1103            1000,
1104            RowData::Live { cells: vec![] },
1105        );
1106
1107        // Entry with smaller key bytes should come first
1108        assert!(entry1 < entry2);
1109        assert!(entry2 > entry1);
1110    }
1111
1112    #[test]
1113    fn test_merge_entry_ordering_by_run_index() {
1114        // Same token and key, different run indices
1115        let entry1 = MergeEntry::new(
1116            0,
1117            DecoratedKey::new(100, vec![1, 2, 3]),
1118            None,
1119            1000,
1120            RowData::Live { cells: vec![] },
1121        );
1122
1123        let entry2 = MergeEntry::new(
1124            1,
1125            DecoratedKey::new(100, vec![1, 2, 3]),
1126            None,
1127            1000,
1128            RowData::Live { cells: vec![] },
1129        );
1130
1131        // Entry with lower run_index should come first (newer file wins)
1132        assert!(entry1 < entry2);
1133        assert!(entry2 > entry1);
1134    }
1135
1136    #[test]
1137    fn test_merge_entry_min_heap() {
1138        use std::cmp::Reverse;
1139        use std::collections::BinaryHeap;
1140
1141        let mut heap: BinaryHeap<Reverse<MergeEntry>> = BinaryHeap::new();
1142
1143        // Insert in reverse order
1144        let entry3 = MergeEntry::new(
1145            0,
1146            DecoratedKey::new(300, vec![3]),
1147            None,
1148            1000,
1149            RowData::Live { cells: vec![] },
1150        );
1151        let entry1 = MergeEntry::new(
1152            0,
1153            DecoratedKey::new(100, vec![1]),
1154            None,
1155            1000,
1156            RowData::Live { cells: vec![] },
1157        );
1158        let entry2 = MergeEntry::new(
1159            0,
1160            DecoratedKey::new(200, vec![2]),
1161            None,
1162            1000,
1163            RowData::Live { cells: vec![] },
1164        );
1165
1166        heap.push(Reverse(entry3.clone()));
1167        heap.push(Reverse(entry1.clone()));
1168        heap.push(Reverse(entry2.clone()));
1169
1170        // Should pop in ascending order
1171        assert_eq!(heap.pop().unwrap().0.key.token, 100);
1172        assert_eq!(heap.pop().unwrap().0.key.token, 200);
1173        assert_eq!(heap.pop().unwrap().0.key.token, 300);
1174    }
1175
1176    #[test]
1177    fn test_row_data_variants() {
1178        let live = RowData::Live {
1179            cells: vec![CellData {
1180                column: "name".to_string(),
1181                value: Value::Text("Alice".to_string()),
1182                timestamp: 1000,
1183                ttl: None,
1184            }],
1185        };
1186
1187        match live {
1188            RowData::Live { cells } => {
1189                assert_eq!(cells.len(), 1);
1190                assert_eq!(cells[0].column, "name");
1191            }
1192            _ => panic!("Expected Live variant"),
1193        }
1194
1195        let tombstone = RowData::Tombstone {
1196            deletion_time: 2000,
1197            local_deletion_time: 1000,
1198        };
1199
1200        match tombstone {
1201            RowData::Tombstone {
1202                deletion_time,
1203                local_deletion_time,
1204            } => {
1205                assert_eq!(deletion_time, 2000);
1206                assert_eq!(local_deletion_time, 1000);
1207            }
1208            _ => panic!("Expected Tombstone variant"),
1209        }
1210    }
1211
1212    #[test]
1213    fn test_cell_data_creation() {
1214        let cell = CellData {
1215            column: "age".to_string(),
1216            value: Value::Integer(30),
1217            timestamp: 1234567890,
1218            ttl: Some(3600),
1219        };
1220
1221        assert_eq!(cell.column, "age");
1222        assert_eq!(cell.value, Value::Integer(30));
1223        assert_eq!(cell.timestamp, 1234567890);
1224        assert_eq!(cell.ttl, Some(3600));
1225    }
1226
1227    #[test]
1228    fn test_merge_stats_creation() {
1229        let stats = MergeStats {
1230            input_files: 5,
1231            output_partitions: 1000,
1232            output_rows: 5000,
1233            bytes_written: 1024 * 1024,
1234            elapsed: Duration::from_secs(10),
1235        };
1236
1237        assert_eq!(stats.input_files, 5);
1238        assert_eq!(stats.output_partitions, 1000);
1239        assert_eq!(stats.output_rows, 5000);
1240        assert_eq!(stats.bytes_written, 1024 * 1024);
1241        assert_eq!(stats.elapsed.as_secs(), 10);
1242    }
1243
1244    #[test]
1245    fn test_run_reader_estimate_entry_size() {
1246        let entry = MergeEntry::new(
1247            0,
1248            DecoratedKey::new(100, vec![1, 2, 3, 4]),
1249            None,
1250            1000,
1251            RowData::Live {
1252                cells: vec![CellData {
1253                    column: "name".to_string(),
1254                    value: Value::Text("Alice".to_string()),
1255                    timestamp: 1000,
1256                    ttl: None,
1257                }],
1258            },
1259        );
1260
1261        let size = RunReader::estimate_entry_size(&entry);
1262
1263        // Size should be at least the base struct size plus key bytes
1264        let expected_min_size = std::mem::size_of::<MergeEntry>() + 4;
1265        assert!(size >= expected_min_size);
1266    }
1267
1268    #[test]
1269    fn test_kway_merger_empty_input() {
1270        use crate::schema::{KeyColumn, TableSchema};
1271        use std::collections::HashMap;
1272
1273        let schema = TableSchema {
1274            keyspace: "test_ks".to_string(),
1275            table: "test_table".to_string(),
1276            partition_keys: vec![KeyColumn {
1277                name: "id".to_string(),
1278                data_type: "int".to_string(),
1279                position: 0,
1280            }],
1281            clustering_keys: vec![],
1282            columns: vec![],
1283            comments: HashMap::new(),
1284        };
1285
1286        let result = KWayMerger::new(vec![], &schema);
1287        assert!(result.is_err());
1288
1289        if let Err(Error::InvalidInput(msg)) = result {
1290            assert!(msg.contains("at least one input file"));
1291        } else {
1292            panic!("Expected InvalidInput error");
1293        }
1294    }
1295
1296    #[test]
1297    fn test_merge_entry_equal_timestamps_prefer_lower_run_index() {
1298        // Same partition, same clustering, same timestamp
1299        // Lower run_index should win (newer file)
1300        let entry_run0 = MergeEntry::new(
1301            0, // run_index 0 (newer)
1302            DecoratedKey::new(100, vec![1, 2, 3]),
1303            None,
1304            1000, // same timestamp
1305            RowData::Live {
1306                cells: vec![CellData {
1307                    column: "name".to_string(),
1308                    value: Value::Text("Newer".to_string()),
1309                    timestamp: 1000,
1310                    ttl: None,
1311                }],
1312            },
1313        );
1314
1315        let entry_run1 = MergeEntry::new(
1316            1, // run_index 1 (older)
1317            DecoratedKey::new(100, vec![1, 2, 3]),
1318            None,
1319            1000, // same timestamp
1320            RowData::Live {
1321                cells: vec![CellData {
1322                    column: "name".to_string(),
1323                    value: Value::Text("Older".to_string()),
1324                    timestamp: 1000,
1325                    ttl: None,
1326                }],
1327            },
1328        );
1329
1330        // Entry from run 0 should come first in ordering
1331        assert!(entry_run0 < entry_run1);
1332    }
1333
1334    #[test]
1335    fn test_merge_entry_tombstone() {
1336        let tombstone_entry = MergeEntry::new(
1337            0,
1338            DecoratedKey::new(100, vec![1, 2, 3]),
1339            None,
1340            2000,
1341            RowData::Tombstone {
1342                deletion_time: 2000,
1343                local_deletion_time: 1000,
1344            },
1345        );
1346
1347        match tombstone_entry.row_data {
1348            RowData::Tombstone {
1349                deletion_time,
1350                local_deletion_time,
1351            } => {
1352                assert_eq!(deletion_time, 2000);
1353                assert_eq!(local_deletion_time, 1000);
1354            }
1355            _ => panic!("Expected Tombstone"),
1356        }
1357    }
1358
1359    #[test]
1360    fn test_real_merger_delete_wins_at_equal_timestamp() {
1361        // Issue #498: at EQUAL timestamp, a Delete (tombstone) must beat a Live
1362        // row regardless of file recency (Cassandra `Cells#reconcile`).
1363        //
1364        // We drive the REAL merger entry point (`merge_partition_rows`) with two
1365        // entries that share the SAME clustering key and the SAME timestamp:
1366        //   - A: Live, run_index 0  (the NEWER file — would win a run_index tiebreak)
1367        //   - B: Delete, run_index 1 (the OLDER file)
1368        //
1369        // The pre-fix merger sorted equal-timestamp ties by run_index only, so the
1370        // live row (run_index 0) would win and survive. With the fix the tombstone
1371        // wins. This test therefore FAILS if the tiebreak reverts to run_index.
1372        use crate::schema::{Column, KeyColumn, TableSchema};
1373        use std::collections::HashMap;
1374
1375        let schema = TableSchema {
1376            keyspace: "reconcile_ks".to_string(),
1377            table: "reconcile_tbl".to_string(),
1378            partition_keys: vec![KeyColumn {
1379                name: "id".to_string(),
1380                data_type: "int".to_string(),
1381                position: 0,
1382            }],
1383            clustering_keys: vec![],
1384            columns: vec![Column {
1385                name: "value".to_string(),
1386                data_type: "text".to_string(),
1387                nullable: true,
1388                default: None,
1389                is_static: false,
1390            }],
1391            comments: HashMap::new(),
1392        };
1393
1394        const EQUAL_TS: i64 = 1_700_000_000_000_000;
1395
1396        let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
1397
1398        // A = Live, in the NEWER file (run_index 0).
1399        let live_entry = MergeEntry::new(
1400            0,
1401            partition_key.clone(),
1402            None,
1403            EQUAL_TS,
1404            RowData::Live {
1405                cells: vec![CellData {
1406                    column: "value".to_string(),
1407                    value: Value::Text("survivor-if-buggy".to_string()),
1408                    timestamp: EQUAL_TS,
1409                    ttl: None,
1410                }],
1411            },
1412        );
1413
1414        // B = Delete (row tombstone), in the OLDER file (run_index 1).
1415        let tombstone_entry = MergeEntry::new(
1416            1,
1417            partition_key.clone(),
1418            None,
1419            EQUAL_TS,
1420            RowData::Tombstone {
1421                deletion_time: EQUAL_TS,
1422                local_deletion_time: 2_000_000,
1423            },
1424        );
1425
1426        let merger = KWayMerger {
1427            runs: vec![],
1428            heap: BinaryHeap::new(),
1429            current_partition: None,
1430            schema,
1431        };
1432
1433        // Drive the real merger. Order the input so the live (newer-file) entry is
1434        // first — pre-fix this is exactly the entry that wins by run_index.
1435        let merged = merger
1436            .merge_partition_rows(vec![live_entry, tombstone_entry])
1437            .expect("merge_partition_rows must not fail");
1438
1439        assert_eq!(merged.len(), 1, "one clustering key => one merged winner");
1440
1441        assert!(
1442            matches!(merged[0].row_data, RowData::Tombstone { .. }),
1443            "At equal timestamp the tombstone must win even though the live row is in \
1444             the newer file (run_index 0). Got a live row => the equal-ts tiebreak \
1445             reverted to run_index (Issue #498 regression)."
1446        );
1447    }
1448
1449    #[test]
1450    fn test_real_merger_disjoint_columns_survive_compaction() {
1451        // Issue #533: when two SSTables share the same (pk, ck) but carry DISJOINT
1452        // columns, per-cell reconcile must keep cells from BOTH. The pre-fix merger
1453        // picked one whole winning row and DROPPED the loser's columns.
1454        //
1455        //   A (run_index 1, ts=100): {name: "alice"}
1456        //   B (run_index 0, ts=200): {score: 42}
1457        //
1458        // Cassandra `Cells#reconcile` => {name: "alice", score: 42}.
1459        // The old whole-row-wins code returned only {score: 42} (name LOST).
1460        use crate::schema::{ClusteringColumn, Column, KeyColumn, TableSchema};
1461        use std::collections::HashMap;
1462
1463        let schema = TableSchema {
1464            keyspace: "disjoint_ks".to_string(),
1465            table: "disjoint_tbl".to_string(),
1466            partition_keys: vec![KeyColumn {
1467                name: "pk".to_string(),
1468                data_type: "int".to_string(),
1469                position: 0,
1470            }],
1471            clustering_keys: vec![ClusteringColumn {
1472                name: "ck".to_string(),
1473                data_type: "int".to_string(),
1474                position: 0,
1475                order: Default::default(),
1476            }],
1477            columns: vec![
1478                Column {
1479                    name: "name".to_string(),
1480                    data_type: "text".to_string(),
1481                    nullable: true,
1482                    default: None,
1483                    is_static: false,
1484                },
1485                Column {
1486                    name: "score".to_string(),
1487                    data_type: "int".to_string(),
1488                    nullable: true,
1489                    default: None,
1490                    is_static: false,
1491                },
1492            ],
1493            comments: HashMap::new(),
1494        };
1495
1496        let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
1497        let ck = ClusteringKey {
1498            columns: vec![("ck".to_string(), Value::Integer(1))],
1499        };
1500
1501        // A: older file (run_index 1), only `name` at ts=100.
1502        let entry_a = MergeEntry::new(
1503            1,
1504            partition_key.clone(),
1505            Some(ck.clone()),
1506            100,
1507            RowData::Live {
1508                cells: vec![CellData {
1509                    column: "name".to_string(),
1510                    value: Value::Text("alice".to_string()),
1511                    timestamp: 100,
1512                    ttl: None,
1513                }],
1514            },
1515        );
1516
1517        // B: newer file (run_index 0), only `score` at ts=200.
1518        let entry_b = MergeEntry::new(
1519            0,
1520            partition_key.clone(),
1521            Some(ck.clone()),
1522            200,
1523            RowData::Live {
1524                cells: vec![CellData {
1525                    column: "score".to_string(),
1526                    value: Value::Integer(42),
1527                    timestamp: 200,
1528                    ttl: None,
1529                }],
1530            },
1531        );
1532
1533        let merger = KWayMerger {
1534            runs: vec![],
1535            heap: BinaryHeap::new(),
1536            current_partition: None,
1537            schema,
1538        };
1539
1540        // Pass in heap-routing order (run_index ascending): B then A.
1541        let merged = merger
1542            .merge_partition_rows(vec![entry_b, entry_a])
1543            .expect("merge_partition_rows must not fail");
1544
1545        assert_eq!(merged.len(), 1, "one clustering key => one merged row");
1546
1547        let cells = match &merged[0].row_data {
1548            RowData::Live { cells } => cells,
1549            other => panic!("expected a Live merged row, got {:?}", other),
1550        };
1551
1552        let name = cells.iter().find(|c| c.column == "name");
1553        let score = cells.iter().find(|c| c.column == "score");
1554
1555        assert!(
1556            name.is_some(),
1557            "disjoint column `name` from the older file was DROPPED — per-cell \
1558             reconcile regression (Issue #533). Old whole-row-wins code fails here."
1559        );
1560        assert!(
1561            score.is_some(),
1562            "disjoint column `score` from the newer file is missing"
1563        );
1564        assert_eq!(
1565            name.unwrap().value,
1566            Value::Text("alice".to_string()),
1567            "`name` must carry A's value"
1568        );
1569        assert_eq!(
1570            score.unwrap().value,
1571            Value::Integer(42),
1572            "`score` must carry B's value"
1573        );
1574
1575        // Row timestamp must be the max surviving cell timestamp.
1576        assert_eq!(
1577            merged[0].timestamp, 200,
1578            "merged row timestamp must be the max surviving cell timestamp"
1579        );
1580    }
1581
1582    #[test]
1583    fn test_real_merger_cell_tombstone_beats_live_at_equal_timestamp() {
1584        // Issue #533/#498 (per cell): when two SSTables write the SAME column at the
1585        // SAME timestamp, a cell tombstone (Delete) must beat the live value,
1586        // independent of file recency.
1587        //
1588        //   A (run_index 0, NEWER file, ts=100): {score: 42}            (live)
1589        //   B (run_index 1, OLDER file, ts=100): {score: <cell tombstone>}
1590        //
1591        // Cassandra `Cells#reconcile` => score is deleted. The adversarial part: A is
1592        // the newer file, so a recency-only tiebreak would wrongly keep the live 42.
1593        use crate::schema::{ClusteringColumn, Column, KeyColumn, TableSchema};
1594        use crate::types::{TombstoneInfo, TombstoneType};
1595        use std::collections::HashMap;
1596
1597        let schema = TableSchema {
1598            keyspace: "ct_ks".to_string(),
1599            table: "ct_tbl".to_string(),
1600            partition_keys: vec![KeyColumn {
1601                name: "pk".to_string(),
1602                data_type: "int".to_string(),
1603                position: 0,
1604            }],
1605            clustering_keys: vec![ClusteringColumn {
1606                name: "ck".to_string(),
1607                data_type: "int".to_string(),
1608                position: 0,
1609                order: Default::default(),
1610            }],
1611            columns: vec![Column {
1612                name: "score".to_string(),
1613                data_type: "int".to_string(),
1614                nullable: true,
1615                default: None,
1616                is_static: false,
1617            }],
1618            comments: HashMap::new(),
1619        };
1620
1621        let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
1622        let ck = ClusteringKey {
1623            columns: vec![("ck".to_string(), Value::Integer(1))],
1624        };
1625
1626        // A: newer file (run_index 0), live `score` = 42 at ts=100.
1627        let entry_a = MergeEntry::new(
1628            0,
1629            partition_key.clone(),
1630            Some(ck.clone()),
1631            100,
1632            RowData::Live {
1633                cells: vec![CellData {
1634                    column: "score".to_string(),
1635                    value: Value::Integer(42),
1636                    timestamp: 100,
1637                    ttl: None,
1638                }],
1639            },
1640        );
1641
1642        // B: older file (run_index 1), cell tombstone on `score` at the SAME ts=100.
1643        let entry_b = MergeEntry::new(
1644            1,
1645            partition_key.clone(),
1646            Some(ck.clone()),
1647            100,
1648            RowData::Live {
1649                cells: vec![CellData {
1650                    column: "score".to_string(),
1651                    value: Value::Tombstone(TombstoneInfo {
1652                        deletion_time: 100,
1653                        tombstone_type: TombstoneType::CellTombstone,
1654                        ttl: None,
1655                        range_start: None,
1656                        range_end: None,
1657                    }),
1658                    timestamp: 100,
1659                    ttl: None,
1660                }],
1661            },
1662        );
1663
1664        let merger = KWayMerger {
1665            runs: vec![],
1666            heap: BinaryHeap::new(),
1667            current_partition: None,
1668            schema,
1669        };
1670
1671        // Heap-routing order (run_index ascending): A then B.
1672        let merged = merger
1673            .merge_partition_rows(vec![entry_a, entry_b])
1674            .expect("merge_partition_rows must not fail");
1675
1676        assert_eq!(merged.len(), 1, "one clustering key => one merged row");
1677
1678        let cells = match &merged[0].row_data {
1679            RowData::Live { cells } => cells,
1680            other => panic!("expected a Live merged row, got {:?}", other),
1681        };
1682        let score = cells
1683            .iter()
1684            .find(|c| c.column == "score")
1685            .expect("score cell must be present (as a tombstone)");
1686
1687        assert!(
1688            matches!(
1689                score.value,
1690                Value::Tombstone(ref info) if info.tombstone_type == TombstoneType::CellTombstone
1691            ),
1692            "at equal ts the cell tombstone must win over the live value (got {:?}) — \
1693             a recency-only tiebreak would have kept the newer file's live 42 (#498 per cell)",
1694            score.value
1695        );
1696    }
1697
1698    #[test]
1699    fn test_real_merger_same_column_conflict_resolves_by_timestamp() {
1700        // Issue #533: when both SSTables write the SAME column, the higher-timestamp
1701        // value wins (last-write-wins), but disjoint columns still survive.
1702        //
1703        //   A (run_index 1, ts=100): {name: "old", extra: "a-only"}
1704        //   B (run_index 0, ts=200): {name: "new"}
1705        // => {name: "new" (ts=200 wins), extra: "a-only" (survives)}
1706        use crate::schema::{Column, KeyColumn, TableSchema};
1707        use std::collections::HashMap;
1708
1709        let schema = TableSchema {
1710            keyspace: "conflict_ks".to_string(),
1711            table: "conflict_tbl".to_string(),
1712            partition_keys: vec![KeyColumn {
1713                name: "pk".to_string(),
1714                data_type: "int".to_string(),
1715                position: 0,
1716            }],
1717            clustering_keys: vec![],
1718            columns: vec![
1719                Column {
1720                    name: "name".to_string(),
1721                    data_type: "text".to_string(),
1722                    nullable: true,
1723                    default: None,
1724                    is_static: false,
1725                },
1726                Column {
1727                    name: "extra".to_string(),
1728                    data_type: "text".to_string(),
1729                    nullable: true,
1730                    default: None,
1731                    is_static: false,
1732                },
1733            ],
1734            comments: HashMap::new(),
1735        };
1736
1737        let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
1738
1739        let entry_a = MergeEntry::new(
1740            1,
1741            partition_key.clone(),
1742            None,
1743            100,
1744            RowData::Live {
1745                cells: vec![
1746                    CellData {
1747                        column: "name".to_string(),
1748                        value: Value::Text("old".to_string()),
1749                        timestamp: 100,
1750                        ttl: None,
1751                    },
1752                    CellData {
1753                        column: "extra".to_string(),
1754                        value: Value::Text("a-only".to_string()),
1755                        timestamp: 100,
1756                        ttl: None,
1757                    },
1758                ],
1759            },
1760        );
1761
1762        let entry_b = MergeEntry::new(
1763            0,
1764            partition_key.clone(),
1765            None,
1766            200,
1767            RowData::Live {
1768                cells: vec![CellData {
1769                    column: "name".to_string(),
1770                    value: Value::Text("new".to_string()),
1771                    timestamp: 200,
1772                    ttl: None,
1773                }],
1774            },
1775        );
1776
1777        let merger = KWayMerger {
1778            runs: vec![],
1779            heap: BinaryHeap::new(),
1780            current_partition: None,
1781            schema,
1782        };
1783
1784        let merged = merger
1785            .merge_partition_rows(vec![entry_b, entry_a])
1786            .expect("merge_partition_rows must not fail");
1787
1788        assert_eq!(merged.len(), 1);
1789        let cells = match &merged[0].row_data {
1790            RowData::Live { cells } => cells,
1791            other => panic!("expected Live, got {:?}", other),
1792        };
1793
1794        let name = cells
1795            .iter()
1796            .find(|c| c.column == "name")
1797            .expect("name present");
1798        let extra = cells
1799            .iter()
1800            .find(|c| c.column == "extra")
1801            .expect("extra (disjoint) must survive");
1802
1803        assert_eq!(
1804            name.value,
1805            Value::Text("new".to_string()),
1806            "same-column conflict must resolve to the higher-timestamp value"
1807        );
1808        assert_eq!(
1809            extra.value,
1810            Value::Text("a-only".to_string()),
1811            "disjoint column from the older file must survive the conflict merge"
1812        );
1813    }
1814
1815    #[test]
1816    fn test_real_merger_row_tombstone_shadows_old_cells_keeps_new() {
1817        // Issue #533 / #505: a row tombstone shadows cells with ts <= row_del but
1818        // a cell written strictly AFTER the tombstone survives.
1819        //
1820        //   A (ts=100): {name: "old"}          -> 100 <= 200 row_del => shadowed
1821        //   B (ts=200, row tombstone)
1822        //   C (ts=300): {score: 7}             -> 300 > 200            => survives
1823        use crate::schema::{Column, KeyColumn, TableSchema};
1824        use std::collections::HashMap;
1825
1826        let schema = TableSchema {
1827            keyspace: "shadow_ks".to_string(),
1828            table: "shadow_tbl".to_string(),
1829            partition_keys: vec![KeyColumn {
1830                name: "pk".to_string(),
1831                data_type: "int".to_string(),
1832                position: 0,
1833            }],
1834            clustering_keys: vec![],
1835            columns: vec![
1836                Column {
1837                    name: "name".to_string(),
1838                    data_type: "text".to_string(),
1839                    nullable: true,
1840                    default: None,
1841                    is_static: false,
1842                },
1843                Column {
1844                    name: "score".to_string(),
1845                    data_type: "int".to_string(),
1846                    nullable: true,
1847                    default: None,
1848                    is_static: false,
1849                },
1850            ],
1851            comments: HashMap::new(),
1852        };
1853
1854        let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);
1855
1856        let entry_a = MergeEntry::new(
1857            2,
1858            pk.clone(),
1859            None,
1860            100,
1861            RowData::Live {
1862                cells: vec![CellData {
1863                    column: "name".to_string(),
1864                    value: Value::Text("old".to_string()),
1865                    timestamp: 100,
1866                    ttl: None,
1867                }],
1868            },
1869        );
1870        let entry_b = MergeEntry::new(
1871            1,
1872            pk.clone(),
1873            None,
1874            200,
1875            RowData::Tombstone {
1876                deletion_time: 200,
1877                local_deletion_time: 0,
1878            },
1879        );
1880        let entry_c = MergeEntry::new(
1881            0,
1882            pk.clone(),
1883            None,
1884            300,
1885            RowData::Live {
1886                cells: vec![CellData {
1887                    column: "score".to_string(),
1888                    value: Value::Integer(7),
1889                    timestamp: 300,
1890                    ttl: None,
1891                }],
1892            },
1893        );
1894
1895        let merger = KWayMerger {
1896            runs: vec![],
1897            heap: BinaryHeap::new(),
1898            current_partition: None,
1899            schema,
1900        };
1901
1902        let merged = merger
1903            .merge_partition_rows(vec![entry_c, entry_b, entry_a])
1904            .expect("merge must not fail");
1905
1906        assert_eq!(merged.len(), 1);
1907        let cells = match &merged[0].row_data {
1908            RowData::Live { cells } => cells,
1909            other => panic!(
1910                "expected Live (score survives the tombstone), got {:?}",
1911                other
1912            ),
1913        };
1914
1915        assert!(
1916            cells.iter().all(|c| c.column != "name"),
1917            "`name` (ts=100 <= row_del=200) must be shadowed by the row tombstone"
1918        );
1919        let score = cells
1920            .iter()
1921            .find(|c| c.column == "score")
1922            .expect("`score` (ts=300 > row_del=200) must survive the row tombstone");
1923        assert_eq!(score.value, Value::Integer(7));
1924    }
1925
1926    #[test]
1927    fn test_real_merger_row_tombstone_only_emits_tombstone() {
1928        // When every cell is shadowed by a row tombstone (no later writes), the
1929        // merger must emit a Tombstone entry so the row stays deleted downstream
1930        // (preserves #505/#498 absence semantics).
1931        use crate::schema::{Column, KeyColumn, TableSchema};
1932        use std::collections::HashMap;
1933
1934        let schema = TableSchema {
1935            keyspace: "ts_only_ks".to_string(),
1936            table: "ts_only_tbl".to_string(),
1937            partition_keys: vec![KeyColumn {
1938                name: "pk".to_string(),
1939                data_type: "int".to_string(),
1940                position: 0,
1941            }],
1942            clustering_keys: vec![],
1943            columns: vec![Column {
1944                name: "name".to_string(),
1945                data_type: "text".to_string(),
1946                nullable: true,
1947                default: None,
1948                is_static: false,
1949            }],
1950            comments: HashMap::new(),
1951        };
1952
1953        let pk = DecoratedKey::new(100, vec![0, 0, 0, 1]);
1954
1955        let live = MergeEntry::new(
1956            1,
1957            pk.clone(),
1958            None,
1959            100,
1960            RowData::Live {
1961                cells: vec![CellData {
1962                    column: "name".to_string(),
1963                    value: Value::Text("doomed".to_string()),
1964                    timestamp: 100,
1965                    ttl: None,
1966                }],
1967            },
1968        );
1969        let tomb = MergeEntry::new(
1970            0,
1971            pk.clone(),
1972            None,
1973            300,
1974            RowData::Tombstone {
1975                deletion_time: 300,
1976                local_deletion_time: 0,
1977            },
1978        );
1979
1980        let merger = KWayMerger {
1981            runs: vec![],
1982            heap: BinaryHeap::new(),
1983            current_partition: None,
1984            schema,
1985        };
1986
1987        let merged = merger
1988            .merge_partition_rows(vec![tomb, live])
1989            .expect("merge must not fail");
1990
1991        assert_eq!(merged.len(), 1);
1992        match &merged[0].row_data {
1993            RowData::Tombstone { deletion_time, .. } => {
1994                assert_eq!(*deletion_time, 300, "tombstone deletion_time preserved");
1995            }
1996            other => panic!("expected a Tombstone entry, got {:?}", other),
1997        }
1998    }
1999
2000    #[test]
2001    fn test_merge_step_variants() {
2002        let key = DecoratedKey::new(100, vec![1, 2, 3]);
2003        let rows = vec![];
2004
2005        let partition_step = MergeStep::Partition { key, rows };
2006
2007        match partition_step {
2008            MergeStep::Partition { key, rows } => {
2009                assert_eq!(key.token, 100);
2010                assert_eq!(rows.len(), 0);
2011            }
2012            _ => panic!("Expected Partition variant"),
2013        }
2014
2015        let complete_step = MergeStep::Complete;
2016        match complete_step {
2017            MergeStep::Complete => {}
2018            _ => panic!("Expected Complete variant"),
2019        }
2020    }
2021
2022    #[test]
2023    fn test_cell_merge_last_write_wins_higher_timestamp() {
2024        // Two cells with different timestamps
2025        let cell1 = CellData {
2026            column: "name".to_string(),
2027            value: Value::Text("Old".to_string()),
2028            timestamp: 1000,
2029            ttl: None,
2030        };
2031
2032        let cell2 = CellData {
2033            column: "name".to_string(),
2034            value: Value::Text("New".to_string()),
2035            timestamp: 2000, // Higher timestamp wins
2036            ttl: None,
2037        };
2038
2039        // Cell2 should win in last-write-wins merge
2040        assert!(cell2.timestamp > cell1.timestamp);
2041    }
2042
2043    #[test]
2044    fn test_memory_budget_calculation() {
2045        // For k=10 SSTables, memory budget should be ~80KB
2046        let k = 10;
2047        let buffer_size_per_run = RunReader::DEFAULT_BUFFER_SIZE;
2048        let total_memory = k * buffer_size_per_run;
2049
2050        assert_eq!(buffer_size_per_run, 8 * 1024); // 8KB
2051        assert_eq!(total_memory, 80 * 1024); // 80KB total
2052    }
2053
2054    #[test]
2055    fn test_merge_entry_to_mutation_live_cells() {
2056        use crate::schema::{KeyColumn, TableSchema};
2057        use crate::storage::write_engine::mutation::{CellOperation, DecoratedKey};
2058        use std::collections::HashMap;
2059
2060        let schema = TableSchema {
2061            keyspace: "test_ks".to_string(),
2062            table: "test_table".to_string(),
2063            partition_keys: vec![KeyColumn {
2064                name: "id".to_string(),
2065                data_type: "int".to_string(),
2066                position: 0,
2067            }],
2068            clustering_keys: vec![],
2069            columns: vec![],
2070            comments: HashMap::new(),
2071        };
2072
2073        // Encode key as 4-byte big-endian int (42)
2074        let key_bytes = 42i32.to_be_bytes().to_vec();
2075
2076        let entry = MergeEntry::new(
2077            0,
2078            DecoratedKey::new(1000, key_bytes),
2079            None,
2080            999_000_000,
2081            RowData::Live {
2082                cells: vec![
2083                    CellData {
2084                        column: "name".to_string(),
2085                        value: Value::Text("Alice".to_string()),
2086                        timestamp: 999_000_000,
2087                        ttl: None,
2088                    },
2089                    CellData {
2090                        column: "age".to_string(),
2091                        value: Value::Integer(30),
2092                        timestamp: 999_000_000,
2093                        ttl: Some(3600),
2094                    },
2095                ],
2096            },
2097        );
2098
2099        let mutation =
2100            KWayMerger::merge_entry_to_mutation(entry, &schema).expect("conversion should succeed");
2101
2102        // Partition key should have one column named "id"
2103        assert_eq!(mutation.partition_key.columns.len(), 1);
2104        assert_eq!(mutation.partition_key.columns[0].0, "id");
2105
2106        // Two operations: one Write and one WriteWithTtl
2107        assert_eq!(mutation.operations.len(), 2);
2108        assert_eq!(mutation.timestamp_micros, 999_000_000);
2109
2110        let has_write = mutation
2111            .operations
2112            .iter()
2113            .any(|op| matches!(op, CellOperation::Write { column, .. } if column == "name"));
2114        let has_ttl_write = mutation.operations.iter().any(|op| {
2115            matches!(op, CellOperation::WriteWithTtl { column, ttl_seconds, .. }
2116                if column == "age" && *ttl_seconds == 3600)
2117        });
2118        assert!(has_write, "Expected Write operation for 'name'");
2119        assert!(has_ttl_write, "Expected WriteWithTtl operation for 'age'");
2120    }
2121
2122    #[test]
2123    fn test_merge_entry_to_mutation_tombstone() {
2124        use crate::schema::{KeyColumn, TableSchema};
2125        use crate::storage::write_engine::mutation::{CellOperation, DecoratedKey};
2126        use std::collections::HashMap;
2127
2128        let schema = TableSchema {
2129            keyspace: "test_ks".to_string(),
2130            table: "test_table".to_string(),
2131            partition_keys: vec![KeyColumn {
2132                name: "id".to_string(),
2133                data_type: "int".to_string(),
2134                position: 0,
2135            }],
2136            clustering_keys: vec![],
2137            columns: vec![],
2138            comments: HashMap::new(),
2139        };
2140
2141        let key_bytes = 7i32.to_be_bytes().to_vec();
2142
2143        let entry = MergeEntry::new(
2144            0,
2145            DecoratedKey::new(500, key_bytes),
2146            None,
2147            888_000_000,
2148            RowData::Tombstone {
2149                deletion_time: 888_000_000,
2150                local_deletion_time: 1_700_000_000,
2151            },
2152        );
2153
2154        let mutation =
2155            KWayMerger::merge_entry_to_mutation(entry, &schema).expect("conversion should succeed");
2156
2157        assert_eq!(mutation.operations.len(), 1);
2158        assert!(
2159            matches!(mutation.operations[0], CellOperation::DeleteRow),
2160            "Expected DeleteRow operation for tombstone entry"
2161        );
2162    }
2163}
2164
2165// ─────────────────────────────────────────────────────────────────────────────
2166// Property tests for compaction merge semantics (Issue #475, Epic #469)
2167// ─────────────────────────────────────────────────────────────────────────────
2168//
2169// Strategy: define a small in-memory `reference_merge` that applies the full
2170// Cassandra per-key merge rules (timestamp LWW, tombstone shadowing, TTL expiry,
2171// range tombstone application), generate randomised cell streams with proptest,
2172// and assert that both the reference and the real KWayMerger agree.
2173//
2174// Three coverage areas required by the issue:
//  A. Tombstone shadowing   – delete-ts >= write-ts => cell suppressed (a delete wins a timestamp tie)
2176//  B. TTL expiry            – write with TTL whose local_deletion_time < merge_time => dropped
2177//  C. Range tombstone       – row in range with marked_for_delete_at >= cell-ts => dropped
2178//
2179// The reference implementation is tested directly via proptest.
2180// The real merger (merge_partition_rows) is also exercised for the cases it
2181// handles today (LWW by timestamp for live rows and tombstones).
2182
2183#[cfg(all(test, feature = "write-support"))]
2184mod merge_property_tests {
2185    use super::*;
2186    use proptest::prelude::*;
2187    use std::collections::HashMap;
2188
    // ─── Fixed "wall clock" used in all TTL expiry tests ─────────────────────
    /// The merge-time "now", in Unix seconds.
    ///
    /// A write whose `local_deletion_time` is strictly less than this value is
    /// treated as TTL-expired by `reference_merge`; a value equal to or greater
    /// than it is still live.
    const MERGE_TIME_SECS: i32 = 1_000;
2192
2193    // ─── Cell operation model ─────────────────────────────────────────────────
2194
    /// The three kinds of cell operations that a compaction must resolve.
    ///
    /// An operation does not say which slot it applies to; the partition,
    /// clustering and column indices live on the enclosing `CellInput`.
    #[derive(Debug, Clone)]
    enum CellOp {
        /// A live write: column <- value, recorded at `timestamp`.
        /// When `local_deletion_time` is Some(t) it is an expiring cell; the
        /// cell is considered dead when `t < MERGE_TIME_SECS`.
        /// `None` means the write has no TTL and never expires.
        Write {
            // Write timestamp used for last-write-wins comparison.
            timestamp: i64,
            // Expiry point in Unix seconds, compared against MERGE_TIME_SECS.
            local_deletion_time: Option<i32>,
        },
        /// A cell tombstone (DELETE column): column is dead at `timestamp`.
        /// In `reference_merge` a delete beats a live write at the same
        /// timestamp.
        Delete { timestamp: i64 },
        /// A range tombstone covering the inclusive integer range
        /// `[start_ck, end_ck]`. Any row whose clustering key integer falls
        /// within the range and whose write-timestamp <= `marked_for_delete_at`
        /// is suppressed.
        ///
        /// `reference_merge` scopes it by the enclosing `CellInput::partition`
        /// only; that input's `clustering` and `column` values are not read
        /// for a range tombstone.
        RangeTombstone {
            // Inclusive lower bound of the covered clustering keys.
            start_ck: u8,
            // Inclusive upper bound of the covered clustering keys.
            end_ck: u8,
            // Live cells with a timestamp <= this value are suppressed.
            marked_for_delete_at: i64,
        },
    }
2217
    /// A single entry in the randomised cell stream.
    ///
    /// We work with small integer partition/clustering/column spaces so
    /// collisions occur frequently and the interesting merge cases arise.
    ///
    /// The ranges noted on the fields are the half-open ranges that
    /// `arb_cell_input` draws from (so `0..4` means the values 0, 1, 2, 3).
    #[derive(Debug, Clone)]
    struct CellInput {
        /// 0..4
        partition: u8,
        /// 0..4
        clustering: u8,
        /// 0..3
        column: u8,
        /// The operation applied to the slot identified by the fields above.
        op: CellOp,
    }
2232
2233    // ─── Key type for the merged output map ──────────────────────────────────
2234
    /// (partition, clustering, column) triple identifying a unique cell slot.
    ///
    /// Used as the key of the map returned by `reference_merge`.
    type CellKey = (u8, u8, u8);
2237
    /// What the reference merge produces for a cell slot.
    ///
    /// A slot that ends up with no entry at all (for example because a range
    /// tombstone suppressed its live cell) is simply absent from the output
    /// map; there is no variant for "nothing here".
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum MergedCell {
        /// The cell is alive with the given write timestamp.
        Live { timestamp: i64 },
        /// The cell is a tombstone (deleted) at the given timestamp.
        Dead { timestamp: i64 },
    }
2246
2247    // ─── Reference implementation ─────────────────────────────────────────────
2248
2249    /// Reference merge over a flat cell-stream, applying full Cassandra rules.
2250    ///
2251    /// Rules (applied in order):
2252    /// 1. Per (partition, clustering, column), keep the op with the highest
2253    ///    `timestamp`. Ties: Delete wins over Write (Cassandra reconcile).
2254    /// 2. A `RangeTombstone` with `marked_for_delete_at >= cell.timestamp`
2255    ///    covering a clustering key suppresses the live cell in that slot.
2256    /// 3. A `Write` whose `local_deletion_time < MERGE_TIME_SECS` (TTL expired)
2257    ///    is dropped from the output even if it has the highest timestamp.
2258    fn reference_merge(inputs: &[CellInput]) -> HashMap<CellKey, MergedCell> {
2259        // ── Step 1: per-slot LWW ──────────────────────────────────────────────
2260        let mut per_slot: HashMap<CellKey, MergedCell> = HashMap::new();
2261
2262        // Collect range tombstones grouped by (partition, clustering range).
2263        let mut range_tombstones: Vec<CellInput> = Vec::new();
2264
2265        for ci in inputs {
2266            match &ci.op {
2267                CellOp::RangeTombstone { .. } => {
2268                    range_tombstones.push(ci.clone());
2269                }
2270                CellOp::Write {
2271                    timestamp,
2272                    local_deletion_time,
2273                } => {
2274                    // TTL expiry: drop the write if its local_deletion_time has passed.
2275                    if local_deletion_time
2276                        .map(|ldt| ldt < MERGE_TIME_SECS)
2277                        .unwrap_or(false)
2278                    {
2279                        // Expired: treat as if this write never happened.
2280                        continue;
2281                    }
2282                    let key = (ci.partition, ci.clustering, ci.column);
2283                    let candidate = MergedCell::Live {
2284                        timestamp: *timestamp,
2285                    };
2286                    per_slot
2287                        .entry(key)
2288                        .and_modify(|existing| {
2289                            match existing {
2290                                MergedCell::Live { timestamp: ex_ts } => {
2291                                    if *timestamp > *ex_ts {
2292                                        *existing = candidate.clone();
2293                                    }
2294                                }
2295                                MergedCell::Dead { timestamp: ex_ts } => {
2296                                    // Dead wins over a live cell at the same timestamp;
2297                                    // only replace if the write is strictly newer.
2298                                    if *timestamp > *ex_ts {
2299                                        *existing = candidate.clone();
2300                                    }
2301                                }
2302                            }
2303                        })
2304                        .or_insert(candidate);
2305                }
2306                CellOp::Delete { timestamp } => {
2307                    let key = (ci.partition, ci.clustering, ci.column);
2308                    let candidate = MergedCell::Dead {
2309                        timestamp: *timestamp,
2310                    };
2311                    per_slot
2312                        .entry(key)
2313                        .and_modify(|existing| {
2314                            match existing {
2315                                MergedCell::Live { timestamp: ex_ts } => {
2316                                    if *timestamp >= *ex_ts {
2317                                        // Delete wins at equal timestamp (Cassandra rule).
2318                                        *existing = candidate.clone();
2319                                    }
2320                                }
2321                                MergedCell::Dead { timestamp: ex_ts } => {
2322                                    if *timestamp > *ex_ts {
2323                                        *existing = candidate.clone();
2324                                    }
2325                                }
2326                            }
2327                        })
2328                        .or_insert(candidate);
2329                }
2330            }
2331        }
2332
2333        // ── Step 2: apply range tombstones ────────────────────────────────────
2334        // A range tombstone suppresses a live cell when:
2335        //   - partition matches
2336        //   - clustering key is within [start_ck, end_ck]
2337        //   - marked_for_delete_at >= cell write timestamp
2338        per_slot.retain(|&(pk, ck, _col), cell| {
2339            for rt in &range_tombstones {
2340                if rt.partition != pk {
2341                    continue;
2342                }
2343                if let CellOp::RangeTombstone {
2344                    start_ck,
2345                    end_ck,
2346                    marked_for_delete_at,
2347                } = rt.op
2348                {
2349                    if ck >= start_ck && ck <= end_ck {
2350                        if let MergedCell::Live { timestamp } = cell {
2351                            if marked_for_delete_at >= *timestamp {
2352                                return false; // suppressed
2353                            }
2354                        }
2355                    }
2356                }
2357            }
2358            true
2359        });
2360
2361        // Dead cells (tombstones) are kept in the output so callers can verify
2362        // they appear rather than a live cell with a lower timestamp.
2363        per_slot
2364    }
2365
2366    // ─── Proptest strategies ──────────────────────────────────────────────────
2367
2368    fn arb_timestamp() -> impl Strategy<Value = i64> {
2369        1i64..=20i64
2370    }
2371
2372    /// local_deletion_time: sometimes None, sometimes expired (<MERGE_TIME_SECS),
2373    /// sometimes live (>=MERGE_TIME_SECS).
2374    fn arb_local_deletion_time() -> impl Strategy<Value = Option<i32>> {
2375        prop_oneof![
2376            3 => Just(None),                         // no TTL
2377            1 => (990i32..=999i32).prop_map(Some),   // expired TTL
2378            1 => (1000i32..=1010i32).prop_map(Some), // live TTL
2379        ]
2380    }
2381
2382    fn arb_cell_op() -> impl Strategy<Value = CellOp> {
2383        prop_oneof![
2384            5 => (arb_timestamp(), arb_local_deletion_time())
2385                    .prop_map(|(ts, ldt)| CellOp::Write {
2386                        timestamp: ts,
2387                        local_deletion_time: ldt,
2388                    }),
2389            3 => arb_timestamp().prop_map(|ts| CellOp::Delete { timestamp: ts }),
2390            2 => (0u8..=3u8, 0u8..=3u8, arb_timestamp()).prop_map(|(s, e, ts)| {
2391                    let (start_ck, end_ck) = if s <= e { (s, e) } else { (e, s) };
2392                    CellOp::RangeTombstone {
2393                        start_ck,
2394                        end_ck,
2395                        marked_for_delete_at: ts,
2396                    }
2397                }),
2398        ]
2399    }
2400
2401    fn arb_cell_input() -> impl Strategy<Value = CellInput> {
2402        (0u8..4u8, 0u8..4u8, 0u8..3u8, arb_cell_op()).prop_map(
2403            |(partition, clustering, column, op)| CellInput {
2404                partition,
2405                clustering,
2406                column,
2407                op,
2408            },
2409        )
2410    }
2411
2412    fn arb_cell_stream() -> impl Strategy<Value = Vec<CellInput>> {
2413        prop::collection::vec(arb_cell_input(), 4..=32)
2414    }
2415
2416    // ─── Helper: sort merged output for stable comparison ─────────────────────
2417    fn sorted_keys(m: &HashMap<CellKey, MergedCell>) -> Vec<(CellKey, MergedCell)> {
2418        let mut v: Vec<_> = m.iter().map(|(&k, v)| (k, v.clone())).collect();
2419        v.sort_by_key(|(k, _)| *k);
2420        v
2421    }
2422
2423    // ─── Deterministic unit tests for reference semantics ────────────────────
2424
#[test]
fn ref_tombstone_shadows_earlier_write() {
    // One slot receives Write(ts=5) and then Delete(ts=10); the newer delete
    // must leave the slot as Dead(10).
    let slot: CellKey = (0, 0, 0);
    let older_write = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: None,
        },
    };
    let newer_delete = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Delete { timestamp: 10 },
    };
    let inputs = vec![older_write, newer_delete];

    let merged = reference_merge(&inputs);
    let expected = MergedCell::Dead { timestamp: 10 };
    assert_eq!(
        merged.get(&slot),
        Some(&expected),
        "Delete(ts=10) must shadow Write(ts=5)"
    );
}
2452
#[test]
fn ref_write_not_shadowed_by_older_tombstone() {
    // One slot receives Write(ts=10) and Delete(ts=5); the write is newer, so
    // the slot must stay Live(10).
    let slot: CellKey = (0, 0, 0);
    let newer_write = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Write {
            timestamp: 10,
            local_deletion_time: None,
        },
    };
    let older_delete = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Delete { timestamp: 5 },
    };
    let inputs = vec![newer_write, older_delete];

    let merged = reference_merge(&inputs);
    let expected = MergedCell::Live { timestamp: 10 };
    assert_eq!(
        merged.get(&slot),
        Some(&expected),
        "Write(ts=10) must win over Delete(ts=5)"
    );
}
2480
#[test]
fn ref_delete_wins_at_equal_timestamp() {
    // Write and Delete share ts=5 on the same slot; the tie must resolve in
    // favour of the delete (Cassandra reconcile).
    let slot: CellKey = (0, 0, 0);
    let write = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: None,
        },
    };
    let delete = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Delete { timestamp: 5 },
    };
    let inputs = vec![write, delete];

    let merged = reference_merge(&inputs);
    let expected = MergedCell::Dead { timestamp: 5 };
    assert_eq!(
        merged.get(&slot),
        Some(&expected),
        "Delete must win at equal timestamp (Cassandra reconcile rule)"
    );
}
2508
#[test]
fn ref_expired_ttl_drops_cell() {
    // A lone Write(ts=5) whose local_deletion_time (500) is below
    // MERGE_TIME_SECS (1000 per this suite's convention) is expired, so the
    // slot must not appear in the merged output.
    let slot: CellKey = (0, 0, 0);
    let expired_write = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: Some(500), // expired
        },
    };
    let inputs = vec![expired_write];

    let merged = reference_merge(&inputs);
    assert!(
        merged.get(&slot).is_none(),
        "Expired TTL cell must be absent from merged output"
    );
}
2528
#[test]
fn ref_live_ttl_keeps_cell() {
    // A lone Write(ts=5) whose local_deletion_time (1500) is at/above
    // MERGE_TIME_SECS (1000 per this suite's convention) has not expired, so
    // the slot must still be Live(5).
    let slot: CellKey = (0, 0, 0);
    let live_write = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: Some(1500), // not expired
        },
    };
    let inputs = vec![live_write];

    let merged = reference_merge(&inputs);
    let expected = MergedCell::Live { timestamp: 5 };
    assert_eq!(
        merged.get(&slot),
        Some(&expected),
        "Non-expired TTL cell must be present"
    );
}
2549
#[test]
fn ref_range_tombstone_suppresses_row_in_range() {
    // Partition 0 holds Write(ts=5) at clustering key 2 and a range tombstone
    // over [0, 5] with marked_for_delete_at=10; the older write is covered and
    // must be suppressed.
    let covered_slot: CellKey = (0, 2, 0);
    let write = CellInput {
        partition: 0,
        clustering: 2,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: None,
        },
    };
    let range_tombstone = CellInput {
        partition: 0,
        clustering: 0, // column/clustering fields ignored for RT; range is in op
        column: 0,
        op: CellOp::RangeTombstone {
            start_ck: 0,
            end_ck: 5,
            marked_for_delete_at: 10,
        },
    };
    let inputs = vec![write, range_tombstone];

    let merged = reference_merge(&inputs);
    assert!(
        merged.get(&covered_slot).is_none(),
        "Cell with ts=5 at clustering=2 must be suppressed by RangeTombstone(mfda=10, [0,5])"
    );
}
2581
#[test]
fn ref_range_tombstone_does_not_suppress_newer_write() {
    // The write (ts=15) is newer than the covering range tombstone (mfda=10),
    // so it must survive as Live(15).
    let slot: CellKey = (0, 2, 0);
    let newer_write = CellInput {
        partition: 0,
        clustering: 2,
        column: 0,
        op: CellOp::Write {
            timestamp: 15,
            local_deletion_time: None,
        },
    };
    let range_tombstone = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::RangeTombstone {
            start_ck: 0,
            end_ck: 5,
            marked_for_delete_at: 10,
        },
    };
    let inputs = vec![newer_write, range_tombstone];

    let merged = reference_merge(&inputs);
    let expected = MergedCell::Live { timestamp: 15 };
    assert_eq!(
        merged.get(&slot),
        Some(&expected),
        "Write(ts=15) must NOT be suppressed by RangeTombstone(mfda=10)"
    );
}
2613
#[test]
fn ref_range_tombstone_only_applies_within_partition() {
    // The write lives in partition 1 while the range tombstone targets
    // partition 0, so the write must remain Live(5).
    let slot: CellKey = (1, 2, 0);
    let write_in_p1 = CellInput {
        partition: 1,
        clustering: 2,
        column: 0,
        op: CellOp::Write {
            timestamp: 5,
            local_deletion_time: None,
        },
    };
    let tombstone_in_p0 = CellInput {
        partition: 0,
        clustering: 0,
        column: 0,
        op: CellOp::RangeTombstone {
            start_ck: 0,
            end_ck: 5,
            marked_for_delete_at: 10,
        },
    };
    let inputs = vec![write_in_p1, tombstone_in_p0];

    let merged = reference_merge(&inputs);
    let expected = MergedCell::Live { timestamp: 5 };
    assert_eq!(
        merged.get(&slot),
        Some(&expected),
        "RangeTombstone in partition 0 must not affect partition 1"
    );
}
2645
2646    // ─── Property tests ───────────────────────────────────────────────────────
2647
2648    proptest! {
2649        #![proptest_config(ProptestConfig::with_cases(64))]
2650
2651        // Property A: Tombstone shadowing
2652        // After merging, for every (partition, clustering, column) cell slot, if
2653        // the reference says Dead(ts=T) then the highest-timestamp Delete in the
2654        // input stream for that slot must have timestamp T.
2655        #[test]
2656        fn prop_tombstone_shadowing_consistent(inputs in arb_cell_stream()) {
2657            let merged = reference_merge(&inputs);
2658
2659            for (&(pk, ck, col), cell) in &merged {
2660                if let MergedCell::Dead { timestamp: dead_ts } = cell {
2661                    // Find the highest-timestamp Delete for this slot in the input.
2662                    let best_delete = inputs.iter()
2663                        .filter(|ci| ci.partition == pk && ci.clustering == ck && ci.column == col)
2664                        .filter_map(|ci| {
2665                            if let CellOp::Delete { timestamp } = ci.op {
2666                                Some(timestamp)
2667                            } else {
2668                                None
2669                            }
2670                        })
2671                        .max();
2672
2673                    prop_assert!(
2674                        best_delete.is_some(),
2675                        "Dead cell at ({},{},{}) but no Delete in inputs",
2676                        pk, ck, col
2677                    );
2678                    prop_assert_eq!(
2679                        best_delete.unwrap(),
2680                        *dead_ts,
2681                        "Dead cell timestamp must equal best Delete timestamp for ({},{},{})",
2682                        pk, ck, col
2683                    );
2684                }
2685            }
2686        }
2687
2688        // Property B: TTL expiry
2689        // After merging, no cell should be Live if all its Write ops are TTL-expired.
2690        #[test]
2691        fn prop_ttl_expiry_no_expired_live_cells(inputs in arb_cell_stream()) {
2692            let merged = reference_merge(&inputs);
2693
2694            for (&(pk, ck, col), cell) in &merged {
2695                if let MergedCell::Live { .. } = cell {
2696                    // There must be at least one non-expired Write for this slot.
2697                    let has_live_write = inputs.iter()
2698                        .filter(|ci| ci.partition == pk && ci.clustering == ck && ci.column == col)
2699                        .any(|ci| {
2700                            if let CellOp::Write { local_deletion_time, .. } = &ci.op {
2701                                // A live write: no TTL, or TTL not expired.
2702                                local_deletion_time
2703                                    .map(|ldt| ldt >= MERGE_TIME_SECS)
2704                                    .unwrap_or(true)
2705                            } else {
2706                                false
2707                            }
2708                        });
2709
2710                    prop_assert!(
2711                        has_live_write,
2712                        "Live cell at ({},{},{}) but all writes are expired",
2713                        pk, ck, col
2714                    );
2715                }
2716            }
2717        }
2718
2719        // Property C: Range tombstone application
2720        // After merging, no Live cell should exist that is fully covered by a
2721        // range tombstone whose marked_for_delete_at >= the cell's write timestamp.
2722        #[test]
2723        fn prop_range_tombstone_suppresses_covered_live_cells(inputs in arb_cell_stream()) {
2724            let merged = reference_merge(&inputs);
2725
2726            // Collect all range tombstones from the input stream.
2727            let range_tombstones: Vec<(u8, u8, u8, i64)> = inputs.iter()
2728                .filter_map(|ci| {
2729                    if let CellOp::RangeTombstone { start_ck, end_ck, marked_for_delete_at } = ci.op {
2730                        Some((ci.partition, start_ck, end_ck, marked_for_delete_at))
2731                    } else {
2732                        None
2733                    }
2734                })
2735                .collect();
2736
2737            for (&(pk, ck, _col), cell) in &merged {
2738                if let MergedCell::Live { timestamp } = cell {
2739                    // Verify no range tombstone shadows this cell.
2740                    for &(rt_pk, start_ck, end_ck, mfda) in &range_tombstones {
2741                        if rt_pk == pk && ck >= start_ck && ck <= end_ck && mfda >= *timestamp {
2742                            prop_assert!(
2743                                false,
2744                                "Live cell at ({},{}) ts={} should be suppressed by \
2745                                 RangeTombstone(part={}, [{},{}], mfda={})",
2746                                pk, ck, timestamp, rt_pk, start_ck, end_ck, mfda
2747                            );
2748                        }
2749                    }
2750                }
2751            }
2752        }
2753
2754        // Property D: LWW correctness
2755        // Every Live cell in the merged output must have a timestamp equal to
2756        // the maximum non-expired Write timestamp for that cell slot.
2757        #[test]
2758        fn prop_live_cell_has_max_write_timestamp(inputs in arb_cell_stream()) {
2759            let merged = reference_merge(&inputs);
2760
2761            for (&(pk, ck, col), cell) in &merged {
2762                if let MergedCell::Live { timestamp: live_ts } = cell {
2763                    // Find the maximum non-expired Write timestamp for this slot.
2764                    let max_ts = inputs.iter()
2765                        .filter(|ci| ci.partition == pk && ci.clustering == ck && ci.column == col)
2766                        .filter_map(|ci| {
2767                            if let CellOp::Write { timestamp, local_deletion_time } = &ci.op {
2768                                // Only include non-expired writes.
2769                                let not_expired = local_deletion_time
2770                                    .map(|ldt| ldt >= MERGE_TIME_SECS)
2771                                    .unwrap_or(true);
2772                                if not_expired { Some(*timestamp) } else { None }
2773                            } else {
2774                                None
2775                            }
2776                        })
2777                        .max();
2778
2779                    prop_assert_eq!(
2780                        max_ts,
2781                        Some(*live_ts),
2782                        "Live cell at ({},{},{}) must have max non-expired write timestamp",
2783                        pk, ck, col
2784                    );
2785                }
2786            }
2787        }
2788
2789        // Property E: Output is deterministic (idempotent reference)
2790        // Calling reference_merge twice on the same input produces identical output.
2791        #[test]
2792        fn prop_reference_merge_is_deterministic(inputs in arb_cell_stream()) {
2793            let result_a = reference_merge(&inputs);
2794            let result_b = reference_merge(&inputs);
2795            prop_assert_eq!(
2796                sorted_keys(&result_a),
2797                sorted_keys(&result_b),
2798                "reference_merge must be deterministic"
2799            );
2800        }
2801
2802        // Property F: Real merger LWW parity
2803        // For cell streams containing only non-expired Writes (no Deletes,
2804        // no RangeTombstones, no TTL), the real KWayMerger.merge_partition_rows
2805        // must agree with the reference on which row wins per clustering key.
2806        //
2807        // We drive merge_partition_rows directly with synthetic MergeEntry inputs
2808        // that represent the Write ops.
2809        #[test]
2810        fn prop_real_merger_lww_agrees_with_reference(
2811            entries in prop::collection::vec(
2812                // (clustering_key 0..4, run_index 0..2, timestamp 1..20)
2813                (0u8..4u8, 0usize..2usize, 1i64..=20i64),
2814                2..=12usize,
2815            )
2816        ) {
2817            use crate::schema::{Column, KeyColumn};
2818            use std::collections::HashMap as SchemaMap;
2819
2820            let schema = TableSchema {
2821                keyspace: "prop_test_ks".to_string(),
2822                table: "prop_test_table".to_string(),
2823                partition_keys: vec![KeyColumn {
2824                    name: "id".to_string(),
2825                    data_type: "int".to_string(),
2826                    position: 0,
2827                }],
2828                clustering_keys: vec![],
2829                columns: vec![Column {
2830                    name: "value".to_string(),
2831                    data_type: "text".to_string(),
2832                    nullable: true,
2833                    default: None,
2834                    is_static: false,
2835                }],
2836                comments: SchemaMap::new(),
2837            };
2838
2839            // Build MergeEntry stream — one per (ck, run_index, timestamp) tuple.
2840            // All entries share the same partition.
2841            let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
2842            let merge_entries: Vec<MergeEntry> = entries.iter().map(|&(ck, run_index, ts)| {
2843                let ck_key = ClusteringKey {
2844                    columns: vec![("ck".to_string(), Value::TinyInt(ck as i8))],
2845                };
2846                MergeEntry::new(
2847                    run_index,
2848                    partition_key.clone(),
2849                    Some(ck_key),
2850                    ts,
2851                    RowData::Live {
2852                        cells: vec![CellData {
2853                            column: "value".to_string(),
2854                            value: Value::Integer(ts as i32),
2855                            timestamp: ts,
2856                            ttl: None,
2857                        }],
2858                    },
2859                )
2860            }).collect();
2861
2862            // Drive the real merger.
2863            let merger = KWayMerger {
2864                runs: vec![],
2865                heap: std::collections::BinaryHeap::new(),
2866                current_partition: None,
2867                schema: schema.clone(),
2868            };
2869            let real_merged = merger.merge_partition_rows(merge_entries.clone())
2870                .expect("merge_partition_rows must not fail");
2871
2872            // Build the reference result: per clustering-key int, highest timestamp wins.
2873            // (run_index as tie-breaker: lower run_index wins at equal ts — same as merger)
2874            let mut ref_map: HashMap<u8, (i64, usize)> = HashMap::new();
2875            for &(ck, run_index, ts) in &entries {
2876                ref_map.entry(ck)
2877                    .and_modify(|(best_ts, best_run)| {
2878                        if ts > *best_ts || (ts == *best_ts && run_index < *best_run) {
2879                            *best_ts = ts;
2880                            *best_run = run_index;
2881                        }
2882                    })
2883                    .or_insert((ts, run_index));
2884            }
2885
2886            // Verify each winner in the real output matches the reference.
2887            prop_assert_eq!(
2888                real_merged.len(),
2889                ref_map.len(),
2890                "real merger output row count must match reference"
2891            );
2892
2893            for entry in &real_merged {
2894                let ck_byte = match entry.clustering_key.as_ref()
2895                    .and_then(|ck| ck.columns.first())
2896                    .map(|(_, v)| v)
2897                {
2898                    Some(Value::TinyInt(b)) => *b as u8,
2899                    _ => {
2900                        prop_assert!(false, "unexpected clustering key value");
2901                        unreachable!()
2902                    }
2903                };
2904
2905                let (ref_ts, _ref_run) = ref_map[&ck_byte];
2906                prop_assert_eq!(
2907                    entry.timestamp,
2908                    ref_ts,
2909                    "real merger winner timestamp must match reference for ck={}",
2910                    ck_byte
2911                );
2912            }
2913        }
2914
2915        // Property G: Tombstone wins over live row at same clustering key in real merger
2916        // When a Tombstone and a Live row have the same clustering key, the one with
2917        // the higher timestamp must win — and the real merger must reflect this.
2918        #[test]
2919        fn prop_real_merger_tombstone_vs_live(
2920            ts_write in 1i64..=10i64,
2921            ts_delete in 1i64..=20i64,
2922        ) {
2923            use crate::schema::{Column, KeyColumn};
2924            use std::collections::HashMap as SchemaMap;
2925
2926            let schema = TableSchema {
2927                keyspace: "prop_test_ks".to_string(),
2928                table: "prop_test_table".to_string(),
2929                partition_keys: vec![KeyColumn {
2930                    name: "id".to_string(),
2931                    data_type: "int".to_string(),
2932                    position: 0,
2933                }],
2934                clustering_keys: vec![],
2935                columns: vec![Column {
2936                    name: "value".to_string(),
2937                    data_type: "text".to_string(),
2938                    nullable: true,
2939                    default: None,
2940                    is_static: false,
2941                }],
2942                comments: SchemaMap::new(),
2943            };
2944
2945            let partition_key = DecoratedKey::new(100, vec![0, 0, 0, 1]);
2946            let ck = ClusteringKey {
2947                columns: vec![("ck".to_string(), Value::TinyInt(0))],
2948            };
2949
2950            // Deliberately give the LIVE row the NEWER file (run_index 0) and the
2951            // tombstone the OLDER file (run_index 1). A run_index-only tiebreak at
2952            // equal timestamp would wrongly pick the live row; the Cassandra
2953            // liveness rule must pick the tombstone regardless of file recency.
2954            let live_entry = MergeEntry::new(
2955                0, // run_index 0 = newer file
2956                partition_key.clone(),
2957                Some(ck.clone()),
2958                ts_write,
2959                RowData::Live {
2960                    cells: vec![CellData {
2961                        column: "value".to_string(),
2962                        value: Value::Integer(42),
2963                        timestamp: ts_write,
2964                        ttl: None,
2965                    }],
2966                },
2967            );
2968            let tombstone_entry = MergeEntry::new(
2969                1, // run_index 1 = older file
2970                partition_key.clone(),
2971                Some(ck.clone()),
2972                ts_delete,
2973                RowData::Tombstone {
2974                    deletion_time: ts_delete,
2975                    local_deletion_time: 2000,
2976                },
2977            );
2978
2979            let merger = KWayMerger {
2980                runs: vec![],
2981                heap: std::collections::BinaryHeap::new(),
2982                current_partition: None,
2983                schema: schema.clone(),
2984            };
2985            let merged = merger.merge_partition_rows(vec![live_entry, tombstone_entry])
2986                .expect("merge_partition_rows must not fail");
2987
2988            prop_assert_eq!(merged.len(), 1, "one clustering key => one merged row");
2989
2990            let winner = &merged[0];
2991            if ts_delete > ts_write {
2992                // Tombstone has higher timestamp => should win.
2993                prop_assert!(
2994                    matches!(winner.row_data, RowData::Tombstone { .. }),
2995                    "Tombstone(ts={}) must win over Live(ts={})",
2996                    ts_delete, ts_write
2997                );
2998            } else if ts_write > ts_delete {
2999                // Live write has higher timestamp => should win.
3000                prop_assert!(
3001                    matches!(winner.row_data, RowData::Live { .. }),
3002                    "Live(ts={}) must win over Tombstone(ts={})",
3003                    ts_write, ts_delete
3004                );
3005            } else {
3006                // Equal timestamps: the tombstone (Delete) ALWAYS wins, matching
3007                // Cassandra `Cells#reconcile`. This must hold regardless of file
3008                // recency — the assertion previously carved this case out, hiding
3009                // the run_index-only tiebreak bug (Issue #498).
3010                prop_assert!(
3011                    matches!(winner.row_data, RowData::Tombstone { .. }),
3012                    "At equal ts={}, Tombstone must win over Live (Cassandra reconcile rule)",
3013                    ts_delete
3014                );
3015            }
3016        }
3017    }
3018}