Skip to main content

cqlite_core/storage/sstable/
index_reader.rs

//! Index.db reader implementation for Cassandra 5+ SSTable format
//!
//! This module parses Index.db files, which hold the partition-level index: one entry per
//! partition carrying the raw partition key and its Data.db offset. Promoted index blocks
//! for wide partitions are length-skipped rather than decoded.
//! The index is used for efficient partition lookups and range queries.
6
7use crate::{
8    error::{Error, Result},
9    parser::vint::parse_vuint,
10    platform::Platform,
11};
12
13use super::header_spec::get_global_registry;
14use nom::{bytes::complete::take, number::complete::be_u16, IResult};
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use tokio::fs::File;
20use tokio::io::AsyncReadExt;
21
22use super::summary_reader::SummaryReader;
23
/// Index.db file header
///
/// Built by `parse_index_data_with_summary`, either from the fields returned by the
/// spec-driven header parser or, when that parser fails, from fixed defaults for the
/// headerless layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexHeader {
    /// Format version identifier (1 when the header does not supply one)
    pub version: u32,
    /// Number of index entries; always overwritten with the number of entries actually parsed
    pub entry_count: u32,
    /// Size of the index data section (the input length in bytes when not supplied)
    pub data_size: u64,
    /// Checksum for validation (0 when the header does not supply one)
    pub checksum: u32,
}
36
/// Partition index entry in Index.db
///
/// One value per partition, produced by `parse_big_index_entry`.
#[derive(Debug, Clone)]
pub struct PartitionIndexEntry {
    /// Raw partition key bytes (length-prefixed in the on-disk BIG/NB Index.db format).
    ///
    /// NOTE (Issue #552): Despite the historical field name `key_digest`, this holds the
    /// RAW partition key bytes, not an MD5 digest. The real Cassandra 5.0 NB Index.db entry
    /// format is `[key_len: u16 BE][raw key bytes][data_offset: vint][promoted_len: vint]`.
    /// There is no `0x0010` marker and no MD5 digest on disk. The field name is retained to
    /// avoid churn in the zero-copy lookup table and downstream callers; it is used directly
    /// as the partition key (e.g. for `RowKey`). The leading u16 is the key length
    /// (e.g. 0x0010 for a 16-byte UUID, 0x0026 for a 38-byte composite key).
    pub key_digest: Arc<[u8]>,
    /// Raw partition key bytes (mirror of `key_digest`, kept for API compatibility).
    /// Always `Some` now that all entries carry their raw key; the parser makes it share
    /// the same `Arc` allocation as `key_digest`.
    pub raw_key: Option<Arc<[u8]>>,
    /// Offset in Data.db file, as stored in Index.db (relative to the data section start)
    pub data_offset: u64,
    /// Size of partition data. Not stored in Index.db: the parser always sets this to 0.
    pub data_size: u32,
    /// Promoted index entries for wide partitions (optional).
    /// The parser in this file skips the promoted bytes and always sets this to `None`.
    pub promoted_index: Option<PromotedIndexData>,
}
60
/// Promoted index for wide partitions
///
/// NOTE(review): `parse_big_index_entry` skips the promoted bytes without decoding them,
/// so nothing visible in this file constructs this type — confirm whether another module
/// populates it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotedIndexData {
    /// Number of promoted index entries (this is what `IndexReader::get_statistics` sums)
    pub entry_count: u32,
    /// Individual promoted index entries
    pub entries: Vec<PromotedIndexEntry>,
}
69
/// Individual promoted index entry
///
/// Element type of `PromotedIndexData::entries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotedIndexEntry {
    /// Clustering key prefix (raw bytes)
    pub clustering_key: Vec<u8>,
    /// Offset within the partition
    pub partition_offset: u32,
    /// Size of the indexed section
    pub section_size: u32,
}
80
/// Complete Index.db data structure
///
/// Result of parsing one Index.db buffer: the header, the entries in file order, and a
/// key → position map over those entries.
#[derive(Debug, Clone)]
pub struct IndexData {
    /// File header
    pub header: IndexHeader,
    /// All partition index entries, in the order they were parsed
    pub partition_entries: Vec<PartitionIndexEntry>,
    /// Lookup table for efficient partition access - uses Arc<[u8]> as key type
    ///
    /// Maps each entry's `key_digest` (the raw partition key bytes) to its position in
    /// `partition_entries`. If two entries share a key, the later position wins.
    ///
    /// ## Zero-Copy Design (Issue #107, Problem 1)
    ///
    /// - Keys are `Arc<[u8]>` to enable reference counting without cloning the key bytes
    /// - Lookups use `&[u8]` directly via Borrow trait (zero heap allocations)
    /// - `Arc<[u8]>` implements `Borrow<[u8]>` enabling HashMap::get(&[u8]) without temporary Arc creation
    pub key_lookup: HashMap<Arc<[u8]>, usize>,
}
97
/// High-level Index.db file reader
///
/// Holds the fully parsed index in memory; created through `IndexReader::open` or
/// `IndexReader::open_with_summary`.
// NOTE(review): `dead_code` is presumably allowed because `platform` is stored but not
// read by the methods in this file — confirm before removing the attribute.
#[allow(dead_code)]
pub struct IndexReader {
    /// Path to the Index.db file
    file_path: PathBuf,
    /// Parsed index data
    index_data: IndexData,
    /// Platform abstraction for file operations
    platform: Arc<Platform>,
}
108
109impl IndexReader {
110    /// Open and parse an Index.db file
111    pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
112        Self::open_with_summary(path, platform, None).await
113    }
114
115    /// Open and parse an Index.db file with Summary.db correlation for proper offset mapping
116    pub async fn open_with_summary(
117        path: &Path,
118        platform: Arc<Platform>,
119        summary_reader: Option<&SummaryReader>,
120    ) -> Result<Self> {
121        if !platform.fs().exists(path).await? {
122            return Err(Error::not_found(format!(
123                "Index.db file not found: {}",
124                path.display()
125            )));
126        }
127
128        // Read the entire file
129        let mut file = File::open(path).await?;
130        let mut buffer = Vec::new();
131        file.read_to_end(&mut buffer).await?;
132
133        // Check for empty file
134        if buffer.is_empty() {
135            return Err(Error::corruption(format!(
136                "Index.db file is empty: {}",
137                path.display()
138            )));
139        }
140
141        // Parse the index data with optional Summary.db correlation
142        let index_data = match parse_index_data_with_summary(&buffer, summary_reader) {
143            Ok((_, data)) => data,
144            Err(e) => {
145                return Err(Error::corruption(format!(
146                    "Failed to parse Index.db: {:?}",
147                    e
148                )));
149            }
150        };
151
152        Ok(Self {
153            file_path: path.to_path_buf(),
154            index_data,
155            platform,
156        })
157    }
158
159    /// Get all partition entries
160    pub fn get_partition_entries(&self) -> &[PartitionIndexEntry] {
161        &self.index_data.partition_entries
162    }
163
164    /// Look up a partition by key digest
165    ///
166    /// ## Zero-Allocation Optimization (Issue #107)
167    ///
168    /// This method performs HashMap lookup without heap allocation by leveraging
169    /// the `Borrow` trait. Since `Arc<[u8]>` implements `Borrow<[u8]>`, we can
170    /// lookup using `&[u8]` directly without creating a temporary Arc.
171    ///
172    /// **Before:** `let key_arc: Arc<[u8]> = key_digest.into();` (heap allocation per query)
173    /// **After:** Direct `get(key_digest)` using Borrow trait (zero allocations)
174    pub fn lookup_partition(&self, key_digest: &[u8]) -> Option<&PartitionIndexEntry> {
175        self.index_data
176            .key_lookup
177            .get(key_digest)
178            .and_then(|&index| self.index_data.partition_entries.get(index))
179    }
180
181    /// Get statistics about the index
182    pub fn get_statistics(&self) -> IndexStatistics {
183        let mut promoted_count = 0;
184        let mut total_promoted_entries = 0;
185
186        for entry in &self.index_data.partition_entries {
187            if let Some(ref promoted) = entry.promoted_index {
188                promoted_count += 1;
189                total_promoted_entries += promoted.entry_count as usize;
190            }
191        }
192
193        IndexStatistics {
194            total_partitions: self.index_data.partition_entries.len(),
195            partitions_with_promoted_index: promoted_count,
196            total_promoted_entries,
197            file_size: self.file_path.metadata().map(|m| m.len()).unwrap_or(0),
198        }
199    }
200
201    /// Validate index integrity against Data.db offsets
202    pub async fn validate_integrity(&self) -> Result<Vec<String>> {
203        let mut issues = Vec::new();
204
205        // Check for overlapping offsets
206        let mut offsets: Vec<_> = self
207            .index_data
208            .partition_entries
209            .iter()
210            .map(|e| (e.data_offset, e.data_size))
211            .collect();
212
213        offsets.sort_by_key(|&(offset, _)| offset);
214
215        for i in 1..offsets.len() {
216            let (prev_offset, prev_size) = offsets[i - 1];
217            let (curr_offset, _) = offsets[i];
218
219            if prev_offset + prev_size as u64 > curr_offset {
220                issues.push(format!(
221                    "Overlapping partitions: offset {} + size {} overlaps with offset {}",
222                    prev_offset, prev_size, curr_offset
223                ));
224            }
225        }
226
227        Ok(issues)
228    }
229}
230
/// Index statistics for analysis and validation
///
/// Snapshot returned by `IndexReader::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStatistics {
    /// Total number of partitions (number of parsed index entries)
    pub total_partitions: usize,
    /// Number of partitions with promoted index
    pub partitions_with_promoted_index: usize,
    /// Total number of promoted index entries (sum of each promoted index's `entry_count`)
    pub total_promoted_entries: usize,
    /// File size in bytes (0 when the file metadata could not be read)
    pub file_size: u64,
}
243
244/// Parse Index.db file data with optional Summary.db correlation using spec-driven approach
245fn parse_index_data_with_summary<'a>(
246    input: &'a [u8],
247    summary_reader: Option<&SummaryReader>,
248) -> IResult<&'a [u8], IndexData> {
249    use nom::error::{Error as NomError, ErrorKind};
250
251    // First try spec-driven header parsing
252    let registry = get_global_registry();
253    let (remaining, header) = match registry.parse_index_header(input) {
254        Ok(parsed_header) => {
255            log::debug!("Successfully parsed Index.db header using spec-driven approach");
256
257            // Convert ParsedHeader to IndexHeader
258            let header = IndexHeader {
259                version: parsed_header
260                    .fields
261                    .get("version")
262                    .and_then(|v| v.as_u32().ok())
263                    .unwrap_or(1),
264                entry_count: parsed_header
265                    .fields
266                    .get("entry_count")
267                    .and_then(|v| v.as_u32().ok())
268                    .unwrap_or(0),
269                data_size: parsed_header
270                    .fields
271                    .get("data_size")
272                    .and_then(|v| v.as_u64().ok())
273                    .unwrap_or(input.len() as u64),
274                checksum: parsed_header
275                    .fields
276                    .get("checksum")
277                    .and_then(|v| v.as_u32().ok())
278                    .unwrap_or(0),
279            };
280
281            // Skip header bytes for data parsing
282            let header_size = parsed_header.header_size;
283            if input.len() < header_size {
284                return Err(nom::Err::Error(NomError::new(input, ErrorKind::Eof)));
285            }
286            (&input[header_size..], header)
287        }
288        Err(_) => {
289            log::debug!("Spec-driven header parsing failed, assuming headerless format");
290
291            // Parse all partition key digests - no header in some formats
292            let header = IndexHeader {
293                version: 1,
294                entry_count: 0, // Will be updated after parsing entries
295                data_size: input.len() as u64,
296                checksum: 0,
297            };
298            (input, header)
299        }
300    };
301
302    // Parse partition entries from remaining data
303    let (remaining, partition_entries) =
304        parse_all_partition_keys_with_summary(remaining, summary_reader)?;
305
306    // Build lookup table with zero-copy approach using Arc::clone (reference counting only)
307    // This eliminates the memory explosion from cloning Vec<u8> key digests
308    let mut key_lookup = HashMap::new();
309    for (index, entry) in partition_entries.iter().enumerate() {
310        key_lookup.insert(Arc::clone(&entry.key_digest), index);
311    }
312
313    // Update header with actual entry count
314    let header = IndexHeader {
315        entry_count: partition_entries.len() as u32,
316        ..header
317    };
318
319    Ok((
320        remaining,
321        IndexData {
322            header,
323            partition_entries,
324            key_lookup,
325        },
326    ))
327}
328
329/// Parse all partition entries from the Index.db file.
330///
331/// ## Authoritative format (Issue #552, Cassandra 5.0 NB / BIG Index.db)
332///
333/// Index.db is ALWAYS the BIG-format partition index. Each entry is:
334///
335/// ```text
336/// [key_len: u16 BE]                    ← length of the raw partition key
337/// [raw partition key bytes: key_len]   ← the partition key exactly as in Data.db
338/// [data_offset: unsigned vint]         ← byte offset into the Data.db data section
339/// [promoted_index_len: unsigned vint]  ← byte length of the promoted index (0 = none)
340/// [promoted_index_data: promoted_index_len bytes]
341/// ```
342///
343/// The leading u16 is the partition key LENGTH, not a `0x0010` marker, and there is no
344/// MD5 digest on disk (verified against real Cassandra Index.db files: single-UUID keys
345/// start `0x0010`, the composite-key `multi_partition_table` starts `0x0026` = 38 bytes).
346///
347/// There is no separate "BTI" Index.db format: a BTI-indexed SSTable uses Partitions.db /
348/// Rows.db trie structures and does not produce an Index.db at all (see guide Ch.17). So the
349/// previous `detect_index_format` heuristic was entirely spurious (Issue #28 mandate) and has
350/// been removed in favour of this single, spec-accurate parser that works for ANY key length.
351///
352/// The `summary_reader` argument is retained for API compatibility; offsets are now stored
353/// inline so Summary.db correlation is no longer needed for parsing.
354fn parse_all_partition_keys_with_summary<'a>(
355    input: &'a [u8],
356    _summary_reader: Option<&SummaryReader>,
357) -> IResult<&'a [u8], Vec<PartitionIndexEntry>> {
358    let mut entries = Vec::new();
359    let mut remaining = input;
360
361    let mut entry_index = 0;
362    while !remaining.is_empty() {
363        match parse_big_index_entry(remaining) {
364            Ok((rest, entry)) => {
365                debug_assert!(
366                    rest.len() < remaining.len(),
367                    "BIG Index.db parser must make forward progress"
368                );
369                entries.push(entry);
370                remaining = rest;
371                entry_index += 1;
372            }
373            Err(_e) => {
374                log::debug!(
375                    "Stopped parsing Index.db at entry {} with {} bytes remaining",
376                    entry_index,
377                    remaining.len()
378                );
379                break;
380            }
381        }
382    }
383
384    log::debug!("Parsed {} partition entries from Index.db", entries.len());
385    Ok((remaining, entries))
386}
387
388/// Parse a single BIG-format Index.db entry.
389///
390/// Layout: `[key_len: u16 BE][raw key][data_offset: vint][promoted_len: vint][promoted...]`.
391/// Works for any key length (int, text, UUID, composite). The raw partition key is stored
392/// directly in `key_digest` / `raw_key` (no MD5, no marker).
393pub(crate) fn parse_big_index_entry(input: &[u8]) -> IResult<&[u8], PartitionIndexEntry> {
394    // Read partition key length (u16 big-endian).
395    let (input, key_len) = be_u16(input)?;
396
397    // Read the raw partition key bytes.
398    let (input, key_bytes) = take(key_len)(input)?;
399
400    // Read unsigned VInt data offset (relative to the Data.db data section start;
401    // SSTableReader adds the header size when seeking).
402    let (input, data_offset) = parse_vuint(input)?;
403
404    // Read promoted-index length (unsigned VInt) and skip the promoted data.
405    // Partition-level lookups work without decoding the promoted index.
406    let (input, promoted_len) = parse_vuint(input)?;
407    // Saturating cast: on a 32-bit target `promoted_len as usize` could truncate and
408    // misalign subsequent entries. `usize::MAX` makes `take` return an Eof error on a
409    // short buffer instead, which is the safe failure mode for a corrupt Index.db.
410    let promoted_len = usize::try_from(promoted_len).unwrap_or(usize::MAX);
411    let (input, _promoted_data) = take(promoted_len)(input)?;
412
413    log::trace!(
414        "Index.db BIG entry: key_len={}, data_offset={}, promoted_len={}",
415        key_len,
416        data_offset,
417        promoted_len
418    );
419
420    let raw_key: Arc<[u8]> = Arc::from(key_bytes);
421
422    Ok((
423        input,
424        PartitionIndexEntry {
425            key_digest: Arc::clone(&raw_key),
426            raw_key: Some(raw_key),
427            // Size is not stored in Index.db; determined during the Data.db read.
428            data_offset,
429            data_size: 0,
430            promoted_index: None,
431        },
432    ))
433}
434
435// REMOVED: Old heuristic functions that violated Issue #28 no-heuristics mandate
436// - calculate_data_offset_from_summary: Summary.db correlation (now obsolete with inline offsets)
437// - interpolate_data_offset_from_summary_position: Used arbitrary estimates
438// - estimate_data_offset_from_index_position: Used hardcoded partition size guesses
439//
440// Modern Cassandra 5+ Index.db format includes unsigned VInt offsets inline,
441// eliminating the need for Summary.db correlation. See parse_vuint() in parser/vint.rs.
442
/// Parse Index.db file data - Legacy API for backward compatibility
///
/// Delegates to `parse_index_data_with_summary` with no Summary.db reader and returns
/// its result unchanged.
// No caller is visible in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
fn parse_index_data(input: &[u8]) -> IResult<&[u8], IndexData> {
    parse_index_data_with_summary(input, None)
}
448
/// Parse all partition entries from the Index.db file - Legacy API
///
/// Delegates to `parse_all_partition_keys_with_summary` with no Summary.db reader.
/// Despite the historical wording, the entries carry raw partition keys, not digests.
// Within this file it is called only from the test module, hence the `dead_code` allowance.
#[allow(dead_code)]
pub(crate) fn parse_all_partition_keys(input: &[u8]) -> IResult<&[u8], Vec<PartitionIndexEntry>> {
    parse_all_partition_keys_with_summary(input, None)
}
454
/// Parse a single BIG-format Index.db partition entry - Legacy API
///
/// Thin alias for `parse_big_index_entry`, kept under its old name.
// Within this file it is called only from the test module, hence the `dead_code` allowance.
#[allow(dead_code)]
fn parse_simple_partition_key(input: &[u8]) -> IResult<&[u8], PartitionIndexEntry> {
    parse_big_index_entry(input)
}
460
// Note: Promoted index decoding is not implemented in this file. `parse_big_index_entry`
// reads the promoted-index length and skips that many bytes; every entry carries the raw
// partition key (not a digest) and `promoted_index` is always `None`.
463
464#[cfg(test)]
465mod tests {
466    use super::*;
467    use std::env;
468
469    /// Test stock_prices Index.db parsing (Issue #208)
470    ///
471    /// This test directly parses the stock_prices Index.db file which contains 3 partition entries (AMZN, GOOG, AAPL).
472    /// Note: Data.db.jsonl only has 2 entries, suggesting incomplete test data or filtering at a higher level.
473    /// The file uses a BTI format with actual partition keys (not MD5 digests).
474    ///
475    /// **Note:** This test requires test data files and is ignored in minimal CI builds.
476    /// Run with: `cargo test --package cqlite-core -- --ignored`
477    #[tokio::test]
478    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
479    async fn test_stock_prices_index_db_parsing() {
480        let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
481            "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string()
482        });
483
484        let index_path = format!(
485            "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
486            datasets_root
487        );
488
489        println!("\n=== Testing stock_prices Index.db ===");
490        println!("Path: {}", index_path);
491
492        // Read file directly to inspect format
493        let file_data = std::fs::read(&index_path).expect("Failed to read Index.db");
494        println!("File size: {} bytes", file_data.len());
495        println!(
496            "First 56 bytes (hex): {:02x?}",
497            &file_data[..std::cmp::min(56, file_data.len())]
498        );
499
500        // Check format detection
501        println!("\n=== Format Analysis ===");
502        println!(
503            "First 2 bytes: {:#06x} (expected 0x0010 for digest format)",
504            u16::from_be_bytes([file_data[0], file_data[1]])
505        );
506
507        // Try to parse with current implementation
508        println!("\n=== Parsing with parse_all_partition_keys_with_summary ===");
509        match parse_all_partition_keys_with_summary(&file_data, None) {
510            Ok((remaining, entries)) => {
511                println!("SUCCESS: Parsed {} entries", entries.len());
512                println!("Remaining bytes: {}", remaining.len());
513
514                for (i, entry) in entries.iter().enumerate() {
515                    println!(
516                        "  Entry {}: offset={}, size={}, key_digest={:02x?}",
517                        i,
518                        entry.data_offset,
519                        entry.data_size,
520                        &entry.key_digest[..]
521                    );
522                }
523
524                // Note: Index.db contains 3 entries (AMZN, GOOG, AAPL) but Data.db.jsonl only has 2.
525                // This may indicate incomplete test data or filtering at a higher level.
526                // For now, verify parser works correctly (finds all entries in Index.db).
527                assert!(
528                    entries.len() >= 2,
529                    "Expected at least 2 partition entries for stock_prices (found {})",
530                    entries.len()
531                );
532            }
533            Err(e) => {
534                println!("FAILED: {:?}", e);
535                panic!("Failed to parse stock_prices Index.db: {:?}", e);
536            }
537        }
538    }
539
540    /// Test stock_prices Index.db via IndexReader (Issue #208)
541    ///
542    /// This test uses the high-level IndexReader API to open the stock_prices Index.db.
543    /// It should successfully parse at least 2 partition entries (Index.db has 3 total).
544    ///
545    /// **Note:** This test requires test data files and is ignored in minimal CI builds.
546    /// Run with: `cargo test --package cqlite-core -- --ignored`
547    #[tokio::test]
548    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
549    async fn test_stock_prices_index_reader() {
550        let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
551            "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string()
552        });
553
554        let index_path = std::path::PathBuf::from(format!(
555            "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
556            datasets_root
557        ));
558
559        println!("\n=== Testing IndexReader::open ===");
560        println!("Path: {:?}", index_path);
561
562        // Create platform
563        let config = crate::Config::default();
564        let platform = Arc::new(
565            crate::Platform::new(&config)
566                .await
567                .expect("Failed to create platform"),
568        );
569
570        // Try to open with IndexReader
571        match IndexReader::open(&index_path, platform.clone()).await {
572            Ok(reader) => {
573                let entries = reader.get_partition_entries();
574                println!(
575                    "SUCCESS: IndexReader found {} partition entries",
576                    entries.len()
577                );
578
579                for (i, entry) in entries.iter().enumerate() {
580                    println!(
581                        "  Entry {}: offset={}, size={}, key_digest={:02x?}",
582                        i,
583                        entry.data_offset,
584                        entry.data_size,
585                        &entry.key_digest[..8]
586                    );
587                }
588
589                let stats = reader.get_statistics();
590                println!(
591                    "Statistics: total_partitions={}, file_size={}",
592                    stats.total_partitions, stats.file_size
593                );
594
595                // Verify parser works correctly (Index.db has 3 entries, Data.db.jsonl has 2)
596                assert!(
597                    entries.len() >= 2,
598                    "Expected at least 2 partition entries for stock_prices (found {})",
599                    entries.len()
600                );
601            }
602            Err(e) => {
603                println!("FAILED: {:?}", e);
604                panic!("Failed to open stock_prices Index.db: {:?}", e);
605            }
606        }
607    }
608
609    /// Test stock_prices via SSTableReader integration (Issue #208)
610    ///
611    /// This test verifies that SSTableReader correctly loads the Index.db
612    /// and can access partition entries (at least 2, Index.db has 3 total).
613    ///
614    /// **Note:** This test requires test data files and is ignored in minimal CI builds.
615    /// Run with: `cargo test --package cqlite-core -- --ignored`
616    #[tokio::test]
617    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
618    async fn test_stock_prices_sstable_reader_integration() {
619        let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
620            "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string()
621        });
622
623        let data_path = std::path::PathBuf::from(format!(
624            "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Data.db",
625            datasets_root
626        ));
627
628        println!("\n=== Testing SSTableReader with stock_prices ===");
629        println!("Data.db path: {:?}", data_path);
630
631        // Create platform
632        let config = crate::Config::default();
633        let platform = Arc::new(
634            crate::Platform::new(&config)
635                .await
636                .expect("Failed to create platform"),
637        );
638
639        // Try to open with SSTableReader
640        use crate::storage::sstable::reader::SSTableReader;
641        match SSTableReader::open(&data_path, &config, platform.clone()).await {
642            Ok(reader) => {
643                println!("SUCCESS: SSTableReader opened");
644
645                // Check if index_reader was loaded (it's a public field)
646                if let Some(ref index_reader) = reader.index_reader {
647                    let entries = index_reader.get_partition_entries();
648                    println!("Index loaded with {} partition entries", entries.len());
649
650                    for (i, entry) in entries.iter().enumerate() {
651                        println!(
652                            "  Entry {}: offset={}, size={}",
653                            i, entry.data_offset, entry.data_size
654                        );
655                    }
656
657                    // Verify Index.db was parsed correctly (has at least 2 entries, actually has 3)
658                    assert!(
659                        entries.len() >= 2,
660                        "Expected at least 2 partition entries for stock_prices (found {})",
661                        entries.len()
662                    );
663                } else {
664                    println!("WARNING: Index.db was not loaded by SSTableReader");
665                    panic!("SSTableReader did not load Index.db");
666                }
667            }
668            Err(e) => {
669                println!("FAILED: {:?}", e);
670                panic!("Failed to open stock_prices SSTable: {:?}", e);
671            }
672        }
673    }
674
675    /// Issue #552: Validate the BIG-format parser against REAL Cassandra 5.0 Index.db files.
676    ///
677    /// `simple_table` has a single 16-byte UUID partition key (entries start 0x0010).
678    /// `multi_partition_table` has a 38-byte composite partition key (entries start 0x0026).
679    /// Both must read back ALL entries with monotonically increasing offsets.
680    #[tokio::test]
681    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
682    async fn test_real_index_db_big_format() {
683        let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
684            "/Users/patrickmcfadin/local_projects/cqlite/test-data/datasets".to_string()
685        });
686
687        // --- Composite-key table (38-byte keys, entries start 0x0026) ---
688        let multi_dir = format!(
689            "{}/sstables/test_basic/multi_partition_table-6ac52100a25111f0a3fef1a551383fb9",
690            datasets_root
691        );
692        let multi_index = format!("{}/nb-1-big-Index.db", multi_dir);
693        let bytes = std::fs::read(&multi_index).expect("read multi_partition_table Index.db");
694        assert_eq!(
695            u16::from_be_bytes([bytes[0], bytes[1]]),
696            38,
697            "Composite key length should be 38 (0x0026)"
698        );
699        let (rest, entries) = parse_all_partition_keys(&bytes).expect("parse composite Index.db");
700        assert!(rest.is_empty(), "Should consume all Index.db bytes");
701        assert!(
702            entries.len() >= 2,
703            "multi_partition_table should have multiple partitions (got {})",
704            entries.len()
705        );
706        // First key is 38 bytes; first offset must be 0.
707        assert_eq!(
708            entries[0].key_digest.len(),
709            38,
710            "First key should be 38 bytes"
711        );
712        assert_eq!(
713            entries[0].data_offset, 0,
714            "First partition offset should be 0"
715        );
716        // Offsets are strictly increasing in token order.
717        for i in 1..entries.len() {
718            assert!(
719                entries[i].data_offset > entries[i - 1].data_offset,
720                "Offsets must increase: entry {} ({}) <= entry {} ({})",
721                i,
722                entries[i].data_offset,
723                i - 1,
724                entries[i - 1].data_offset
725            );
726        }
727
728        // --- Single-UUID-key table (16-byte keys, entries start 0x0010) ---
729        let simple_index = format!(
730            "{}/sstables/test_basic/simple_table-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
731            datasets_root
732        );
733        let bytes = std::fs::read(&simple_index).expect("read simple_table Index.db");
734        assert_eq!(
735            u16::from_be_bytes([bytes[0], bytes[1]]),
736            16,
737            "UUID key length should be 16 (0x0010)"
738        );
739        let (rest, entries) = parse_all_partition_keys(&bytes).expect("parse simple Index.db");
740        assert!(rest.is_empty(), "Should consume all Index.db bytes");
741        assert!(
742            entries.len() > 3,
743            "simple_table should have many partitions (got {})",
744            entries.len()
745        );
746        assert_eq!(
747            entries[0].key_digest.len(),
748            16,
749            "First key should be 16 bytes"
750        );
751        assert_eq!(
752            entries[0].data_offset, 0,
753            "First partition offset should be 0"
754        );
755    }
756
757    #[test]
758    fn test_simple_partition_key_parsing() {
759        // NB BIG format: key_len(2) + raw_key(key_len) + vint_offset(1-9) + vint_promoted_size(1-9)
760        // VInt encoding for 256: 0x81, 0x00 (2 bytes, 10xxxxxx format)
761        let data = vec![
762            0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
763            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // raw key (16 bytes)
764            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key_digest cont.
765            0x81, 0x00, // VInt offset = 256
766            0x00, // VInt promoted_size = 0 (no promoted index)
767        ];
768
769        let (_, entry) = parse_simple_partition_key(&data).unwrap();
770
771        assert_eq!(
772            entry.key_digest.as_ref(),
773            &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
774        );
775        // Raw offset from Index.db (relative to data section start)
776        // SSTableReader will add actual_header_size to get absolute file offset
777        assert_eq!(entry.data_offset, 256);
778        assert_eq!(entry.data_size, 0); // Size not stored in Index.db (Issue #149)
779        assert!(entry.promoted_index.is_none());
780    }
781
782    #[test]
783    fn test_partition_key_parsing_without_summary() {
784        // BIG format: key_len(2) + raw key(key_len) + vint_offset + vint_promoted_size
785        // VInt encoding for 4096 (0x1000): 0x90, 0x00 (2 bytes, 10xxxxxx format)
786        // byte0 = 0x80 | ((4096 >> 8) & 0x3F) = 0x80 | 0x10 = 0x90
787        // byte1 = 4096 & 0xFF = 0x00
788        let data = vec![
789            0x00, 0x10, // key_len = 16
790            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // raw key (16 bytes)
791            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // raw key cont.
792            0x90, 0x00, // VInt offset = 4096
793            0x00, // VInt promoted_size = 0
794        ];
795
796        let (_, entry) = parse_simple_partition_key(&data).unwrap();
797
798        assert_eq!(
799            entry.key_digest.as_ref(),
800            &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
801        );
802        assert_eq!(
803            entry.raw_key.as_deref(),
804            Some(&[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16][..]),
805            "raw_key should mirror the raw partition key"
806        );
807
808        // Raw offset from Index.db (relative to data section start)
809        assert_eq!(entry.data_offset, 4096);
810    }
811
812    #[test]
813    fn test_variable_length_keys_parse_all_entries() {
814        // Issue #552: prove the parser handles non-16-byte keys (composite/int/text).
815        // Entry 1: 4-byte int key (0x0000002A), offset 100, no promoted index.
816        // Entry 2: 1-byte key (0x07), offset 500 (2-byte vint 0x81 0xF4), no promoted.
817        let data = vec![
818            // Entry 1
819            0x00, 0x04, // key_len = 4
820            0x00, 0x00, 0x00, 0x2A, // raw key (int 42)
821            0x64, // vint offset = 100
822            0x00, // vint promoted_size = 0
823            // Entry 2
824            0x00, 0x01, // key_len = 1
825            0x07, // raw key
826            0x81, 0xF4, // vint offset = 500
827            0x00, // vint promoted_size = 0
828        ];
829
830        let (rest, entries) = parse_all_partition_keys(&data).unwrap();
831        assert!(rest.is_empty(), "All bytes should be consumed");
832        assert_eq!(entries.len(), 2, "Both variable-length entries must parse");
833
834        assert_eq!(entries[0].key_digest.as_ref(), &[0x00, 0x00, 0x00, 0x2A]);
835        assert_eq!(entries[0].data_offset, 100);
836
837        assert_eq!(entries[1].key_digest.as_ref(), &[0x07]);
838        assert_eq!(entries[1].data_offset, 500);
839    }
840
841    // REMOVED: test_enhanced_partition_entry_parsing
842    // Enhanced format parsing removed per Issue #92
843
844    #[test]
845    fn test_multiple_partition_keys_parsing() {
846        // Two partition entries with VInt offsets (NB format)
847        // Format: key_len(2) + raw_key(key_len) + vint_offset + vint_promoted_size
848        // VInt encoding for 100 (0x64): 0x64 (1 byte, value < 128)
849        // VInt encoding for 500 (0x1F4): 0x81, 0xF4 (2 bytes, 10xxxxxx format)
850        //   byte0 = 0x80 | ((500 >> 8) & 0x3F) = 0x80 | 1 = 0x81
851        //   byte1 = 500 & 0xFF = 0xF4
852        let data = vec![
853            // Entry 1
854            0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
855            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // key_digest 1 (16 bytes)
856            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key_digest cont.
857            0x64, // VInt offset = 100
858            0x00, // VInt promoted_size = 0
859            // Entry 2
860            0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
861            0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, // key_digest 2 (16 bytes)
862            0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20, // key_digest cont.
863            0x81, 0xF4, // VInt offset = 500
864            0x00, // VInt promoted_size = 0
865        ];
866
867        let (_, entries) = parse_all_partition_keys(&data).unwrap();
868
869        assert_eq!(entries.len(), 2);
870
871        if !entries.is_empty() {
872            assert_eq!(
873                entries[0].key_digest.as_ref(),
874                &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
875            );
876        }
877
878        if entries.len() >= 2 {
879            assert_eq!(
880                entries[1].key_digest.as_ref(),
881                &[
882                    0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D,
883                    0x1E, 0x1F, 0x20
884                ]
885            );
886
887            // Raw offsets from Index.db (relative to data section start)
888            assert_eq!(entries[0].data_offset, 100);
889            assert_eq!(entries[1].data_offset, 500);
890        }
891    }
892
893    // REMOVED: test_data_offset_estimation_algorithm
894    // This test validated the old heuristic estimation function which has been removed
895    // in favor of spec-accurate Summary.db correlation (Issue #92)
896
897    #[test]
898    fn test_borrow_trait_zero_allocation_lookup() {
899        // Test Issue #107 fix: Verify that lookup_partition uses Borrow trait
900        // to avoid heap allocation on every lookup
901
902        // Create index data with two partition entries (NB format with VInt offsets)
903        // Format: key_len(2) + raw_key(key_len) + vint_offset + vint_promoted_size
904        // VInt for 100: 0x64 (single byte, value < 128)
905        // VInt for 500: 0x81, 0xF4 (2 bytes)
906        let data = vec![
907            // Entry 1
908            0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
909            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // key_digest 1
910            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key_digest cont.
911            0x64, // VInt offset = 100
912            0x00, // VInt promoted_size = 0
913            // Entry 2
914            0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
915            0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, // key_digest 2
916            0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20, // key_digest cont.
917            0x81, 0xF4, // VInt offset = 500
918            0x00, // VInt promoted_size = 0
919        ];
920
921        let (_, index_data) = parse_index_data(&data).unwrap();
922
923        // Prepare lookup keys as slices (NOT Arc)
924        let key1: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
925        let key2: &[u8] = &[
926            0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
927            0x1F, 0x20,
928        ];
929        let key_not_found: &[u8] = &[0xFF; 16];
930
931        // Test lookups - these should use Borrow trait without creating Arc
932        // The key_lookup HashMap has Arc<[u8]> keys but accepts &[u8] for get()
933        let result1 = index_data.key_lookup.get(key1);
934        let result2 = index_data.key_lookup.get(key2);
935        let result3 = index_data.key_lookup.get(key_not_found);
936
937        assert!(result1.is_some(), "Should find first key");
938        assert!(result2.is_some(), "Should find second key");
939        assert!(result3.is_none(), "Should not find non-existent key");
940
941        assert_eq!(*result1.unwrap(), 0, "First key should map to index 0");
942        assert_eq!(*result2.unwrap(), 1, "Second key should map to index 1");
943
944        // Verify the actual entries match
945        assert_eq!(index_data.partition_entries[0].key_digest.as_ref(), key1);
946        assert_eq!(index_data.partition_entries[1].key_digest.as_ref(), key2);
947    }
948}