cqlite_core/storage/sstable/index_reader.rs
1//! Index.db reader implementation for Cassandra 5+ SSTable format
2//!
3//! This module provides comprehensive parsing of Index.db files which contain
4//! partition-level index information including promoted index entries for wide partitions.
5//! The index is used for efficient partition lookups and range queries.
6
7use crate::{
8 error::{Error, Result},
9 parser::vint::parse_vuint,
10 platform::Platform,
11};
12
13use super::header_spec::get_global_registry;
14use nom::{bytes::complete::take, number::complete::be_u16, IResult};
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use tokio::fs::File;
20use tokio::io::AsyncReadExt;
21
22use super::summary_reader::SummaryReader;
23
24/// Index.db file header
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct IndexHeader {
27 /// Format version identifier
28 pub version: u32,
29 /// Number of index entries
30 pub entry_count: u32,
31 /// Size of the index data section
32 pub data_size: u64,
33 /// Checksum for validation
34 pub checksum: u32,
35}
36
37/// Partition index entry in Index.db
38#[derive(Debug, Clone)]
39pub struct PartitionIndexEntry {
40 /// Raw partition key bytes (length-prefixed in the on-disk BIG/NB Index.db format).
41 ///
42 /// NOTE (Issue #552): Despite the historical field name `key_digest`, this holds the
43 /// RAW partition key bytes, not an MD5 digest. The real Cassandra 5.0 NB Index.db entry
44 /// format is `[key_len: u16 BE][raw key bytes][data_offset: vint][promoted_len: vint]`.
45 /// There is no `0x0010` marker and no MD5 digest on disk. The field name is retained to
46 /// avoid churn in the zero-copy lookup table and downstream callers; it is used directly
47 /// as the partition key (e.g. for `RowKey`). The leading u16 is the key length
48 /// (e.g. 0x0010 for a 16-byte UUID, 0x0026 for a 38-byte composite key).
49 pub key_digest: Arc<[u8]>,
50 /// Raw partition key bytes (mirror of `key_digest`, kept for API compatibility).
51 /// Always `Some` now that all entries carry their raw key.
52 pub raw_key: Option<Arc<[u8]>>,
53 /// Offset in Data.db file
54 pub data_offset: u64,
55 /// Size of partition data
56 pub data_size: u32,
57 /// Promoted index entries for wide partitions (optional)
58 pub promoted_index: Option<PromotedIndexData>,
59}
60
61/// Promoted index for wide partitions
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct PromotedIndexData {
64 /// Number of promoted index entries
65 pub entry_count: u32,
66 /// Individual promoted index entries
67 pub entries: Vec<PromotedIndexEntry>,
68}
69
70/// Individual promoted index entry
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct PromotedIndexEntry {
73 /// Clustering key prefix
74 pub clustering_key: Vec<u8>,
75 /// Offset within the partition
76 pub partition_offset: u32,
77 /// Size of the indexed section
78 pub section_size: u32,
79}
80
81/// Complete Index.db data structure
82#[derive(Debug, Clone)]
83pub struct IndexData {
84 /// File header
85 pub header: IndexHeader,
86 /// All partition index entries
87 pub partition_entries: Vec<PartitionIndexEntry>,
88 /// Lookup table for efficient partition access - uses Arc<[u8]> as key type
89 ///
90 /// ## Zero-Copy Design (Issue #107, Problem 1)
91 ///
92 /// - Keys are `Arc<[u8]>` to enable reference counting without cloning digest bytes
93 /// - Lookups use `&[u8]` directly via Borrow trait (zero heap allocations)
94 /// - `Arc<[u8]>` implements `Borrow<[u8]>` enabling HashMap::get(&[u8]) without temporary Arc creation
95 pub key_lookup: HashMap<Arc<[u8]>, usize>,
96}
97
98/// High-level Index.db file reader
99#[allow(dead_code)]
100pub struct IndexReader {
101 /// Path to the Index.db file
102 file_path: PathBuf,
103 /// Parsed index data
104 index_data: IndexData,
105 /// Platform abstraction for file operations
106 platform: Arc<Platform>,
107}
108
109impl IndexReader {
110 /// Open and parse an Index.db file
111 pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
112 Self::open_with_summary(path, platform, None).await
113 }
114
115 /// Open and parse an Index.db file with Summary.db correlation for proper offset mapping
116 pub async fn open_with_summary(
117 path: &Path,
118 platform: Arc<Platform>,
119 summary_reader: Option<&SummaryReader>,
120 ) -> Result<Self> {
121 if !platform.fs().exists(path).await? {
122 return Err(Error::not_found(format!(
123 "Index.db file not found: {}",
124 path.display()
125 )));
126 }
127
128 // Read the entire file
129 let mut file = File::open(path).await?;
130 let mut buffer = Vec::new();
131 file.read_to_end(&mut buffer).await?;
132
133 // Check for empty file
134 if buffer.is_empty() {
135 return Err(Error::corruption(format!(
136 "Index.db file is empty: {}",
137 path.display()
138 )));
139 }
140
141 // Parse the index data with optional Summary.db correlation
142 let index_data = match parse_index_data_with_summary(&buffer, summary_reader) {
143 Ok((_, data)) => data,
144 Err(e) => {
145 return Err(Error::corruption(format!(
146 "Failed to parse Index.db: {:?}",
147 e
148 )));
149 }
150 };
151
152 Ok(Self {
153 file_path: path.to_path_buf(),
154 index_data,
155 platform,
156 })
157 }
158
159 /// Get all partition entries
160 pub fn get_partition_entries(&self) -> &[PartitionIndexEntry] {
161 &self.index_data.partition_entries
162 }
163
164 /// Look up a partition by key digest
165 ///
166 /// ## Zero-Allocation Optimization (Issue #107)
167 ///
168 /// This method performs HashMap lookup without heap allocation by leveraging
169 /// the `Borrow` trait. Since `Arc<[u8]>` implements `Borrow<[u8]>`, we can
170 /// lookup using `&[u8]` directly without creating a temporary Arc.
171 ///
172 /// **Before:** `let key_arc: Arc<[u8]> = key_digest.into();` (heap allocation per query)
173 /// **After:** Direct `get(key_digest)` using Borrow trait (zero allocations)
174 pub fn lookup_partition(&self, key_digest: &[u8]) -> Option<&PartitionIndexEntry> {
175 self.index_data
176 .key_lookup
177 .get(key_digest)
178 .and_then(|&index| self.index_data.partition_entries.get(index))
179 }
180
181 /// Get statistics about the index
182 pub fn get_statistics(&self) -> IndexStatistics {
183 let mut promoted_count = 0;
184 let mut total_promoted_entries = 0;
185
186 for entry in &self.index_data.partition_entries {
187 if let Some(ref promoted) = entry.promoted_index {
188 promoted_count += 1;
189 total_promoted_entries += promoted.entry_count as usize;
190 }
191 }
192
193 IndexStatistics {
194 total_partitions: self.index_data.partition_entries.len(),
195 partitions_with_promoted_index: promoted_count,
196 total_promoted_entries,
197 file_size: self.file_path.metadata().map(|m| m.len()).unwrap_or(0),
198 }
199 }
200
201 /// Validate index integrity against Data.db offsets
202 pub async fn validate_integrity(&self) -> Result<Vec<String>> {
203 let mut issues = Vec::new();
204
205 // Check for overlapping offsets
206 let mut offsets: Vec<_> = self
207 .index_data
208 .partition_entries
209 .iter()
210 .map(|e| (e.data_offset, e.data_size))
211 .collect();
212
213 offsets.sort_by_key(|&(offset, _)| offset);
214
215 for i in 1..offsets.len() {
216 let (prev_offset, prev_size) = offsets[i - 1];
217 let (curr_offset, _) = offsets[i];
218
219 if prev_offset + prev_size as u64 > curr_offset {
220 issues.push(format!(
221 "Overlapping partitions: offset {} + size {} overlaps with offset {}",
222 prev_offset, prev_size, curr_offset
223 ));
224 }
225 }
226
227 Ok(issues)
228 }
229}
230
231/// Index statistics for analysis and validation
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct IndexStatistics {
234 /// Total number of partitions
235 pub total_partitions: usize,
236 /// Number of partitions with promoted index
237 pub partitions_with_promoted_index: usize,
238 /// Total number of promoted index entries
239 pub total_promoted_entries: usize,
240 /// File size in bytes
241 pub file_size: u64,
242}
243
244/// Parse Index.db file data with optional Summary.db correlation using spec-driven approach
245fn parse_index_data_with_summary<'a>(
246 input: &'a [u8],
247 summary_reader: Option<&SummaryReader>,
248) -> IResult<&'a [u8], IndexData> {
249 use nom::error::{Error as NomError, ErrorKind};
250
251 // First try spec-driven header parsing
252 let registry = get_global_registry();
253 let (remaining, header) = match registry.parse_index_header(input) {
254 Ok(parsed_header) => {
255 log::debug!("Successfully parsed Index.db header using spec-driven approach");
256
257 // Convert ParsedHeader to IndexHeader
258 let header = IndexHeader {
259 version: parsed_header
260 .fields
261 .get("version")
262 .and_then(|v| v.as_u32().ok())
263 .unwrap_or(1),
264 entry_count: parsed_header
265 .fields
266 .get("entry_count")
267 .and_then(|v| v.as_u32().ok())
268 .unwrap_or(0),
269 data_size: parsed_header
270 .fields
271 .get("data_size")
272 .and_then(|v| v.as_u64().ok())
273 .unwrap_or(input.len() as u64),
274 checksum: parsed_header
275 .fields
276 .get("checksum")
277 .and_then(|v| v.as_u32().ok())
278 .unwrap_or(0),
279 };
280
281 // Skip header bytes for data parsing
282 let header_size = parsed_header.header_size;
283 if input.len() < header_size {
284 return Err(nom::Err::Error(NomError::new(input, ErrorKind::Eof)));
285 }
286 (&input[header_size..], header)
287 }
288 Err(_) => {
289 log::debug!("Spec-driven header parsing failed, assuming headerless format");
290
291 // Parse all partition key digests - no header in some formats
292 let header = IndexHeader {
293 version: 1,
294 entry_count: 0, // Will be updated after parsing entries
295 data_size: input.len() as u64,
296 checksum: 0,
297 };
298 (input, header)
299 }
300 };
301
302 // Parse partition entries from remaining data
303 let (remaining, partition_entries) =
304 parse_all_partition_keys_with_summary(remaining, summary_reader)?;
305
306 // Build lookup table with zero-copy approach using Arc::clone (reference counting only)
307 // This eliminates the memory explosion from cloning Vec<u8> key digests
308 let mut key_lookup = HashMap::new();
309 for (index, entry) in partition_entries.iter().enumerate() {
310 key_lookup.insert(Arc::clone(&entry.key_digest), index);
311 }
312
313 // Update header with actual entry count
314 let header = IndexHeader {
315 entry_count: partition_entries.len() as u32,
316 ..header
317 };
318
319 Ok((
320 remaining,
321 IndexData {
322 header,
323 partition_entries,
324 key_lookup,
325 },
326 ))
327}
328
329/// Parse all partition entries from the Index.db file.
330///
331/// ## Authoritative format (Issue #552, Cassandra 5.0 NB / BIG Index.db)
332///
333/// Index.db is ALWAYS the BIG-format partition index. Each entry is:
334///
335/// ```text
336/// [key_len: u16 BE] ← length of the raw partition key
337/// [raw partition key bytes: key_len] ← the partition key exactly as in Data.db
338/// [data_offset: unsigned vint] ← byte offset into the Data.db data section
339/// [promoted_index_len: unsigned vint] ← byte length of the promoted index (0 = none)
340/// [promoted_index_data: promoted_index_len bytes]
341/// ```
342///
343/// The leading u16 is the partition key LENGTH, not a `0x0010` marker, and there is no
344/// MD5 digest on disk (verified against real Cassandra Index.db files: single-UUID keys
345/// start `0x0010`, the composite-key `multi_partition_table` starts `0x0026` = 38 bytes).
346///
347/// There is no separate "BTI" Index.db format: a BTI-indexed SSTable uses Partitions.db /
348/// Rows.db trie structures and does not produce an Index.db at all (see guide Ch.17). So the
349/// previous `detect_index_format` heuristic was entirely spurious (Issue #28 mandate) and has
350/// been removed in favour of this single, spec-accurate parser that works for ANY key length.
351///
352/// The `summary_reader` argument is retained for API compatibility; offsets are now stored
353/// inline so Summary.db correlation is no longer needed for parsing.
354fn parse_all_partition_keys_with_summary<'a>(
355 input: &'a [u8],
356 _summary_reader: Option<&SummaryReader>,
357) -> IResult<&'a [u8], Vec<PartitionIndexEntry>> {
358 let mut entries = Vec::new();
359 let mut remaining = input;
360
361 let mut entry_index = 0;
362 while !remaining.is_empty() {
363 match parse_big_index_entry(remaining) {
364 Ok((rest, entry)) => {
365 debug_assert!(
366 rest.len() < remaining.len(),
367 "BIG Index.db parser must make forward progress"
368 );
369 entries.push(entry);
370 remaining = rest;
371 entry_index += 1;
372 }
373 Err(_e) => {
374 log::debug!(
375 "Stopped parsing Index.db at entry {} with {} bytes remaining",
376 entry_index,
377 remaining.len()
378 );
379 break;
380 }
381 }
382 }
383
384 log::debug!("Parsed {} partition entries from Index.db", entries.len());
385 Ok((remaining, entries))
386}
387
388/// Parse a single BIG-format Index.db entry.
389///
390/// Layout: `[key_len: u16 BE][raw key][data_offset: vint][promoted_len: vint][promoted...]`.
391/// Works for any key length (int, text, UUID, composite). The raw partition key is stored
392/// directly in `key_digest` / `raw_key` (no MD5, no marker).
393pub(crate) fn parse_big_index_entry(input: &[u8]) -> IResult<&[u8], PartitionIndexEntry> {
394 // Read partition key length (u16 big-endian).
395 let (input, key_len) = be_u16(input)?;
396
397 // Read the raw partition key bytes.
398 let (input, key_bytes) = take(key_len)(input)?;
399
400 // Read unsigned VInt data offset (relative to the Data.db data section start;
401 // SSTableReader adds the header size when seeking).
402 let (input, data_offset) = parse_vuint(input)?;
403
404 // Read promoted-index length (unsigned VInt) and skip the promoted data.
405 // Partition-level lookups work without decoding the promoted index.
406 let (input, promoted_len) = parse_vuint(input)?;
407 // Saturating cast: on a 32-bit target `promoted_len as usize` could truncate and
408 // misalign subsequent entries. `usize::MAX` makes `take` return an Eof error on a
409 // short buffer instead, which is the safe failure mode for a corrupt Index.db.
410 let promoted_len = usize::try_from(promoted_len).unwrap_or(usize::MAX);
411 let (input, _promoted_data) = take(promoted_len)(input)?;
412
413 log::trace!(
414 "Index.db BIG entry: key_len={}, data_offset={}, promoted_len={}",
415 key_len,
416 data_offset,
417 promoted_len
418 );
419
420 let raw_key: Arc<[u8]> = Arc::from(key_bytes);
421
422 Ok((
423 input,
424 PartitionIndexEntry {
425 key_digest: Arc::clone(&raw_key),
426 raw_key: Some(raw_key),
427 // Size is not stored in Index.db; determined during the Data.db read.
428 data_offset,
429 data_size: 0,
430 promoted_index: None,
431 },
432 ))
433}
434
435// REMOVED: Old heuristic functions that violated Issue #28 no-heuristics mandate
436// - calculate_data_offset_from_summary: Summary.db correlation (now obsolete with inline offsets)
437// - interpolate_data_offset_from_summary_position: Used arbitrary estimates
438// - estimate_data_offset_from_index_position: Used hardcoded partition size guesses
439//
440// Modern Cassandra 5+ Index.db format includes unsigned VInt offsets inline,
441// eliminating the need for Summary.db correlation. See parse_vuint() in parser/vint.rs.
442
/// Parse Index.db file data - Legacy API for backward compatibility.
///
/// Thin wrapper that forwards to `parse_index_data_with_summary` with no Summary.db
/// reader; the return value and errors are exactly those of that function.
// Within this file the only caller is the unit-test module, hence `dead_code` is allowed.
#[allow(dead_code)]
fn parse_index_data(input: &[u8]) -> IResult<&[u8], IndexData> {
    parse_index_data_with_summary(input, None)
}
448
/// Parse all partition entries from the Index.db file - Legacy API.
///
/// Thin wrapper that forwards to `parse_all_partition_keys_with_summary` with no
/// Summary.db reader. Despite the historical "key digests" wording, the entries carry
/// raw partition keys.
// Within this file the only callers are unit tests; being `pub(crate)`, it may also be
// used from other modules not visible here.
#[allow(dead_code)]
pub(crate) fn parse_all_partition_keys(input: &[u8]) -> IResult<&[u8], Vec<PartitionIndexEntry>> {
    parse_all_partition_keys_with_summary(input, None)
}
454
/// Parse a single BIG-format Index.db partition entry - Legacy API.
///
/// Thin alias for `parse_big_index_entry`; the return value and errors are identical.
// Within this file the only callers are unit tests, hence `dead_code` is allowed.
#[allow(dead_code)]
fn parse_simple_partition_key(input: &[u8]) -> IResult<&[u8], PartitionIndexEntry> {
    parse_big_index_entry(input)
}
460
// Note: Promoted index data is not decoded here. `parse_big_index_entry` reads the
// promoted-index length and skips that many bytes, leaving `promoted_index` as `None`.
// Index.db entries hold raw partition keys (not digests), each followed by inline offsets.
463
464#[cfg(test)]
465mod tests {
466 use super::*;
467 use std::env;
468
469 /// Test stock_prices Index.db parsing (Issue #208)
470 ///
471 /// This test directly parses the stock_prices Index.db file which contains 3 partition entries (AMZN, GOOG, AAPL).
472 /// Note: Data.db.jsonl only has 2 entries, suggesting incomplete test data or filtering at a higher level.
473 /// The file uses a BTI format with actual partition keys (not MD5 digests).
474 ///
475 /// **Note:** This test requires test data files and is ignored in minimal CI builds.
476 /// Run with: `cargo test --package cqlite-core -- --ignored`
477 #[tokio::test]
478 #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
479 async fn test_stock_prices_index_db_parsing() {
480 let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
481 "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string()
482 });
483
484 let index_path = format!(
485 "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
486 datasets_root
487 );
488
489 println!("\n=== Testing stock_prices Index.db ===");
490 println!("Path: {}", index_path);
491
492 // Read file directly to inspect format
493 let file_data = std::fs::read(&index_path).expect("Failed to read Index.db");
494 println!("File size: {} bytes", file_data.len());
495 println!(
496 "First 56 bytes (hex): {:02x?}",
497 &file_data[..std::cmp::min(56, file_data.len())]
498 );
499
500 // Check format detection
501 println!("\n=== Format Analysis ===");
502 println!(
503 "First 2 bytes: {:#06x} (expected 0x0010 for digest format)",
504 u16::from_be_bytes([file_data[0], file_data[1]])
505 );
506
507 // Try to parse with current implementation
508 println!("\n=== Parsing with parse_all_partition_keys_with_summary ===");
509 match parse_all_partition_keys_with_summary(&file_data, None) {
510 Ok((remaining, entries)) => {
511 println!("SUCCESS: Parsed {} entries", entries.len());
512 println!("Remaining bytes: {}", remaining.len());
513
514 for (i, entry) in entries.iter().enumerate() {
515 println!(
516 " Entry {}: offset={}, size={}, key_digest={:02x?}",
517 i,
518 entry.data_offset,
519 entry.data_size,
520 &entry.key_digest[..]
521 );
522 }
523
524 // Note: Index.db contains 3 entries (AMZN, GOOG, AAPL) but Data.db.jsonl only has 2.
525 // This may indicate incomplete test data or filtering at a higher level.
526 // For now, verify parser works correctly (finds all entries in Index.db).
527 assert!(
528 entries.len() >= 2,
529 "Expected at least 2 partition entries for stock_prices (found {})",
530 entries.len()
531 );
532 }
533 Err(e) => {
534 println!("FAILED: {:?}", e);
535 panic!("Failed to parse stock_prices Index.db: {:?}", e);
536 }
537 }
538 }
539
540 /// Test stock_prices Index.db via IndexReader (Issue #208)
541 ///
542 /// This test uses the high-level IndexReader API to open the stock_prices Index.db.
543 /// It should successfully parse at least 2 partition entries (Index.db has 3 total).
544 ///
545 /// **Note:** This test requires test data files and is ignored in minimal CI builds.
546 /// Run with: `cargo test --package cqlite-core -- --ignored`
547 #[tokio::test]
548 #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
549 async fn test_stock_prices_index_reader() {
550 let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
551 "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string()
552 });
553
554 let index_path = std::path::PathBuf::from(format!(
555 "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
556 datasets_root
557 ));
558
559 println!("\n=== Testing IndexReader::open ===");
560 println!("Path: {:?}", index_path);
561
562 // Create platform
563 let config = crate::Config::default();
564 let platform = Arc::new(
565 crate::Platform::new(&config)
566 .await
567 .expect("Failed to create platform"),
568 );
569
570 // Try to open with IndexReader
571 match IndexReader::open(&index_path, platform.clone()).await {
572 Ok(reader) => {
573 let entries = reader.get_partition_entries();
574 println!(
575 "SUCCESS: IndexReader found {} partition entries",
576 entries.len()
577 );
578
579 for (i, entry) in entries.iter().enumerate() {
580 println!(
581 " Entry {}: offset={}, size={}, key_digest={:02x?}",
582 i,
583 entry.data_offset,
584 entry.data_size,
585 &entry.key_digest[..8]
586 );
587 }
588
589 let stats = reader.get_statistics();
590 println!(
591 "Statistics: total_partitions={}, file_size={}",
592 stats.total_partitions, stats.file_size
593 );
594
595 // Verify parser works correctly (Index.db has 3 entries, Data.db.jsonl has 2)
596 assert!(
597 entries.len() >= 2,
598 "Expected at least 2 partition entries for stock_prices (found {})",
599 entries.len()
600 );
601 }
602 Err(e) => {
603 println!("FAILED: {:?}", e);
604 panic!("Failed to open stock_prices Index.db: {:?}", e);
605 }
606 }
607 }
608
609 /// Test stock_prices via SSTableReader integration (Issue #208)
610 ///
611 /// This test verifies that SSTableReader correctly loads the Index.db
612 /// and can access partition entries (at least 2, Index.db has 3 total).
613 ///
614 /// **Note:** This test requires test data files and is ignored in minimal CI builds.
615 /// Run with: `cargo test --package cqlite-core -- --ignored`
616 #[tokio::test]
617 #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
618 async fn test_stock_prices_sstable_reader_integration() {
619 let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
620 "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string()
621 });
622
623 let data_path = std::path::PathBuf::from(format!(
624 "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Data.db",
625 datasets_root
626 ));
627
628 println!("\n=== Testing SSTableReader with stock_prices ===");
629 println!("Data.db path: {:?}", data_path);
630
631 // Create platform
632 let config = crate::Config::default();
633 let platform = Arc::new(
634 crate::Platform::new(&config)
635 .await
636 .expect("Failed to create platform"),
637 );
638
639 // Try to open with SSTableReader
640 use crate::storage::sstable::reader::SSTableReader;
641 match SSTableReader::open(&data_path, &config, platform.clone()).await {
642 Ok(reader) => {
643 println!("SUCCESS: SSTableReader opened");
644
645 // Check if index_reader was loaded (it's a public field)
646 if let Some(ref index_reader) = reader.index_reader {
647 let entries = index_reader.get_partition_entries();
648 println!("Index loaded with {} partition entries", entries.len());
649
650 for (i, entry) in entries.iter().enumerate() {
651 println!(
652 " Entry {}: offset={}, size={}",
653 i, entry.data_offset, entry.data_size
654 );
655 }
656
657 // Verify Index.db was parsed correctly (has at least 2 entries, actually has 3)
658 assert!(
659 entries.len() >= 2,
660 "Expected at least 2 partition entries for stock_prices (found {})",
661 entries.len()
662 );
663 } else {
664 println!("WARNING: Index.db was not loaded by SSTableReader");
665 panic!("SSTableReader did not load Index.db");
666 }
667 }
668 Err(e) => {
669 println!("FAILED: {:?}", e);
670 panic!("Failed to open stock_prices SSTable: {:?}", e);
671 }
672 }
673 }
674
675 /// Issue #552: Validate the BIG-format parser against REAL Cassandra 5.0 Index.db files.
676 ///
677 /// `simple_table` has a single 16-byte UUID partition key (entries start 0x0010).
678 /// `multi_partition_table` has a 38-byte composite partition key (entries start 0x0026).
679 /// Both must read back ALL entries with monotonically increasing offsets.
680 #[tokio::test]
681 #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
682 async fn test_real_index_db_big_format() {
683 let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
684 "/Users/patrickmcfadin/local_projects/cqlite/test-data/datasets".to_string()
685 });
686
687 // --- Composite-key table (38-byte keys, entries start 0x0026) ---
688 let multi_dir = format!(
689 "{}/sstables/test_basic/multi_partition_table-6ac52100a25111f0a3fef1a551383fb9",
690 datasets_root
691 );
692 let multi_index = format!("{}/nb-1-big-Index.db", multi_dir);
693 let bytes = std::fs::read(&multi_index).expect("read multi_partition_table Index.db");
694 assert_eq!(
695 u16::from_be_bytes([bytes[0], bytes[1]]),
696 38,
697 "Composite key length should be 38 (0x0026)"
698 );
699 let (rest, entries) = parse_all_partition_keys(&bytes).expect("parse composite Index.db");
700 assert!(rest.is_empty(), "Should consume all Index.db bytes");
701 assert!(
702 entries.len() >= 2,
703 "multi_partition_table should have multiple partitions (got {})",
704 entries.len()
705 );
706 // First key is 38 bytes; first offset must be 0.
707 assert_eq!(
708 entries[0].key_digest.len(),
709 38,
710 "First key should be 38 bytes"
711 );
712 assert_eq!(
713 entries[0].data_offset, 0,
714 "First partition offset should be 0"
715 );
716 // Offsets are strictly increasing in token order.
717 for i in 1..entries.len() {
718 assert!(
719 entries[i].data_offset > entries[i - 1].data_offset,
720 "Offsets must increase: entry {} ({}) <= entry {} ({})",
721 i,
722 entries[i].data_offset,
723 i - 1,
724 entries[i - 1].data_offset
725 );
726 }
727
728 // --- Single-UUID-key table (16-byte keys, entries start 0x0010) ---
729 let simple_index = format!(
730 "{}/sstables/test_basic/simple_table-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
731 datasets_root
732 );
733 let bytes = std::fs::read(&simple_index).expect("read simple_table Index.db");
734 assert_eq!(
735 u16::from_be_bytes([bytes[0], bytes[1]]),
736 16,
737 "UUID key length should be 16 (0x0010)"
738 );
739 let (rest, entries) = parse_all_partition_keys(&bytes).expect("parse simple Index.db");
740 assert!(rest.is_empty(), "Should consume all Index.db bytes");
741 assert!(
742 entries.len() > 3,
743 "simple_table should have many partitions (got {})",
744 entries.len()
745 );
746 assert_eq!(
747 entries[0].key_digest.len(),
748 16,
749 "First key should be 16 bytes"
750 );
751 assert_eq!(
752 entries[0].data_offset, 0,
753 "First partition offset should be 0"
754 );
755 }
756
757 #[test]
758 fn test_simple_partition_key_parsing() {
759 // NB BIG format: key_len(2) + raw_key(key_len) + vint_offset(1-9) + vint_promoted_size(1-9)
760 // VInt encoding for 256: 0x81, 0x00 (2 bytes, 10xxxxxx format)
761 let data = vec![
762 0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
763 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // raw key (16 bytes)
764 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key_digest cont.
765 0x81, 0x00, // VInt offset = 256
766 0x00, // VInt promoted_size = 0 (no promoted index)
767 ];
768
769 let (_, entry) = parse_simple_partition_key(&data).unwrap();
770
771 assert_eq!(
772 entry.key_digest.as_ref(),
773 &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
774 );
775 // Raw offset from Index.db (relative to data section start)
776 // SSTableReader will add actual_header_size to get absolute file offset
777 assert_eq!(entry.data_offset, 256);
778 assert_eq!(entry.data_size, 0); // Size not stored in Index.db (Issue #149)
779 assert!(entry.promoted_index.is_none());
780 }
781
782 #[test]
783 fn test_partition_key_parsing_without_summary() {
784 // BIG format: key_len(2) + raw key(key_len) + vint_offset + vint_promoted_size
785 // VInt encoding for 4096 (0x1000): 0x90, 0x00 (2 bytes, 10xxxxxx format)
786 // byte0 = 0x80 | ((4096 >> 8) & 0x3F) = 0x80 | 0x10 = 0x90
787 // byte1 = 4096 & 0xFF = 0x00
788 let data = vec![
789 0x00, 0x10, // key_len = 16
790 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // raw key (16 bytes)
791 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // raw key cont.
792 0x90, 0x00, // VInt offset = 4096
793 0x00, // VInt promoted_size = 0
794 ];
795
796 let (_, entry) = parse_simple_partition_key(&data).unwrap();
797
798 assert_eq!(
799 entry.key_digest.as_ref(),
800 &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
801 );
802 assert_eq!(
803 entry.raw_key.as_deref(),
804 Some(&[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16][..]),
805 "raw_key should mirror the raw partition key"
806 );
807
808 // Raw offset from Index.db (relative to data section start)
809 assert_eq!(entry.data_offset, 4096);
810 }
811
812 #[test]
813 fn test_variable_length_keys_parse_all_entries() {
814 // Issue #552: prove the parser handles non-16-byte keys (composite/int/text).
815 // Entry 1: 4-byte int key (0x0000002A), offset 100, no promoted index.
816 // Entry 2: 1-byte key (0x07), offset 500 (2-byte vint 0x81 0xF4), no promoted.
817 let data = vec![
818 // Entry 1
819 0x00, 0x04, // key_len = 4
820 0x00, 0x00, 0x00, 0x2A, // raw key (int 42)
821 0x64, // vint offset = 100
822 0x00, // vint promoted_size = 0
823 // Entry 2
824 0x00, 0x01, // key_len = 1
825 0x07, // raw key
826 0x81, 0xF4, // vint offset = 500
827 0x00, // vint promoted_size = 0
828 ];
829
830 let (rest, entries) = parse_all_partition_keys(&data).unwrap();
831 assert!(rest.is_empty(), "All bytes should be consumed");
832 assert_eq!(entries.len(), 2, "Both variable-length entries must parse");
833
834 assert_eq!(entries[0].key_digest.as_ref(), &[0x00, 0x00, 0x00, 0x2A]);
835 assert_eq!(entries[0].data_offset, 100);
836
837 assert_eq!(entries[1].key_digest.as_ref(), &[0x07]);
838 assert_eq!(entries[1].data_offset, 500);
839 }
840
841 // REMOVED: test_enhanced_partition_entry_parsing
842 // Enhanced format parsing removed per Issue #92
843
844 #[test]
845 fn test_multiple_partition_keys_parsing() {
846 // Two partition entries with VInt offsets (NB format)
847 // Format: key_len(2) + raw_key(key_len) + vint_offset + vint_promoted_size
848 // VInt encoding for 100 (0x64): 0x64 (1 byte, value < 128)
849 // VInt encoding for 500 (0x1F4): 0x81, 0xF4 (2 bytes, 10xxxxxx format)
850 // byte0 = 0x80 | ((500 >> 8) & 0x3F) = 0x80 | 1 = 0x81
851 // byte1 = 500 & 0xFF = 0xF4
852 let data = vec![
853 // Entry 1
854 0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
855 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // key_digest 1 (16 bytes)
856 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key_digest cont.
857 0x64, // VInt offset = 100
858 0x00, // VInt promoted_size = 0
859 // Entry 2
860 0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
861 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, // key_digest 2 (16 bytes)
862 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20, // key_digest cont.
863 0x81, 0xF4, // VInt offset = 500
864 0x00, // VInt promoted_size = 0
865 ];
866
867 let (_, entries) = parse_all_partition_keys(&data).unwrap();
868
869 assert_eq!(entries.len(), 2);
870
871 if !entries.is_empty() {
872 assert_eq!(
873 entries[0].key_digest.as_ref(),
874 &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
875 );
876 }
877
878 if entries.len() >= 2 {
879 assert_eq!(
880 entries[1].key_digest.as_ref(),
881 &[
882 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D,
883 0x1E, 0x1F, 0x20
884 ]
885 );
886
887 // Raw offsets from Index.db (relative to data section start)
888 assert_eq!(entries[0].data_offset, 100);
889 assert_eq!(entries[1].data_offset, 500);
890 }
891 }
892
    // REMOVED: test_data_offset_estimation_algorithm
    // This test validated the old heuristic offset-estimation function, which has been
    // removed (Issue #92). Offsets are now read inline from each Index.db entry.
896
897 #[test]
898 fn test_borrow_trait_zero_allocation_lookup() {
899 // Test Issue #107 fix: Verify that lookup_partition uses Borrow trait
900 // to avoid heap allocation on every lookup
901
902 // Create index data with two partition entries (NB format with VInt offsets)
903 // Format: key_len(2) + raw_key(key_len) + vint_offset + vint_promoted_size
904 // VInt for 100: 0x64 (single byte, value < 128)
905 // VInt for 500: 0x81, 0xF4 (2 bytes)
906 let data = vec![
907 // Entry 1
908 0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
909 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // key_digest 1
910 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key_digest cont.
911 0x64, // VInt offset = 100
912 0x00, // VInt promoted_size = 0
913 // Entry 2
914 0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
915 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, // key_digest 2
916 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20, // key_digest cont.
917 0x81, 0xF4, // VInt offset = 500
918 0x00, // VInt promoted_size = 0
919 ];
920
921 let (_, index_data) = parse_index_data(&data).unwrap();
922
923 // Prepare lookup keys as slices (NOT Arc)
924 let key1: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
925 let key2: &[u8] = &[
926 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
927 0x1F, 0x20,
928 ];
929 let key_not_found: &[u8] = &[0xFF; 16];
930
931 // Test lookups - these should use Borrow trait without creating Arc
932 // The key_lookup HashMap has Arc<[u8]> keys but accepts &[u8] for get()
933 let result1 = index_data.key_lookup.get(key1);
934 let result2 = index_data.key_lookup.get(key2);
935 let result3 = index_data.key_lookup.get(key_not_found);
936
937 assert!(result1.is_some(), "Should find first key");
938 assert!(result2.is_some(), "Should find second key");
939 assert!(result3.is_none(), "Should not find non-existent key");
940
941 assert_eq!(*result1.unwrap(), 0, "First key should map to index 0");
942 assert_eq!(*result2.unwrap(), 1, "Second key should map to index 1");
943
944 // Verify the actual entries match
945 assert_eq!(index_data.partition_entries[0].key_digest.as_ref(), key1);
946 assert_eq!(index_data.partition_entries[1].key_digest.as_ref(), key2);
947 }
948}