cqlite-core 0.11.0

Core engine for CQLite — read Apache Cassandra 5.0 SSTables locally without a cluster
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
//! Index.db reader implementation for Cassandra 5+ SSTable format
//!
//! This module provides comprehensive parsing of Index.db files which contain
//! partition-level index information including promoted index entries for wide partitions.
//! The index is used for efficient partition lookups and range queries.

use crate::{
    error::{Error, Result},
    parser::vint::parse_vuint,
    platform::Platform,
};

use super::header_spec::get_global_registry;
use nom::{bytes::complete::take, number::complete::be_u16, IResult};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncReadExt;

use super::summary_reader::SummaryReader;

/// Index.db file header
///
/// Produced by `parse_index_data_with_summary`: either converted from the spec-driven
/// header parser's fields, or filled with defaults when that parser fails and the file
/// is treated as headerless.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexHeader {
    /// Format version identifier (set to 1 when the parsed header has no usable
    /// `version` field or the file is treated as headerless)
    pub version: u32,
    /// Number of index entries.
    ///
    /// `parse_index_data_with_summary` always overwrites this with the number of
    /// entries it actually parsed, regardless of any value found in the header.
    pub entry_count: u32,
    /// Size of the index data section (falls back to the total input length when the
    /// header provides no usable `data_size` field or the file is headerless)
    pub data_size: u64,
    /// Checksum for validation (0 when the header provides none); the parsing code in
    /// this module only stores the value
    pub checksum: u32,
}

/// Partition index entry in Index.db
///
/// One value is produced per on-disk entry by `parse_big_index_entry`.
#[derive(Debug, Clone)]
pub struct PartitionIndexEntry {
    /// Raw partition key bytes (length-prefixed in the on-disk BIG/NB Index.db format).
    ///
    /// NOTE (Issue #552): Despite the historical field name `key_digest`, this holds the
    /// RAW partition key bytes, not an MD5 digest. The real Cassandra 5.0 NB Index.db entry
    /// format is `[key_len: u16 BE][raw key bytes][data_offset: vint][promoted_len: vint]`.
    /// There is no `0x0010` marker and no MD5 digest on disk. The field name is retained to
    /// avoid churn in the zero-copy lookup table and downstream callers; it is used directly
    /// as the partition key (e.g. for `RowKey`). The leading u16 is the key length
    /// (e.g. 0x0010 for a 16-byte UUID, 0x0026 for a 38-byte composite key).
    pub key_digest: Arc<[u8]>,
    /// Raw partition key bytes (mirror of `key_digest`, kept for API compatibility).
    /// Always `Some` now that all entries carry their raw key; `parse_big_index_entry`
    /// stores the same `Arc` allocation here and in `key_digest`.
    pub raw_key: Option<Arc<[u8]>>,
    /// Offset in Data.db file, exactly as decoded from the entry's unsigned vint
    pub data_offset: u64,
    /// Size of partition data.
    ///
    /// Not stored in Index.db: `parse_big_index_entry` always sets this to 0.
    pub data_size: u32,
    /// Promoted index entries for wide partitions (optional).
    ///
    /// `parse_big_index_entry` skips the promoted-index bytes without decoding them and
    /// always sets this to `None`.
    pub promoted_index: Option<PromotedIndexData>,
}

/// Promoted index for wide partitions
///
/// Carried by `PartitionIndexEntry::promoted_index`. `parse_big_index_entry` does not
/// decode promoted-index bytes, so entries it produces never contain this value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotedIndexData {
    /// Number of promoted index entries (this is the value `IndexReader::get_statistics`
    /// sums, rather than `entries.len()`)
    pub entry_count: u32,
    /// Individual promoted index entries
    pub entries: Vec<PromotedIndexEntry>,
}

/// Individual promoted index entry
///
/// Element type of `PromotedIndexData::entries`.
// NOTE(review): nothing in the parsing code of this module constructs this type, so the
// exact encoding of the fields below cannot be confirmed from here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotedIndexEntry {
    /// Clustering key prefix
    pub clustering_key: Vec<u8>,
    /// Offset within the partition
    pub partition_offset: u32,
    /// Size of the indexed section
    pub section_size: u32,
}

/// Complete Index.db data structure
///
/// Built by `parse_index_data_with_summary` from the raw file bytes.
#[derive(Debug, Clone)]
pub struct IndexData {
    /// File header
    pub header: IndexHeader,
    /// All partition index entries, in the order they were parsed from the file
    pub partition_entries: Vec<PartitionIndexEntry>,
    /// Lookup table for efficient partition access - uses Arc<[u8]> as key type
    ///
    /// Maps each entry's `key_digest` (the raw partition key bytes) to its position in
    /// `partition_entries`. If two entries share the same key bytes, the later entry's
    /// position replaces the earlier one.
    ///
    /// ## Zero-Copy Design (Issue #107, Problem 1)
    ///
    /// - Keys are `Arc<[u8]>` to enable reference counting without cloning key bytes
    /// - Lookups use `&[u8]` directly via Borrow trait (zero heap allocations)
    /// - `Arc<[u8]>` implements `Borrow<[u8]>` enabling HashMap::get(&[u8]) without temporary Arc creation
    pub key_lookup: HashMap<Arc<[u8]>, usize>,
}

/// High-level Index.db file reader
///
/// Created with `IndexReader::open` or `IndexReader::open_with_summary`, which read the
/// whole file into memory and parse it once. All later queries are served from the
/// parsed `IndexData`.
// `platform` is stored but not read by any method in this impl, hence the lint allowance.
#[allow(dead_code)]
pub struct IndexReader {
    /// Path to the Index.db file (also used by `get_statistics` to query the file size)
    file_path: PathBuf,
    /// Parsed index data
    index_data: IndexData,
    /// Platform abstraction for file operations
    platform: Arc<Platform>,
}

impl IndexReader {
    /// Open and parse an Index.db file
    pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
        Self::open_with_summary(path, platform, None).await
    }

    /// Open and parse an Index.db file with Summary.db correlation for proper offset mapping
    pub async fn open_with_summary(
        path: &Path,
        platform: Arc<Platform>,
        summary_reader: Option<&SummaryReader>,
    ) -> Result<Self> {
        if !platform.fs().exists(path).await? {
            return Err(Error::not_found(format!(
                "Index.db file not found: {}",
                path.display()
            )));
        }

        // Read the entire file
        let mut file = File::open(path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Check for empty file
        if buffer.is_empty() {
            return Err(Error::corruption(format!(
                "Index.db file is empty: {}",
                path.display()
            )));
        }

        // Parse the index data with optional Summary.db correlation
        let index_data = match parse_index_data_with_summary(&buffer, summary_reader) {
            Ok((_, data)) => data,
            Err(e) => {
                return Err(Error::corruption(format!(
                    "Failed to parse Index.db: {:?}",
                    e
                )));
            }
        };

        Ok(Self {
            file_path: path.to_path_buf(),
            index_data,
            platform,
        })
    }

    /// Get all partition entries
    pub fn get_partition_entries(&self) -> &[PartitionIndexEntry] {
        &self.index_data.partition_entries
    }

    /// Look up a partition by key digest
    ///
    /// ## Zero-Allocation Optimization (Issue #107)
    ///
    /// This method performs HashMap lookup without heap allocation by leveraging
    /// the `Borrow` trait. Since `Arc<[u8]>` implements `Borrow<[u8]>`, we can
    /// lookup using `&[u8]` directly without creating a temporary Arc.
    ///
    /// **Before:** `let key_arc: Arc<[u8]> = key_digest.into();` (heap allocation per query)
    /// **After:** Direct `get(key_digest)` using Borrow trait (zero allocations)
    pub fn lookup_partition(&self, key_digest: &[u8]) -> Option<&PartitionIndexEntry> {
        self.index_data
            .key_lookup
            .get(key_digest)
            .and_then(|&index| self.index_data.partition_entries.get(index))
    }

    /// Get statistics about the index
    pub fn get_statistics(&self) -> IndexStatistics {
        let mut promoted_count = 0;
        let mut total_promoted_entries = 0;

        for entry in &self.index_data.partition_entries {
            if let Some(ref promoted) = entry.promoted_index {
                promoted_count += 1;
                total_promoted_entries += promoted.entry_count as usize;
            }
        }

        IndexStatistics {
            total_partitions: self.index_data.partition_entries.len(),
            partitions_with_promoted_index: promoted_count,
            total_promoted_entries,
            file_size: self.file_path.metadata().map(|m| m.len()).unwrap_or(0),
        }
    }

    /// Validate index integrity against Data.db offsets
    pub async fn validate_integrity(&self) -> Result<Vec<String>> {
        let mut issues = Vec::new();

        // Check for overlapping offsets
        let mut offsets: Vec<_> = self
            .index_data
            .partition_entries
            .iter()
            .map(|e| (e.data_offset, e.data_size))
            .collect();

        offsets.sort_by_key(|&(offset, _)| offset);

        for i in 1..offsets.len() {
            let (prev_offset, prev_size) = offsets[i - 1];
            let (curr_offset, _) = offsets[i];

            if prev_offset + prev_size as u64 > curr_offset {
                issues.push(format!(
                    "Overlapping partitions: offset {} + size {} overlaps with offset {}",
                    prev_offset, prev_size, curr_offset
                ));
            }
        }

        Ok(issues)
    }
}

/// Index statistics for analysis and validation
///
/// Returned by `IndexReader::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStatistics {
    /// Total number of partitions (number of parsed partition entries)
    pub total_partitions: usize,
    /// Number of partitions with promoted index
    pub partitions_with_promoted_index: usize,
    /// Total number of promoted index entries (sum of each promoted index's `entry_count`)
    pub total_promoted_entries: usize,
    /// File size in bytes (0 when the file's metadata could not be read)
    pub file_size: u64,
}

/// Parse Index.db file data with optional Summary.db correlation using spec-driven approach
fn parse_index_data_with_summary<'a>(
    input: &'a [u8],
    summary_reader: Option<&SummaryReader>,
) -> IResult<&'a [u8], IndexData> {
    use nom::error::{Error as NomError, ErrorKind};

    // First try spec-driven header parsing
    let registry = get_global_registry();
    let (remaining, header) = match registry.parse_index_header(input) {
        Ok(parsed_header) => {
            log::debug!("Successfully parsed Index.db header using spec-driven approach");

            // Convert ParsedHeader to IndexHeader
            let header = IndexHeader {
                version: parsed_header
                    .fields
                    .get("version")
                    .and_then(|v| v.as_u32().ok())
                    .unwrap_or(1),
                entry_count: parsed_header
                    .fields
                    .get("entry_count")
                    .and_then(|v| v.as_u32().ok())
                    .unwrap_or(0),
                data_size: parsed_header
                    .fields
                    .get("data_size")
                    .and_then(|v| v.as_u64().ok())
                    .unwrap_or(input.len() as u64),
                checksum: parsed_header
                    .fields
                    .get("checksum")
                    .and_then(|v| v.as_u32().ok())
                    .unwrap_or(0),
            };

            // Skip header bytes for data parsing
            let header_size = parsed_header.header_size;
            if input.len() < header_size {
                return Err(nom::Err::Error(NomError::new(input, ErrorKind::Eof)));
            }
            (&input[header_size..], header)
        }
        Err(_) => {
            log::debug!("Spec-driven header parsing failed, assuming headerless format");

            // Parse all partition key digests - no header in some formats
            let header = IndexHeader {
                version: 1,
                entry_count: 0, // Will be updated after parsing entries
                data_size: input.len() as u64,
                checksum: 0,
            };
            (input, header)
        }
    };

    // Parse partition entries from remaining data
    let (remaining, partition_entries) =
        parse_all_partition_keys_with_summary(remaining, summary_reader)?;

    // Build lookup table with zero-copy approach using Arc::clone (reference counting only)
    // This eliminates the memory explosion from cloning Vec<u8> key digests
    let mut key_lookup = HashMap::new();
    for (index, entry) in partition_entries.iter().enumerate() {
        key_lookup.insert(Arc::clone(&entry.key_digest), index);
    }

    // Update header with actual entry count
    let header = IndexHeader {
        entry_count: partition_entries.len() as u32,
        ..header
    };

    Ok((
        remaining,
        IndexData {
            header,
            partition_entries,
            key_lookup,
        },
    ))
}

/// Parse every partition entry contained in an Index.db byte slice.
///
/// ## Authoritative format (Issue #552, Cassandra 5.0 NB / BIG Index.db)
///
/// Index.db is ALWAYS the BIG-format partition index, a plain sequence of entries:
///
/// ```text
/// [key_len: u16 BE]                    ← length of the raw partition key
/// [raw partition key bytes: key_len]   ← the partition key exactly as in Data.db
/// [data_offset: unsigned vint]         ← byte offset into the Data.db data section
/// [promoted_index_len: unsigned vint]  ← byte length of the promoted index (0 = none)
/// [promoted_index_data: promoted_index_len bytes]
/// ```
///
/// The leading u16 is the partition key LENGTH. It is not a `0x0010` marker, and no MD5
/// digest is stored on disk (verified against real Cassandra Index.db files: single-UUID
/// keys start `0x0010`, the composite-key `multi_partition_table` starts `0x0026` = 38
/// bytes).
///
/// A BTI-indexed SSTable uses Partitions.db / Rows.db trie structures and produces no
/// Index.db at all (see guide Ch.17), so there is no separate "BTI" Index.db format. The
/// former `detect_index_format` heuristic was therefore spurious (Issue #28 mandate) and
/// was removed in favour of this single parser, which handles ANY key length.
///
/// Entries are parsed back to back until the input is exhausted or an entry fails to
/// parse. On a failure the loop stops, a debug message is logged, and the entries parsed
/// so far are returned together with the unparsed tail. This function never returns
/// `Err`.
///
/// `_summary_reader` is accepted only for API compatibility: offsets are stored inline,
/// so Summary.db correlation is not needed for parsing.
fn parse_all_partition_keys_with_summary<'a>(
    input: &'a [u8],
    _summary_reader: Option<&SummaryReader>,
) -> IResult<&'a [u8], Vec<PartitionIndexEntry>> {
    let mut entries = Vec::new();
    let mut cursor = input;

    loop {
        if cursor.is_empty() {
            break;
        }

        let (tail, entry) = match parse_big_index_entry(cursor) {
            Ok(parsed) => parsed,
            Err(_) => {
                // `entries.len()` is the index of the entry that failed to parse.
                log::debug!(
                    "Stopped parsing Index.db at entry {} with {} bytes remaining",
                    entries.len(),
                    cursor.len()
                );
                break;
            }
        };

        // Guards against an infinite loop should the entry parser ever consume nothing.
        debug_assert!(
            tail.len() < cursor.len(),
            "BIG Index.db parser must make forward progress"
        );
        entries.push(entry);
        cursor = tail;
    }

    log::debug!("Parsed {} partition entries from Index.db", entries.len());
    Ok((cursor, entries))
}

/// Parse one BIG-format Index.db entry from the front of `input`.
///
/// Layout: `[key_len: u16 BE][raw key][data_offset: vint][promoted_len: vint][promoted...]`.
/// Any key length is accepted (int, text, UUID, composite). The raw key bytes are stored
/// as-is in both `key_digest` and `raw_key` (no MD5, no marker). The promoted-index bytes
/// are skipped rather than decoded, so `promoted_index` is always `None`. `data_size` is
/// always 0 because Index.db does not record it.
///
/// Returns the input remaining after the entry together with the parsed entry.
///
/// # Errors
///
/// Returns a nom error when the input ends before the entry is complete or a vint
/// cannot be decoded.
pub(crate) fn parse_big_index_entry(input: &[u8]) -> IResult<&[u8], PartitionIndexEntry> {
    // Partition key: u16 big-endian length prefix, then that many raw bytes.
    let (after_len, key_len) = be_u16(input)?;
    let (after_key, key_bytes) = take(key_len)(after_len)?;

    // Data offset (unsigned VInt), relative to the Data.db data section start;
    // SSTableReader adds the header size when seeking.
    let (after_offset, data_offset) = parse_vuint(after_key)?;

    // Promoted-index length (unsigned VInt). The promoted data itself is skipped:
    // partition-level lookups do not need it decoded.
    let (after_promoted_len, promoted_len_raw) = parse_vuint(after_offset)?;

    // Saturate instead of truncating: on a 32-bit target a plain cast could wrap and
    // misalign every following entry. With `usize::MAX`, `take` fails with Eof on a
    // short buffer, which is the safe outcome for a corrupt Index.db.
    let promoted_len = match usize::try_from(promoted_len_raw) {
        Ok(len) => len,
        Err(_) => usize::MAX,
    };
    let (rest, _promoted_data) = take(promoted_len)(after_promoted_len)?;

    log::trace!(
        "Index.db BIG entry: key_len={}, data_offset={}, promoted_len={}",
        key_len,
        data_offset,
        promoted_len
    );

    // One allocation for the key, shared by both fields through the reference count.
    let shared_key: Arc<[u8]> = Arc::from(key_bytes);
    let entry = PartitionIndexEntry {
        key_digest: Arc::clone(&shared_key),
        raw_key: Some(shared_key),
        data_offset,
        // Size is not stored in Index.db; determined during the Data.db read.
        data_size: 0,
        promoted_index: None,
    };

    Ok((rest, entry))
}

// REMOVED: Old heuristic functions that violated Issue #28 no-heuristics mandate
// - calculate_data_offset_from_summary: Summary.db correlation (now obsolete with inline offsets)
// - interpolate_data_offset_from_summary_position: Used arbitrary estimates
// - estimate_data_offset_from_index_position: Used hardcoded partition size guesses
//
// Modern Cassandra 5+ Index.db format includes unsigned VInt offsets inline,
// eliminating the need for Summary.db correlation. See parse_vuint() in parser/vint.rs.

/// Parse Index.db file data - Legacy API for backward compatibility
///
/// Thin wrapper around `parse_index_data_with_summary` that passes no Summary.db
/// reader. Returns the unparsed tail of `input` together with the parsed `IndexData`.
#[allow(dead_code)]
fn parse_index_data(input: &[u8]) -> IResult<&[u8], IndexData> {
    parse_index_data_with_summary(input, None)
}

/// Parse all partition entries from the Index.db file - Legacy API
///
/// Thin wrapper around `parse_all_partition_keys_with_summary` that passes no
/// Summary.db reader. Each returned entry carries the raw partition key bytes (in the
/// historically named `key_digest` field), not a digest.
#[allow(dead_code)]
pub(crate) fn parse_all_partition_keys(input: &[u8]) -> IResult<&[u8], Vec<PartitionIndexEntry>> {
    parse_all_partition_keys_with_summary(input, None)
}

/// Parse a single BIG-format Index.db partition entry - Legacy API
///
/// Retained under its old name; simply delegates to `parse_big_index_entry`.
#[allow(dead_code)]
fn parse_simple_partition_key(input: &[u8]) -> IResult<&[u8], PartitionIndexEntry> {
    parse_big_index_entry(input)
}

// Note: Promoted index contents are not decoded in this module. Each Index.db entry
// carries a length-prefixed promoted-index section, which `parse_big_index_entry` skips
// over. Entries hold raw partition keys, not key digests.

#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Parse the stock_prices Index.db straight from disk (Issue #208).
    ///
    /// Reads the table's `nb-1-big-Index.db`, prints a hex preview and the leading
    /// big-endian u16, then runs `parse_all_partition_keys_with_summary` with no summary
    /// and requires at least 2 entries.
    ///
    /// The file is expected to hold 3 partition entries (AMZN, GOOG, AAPL) keyed by the
    /// actual partition keys rather than MD5 digests. Data.db.jsonl only has 2 entries,
    /// which suggests incomplete test data or filtering at a higher level, so the
    /// assertion is deliberately loose.
    ///
    /// **Note:** This test requires test data files and is ignored in minimal CI builds.
    /// Run with: `cargo test --package cqlite-core -- --ignored`
    #[tokio::test]
    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
    async fn test_stock_prices_index_db_parsing() {
        let datasets_root = match env::var("CQLITE_DATASETS_ROOT") {
            Ok(root) => root,
            Err(_) => "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string(),
        };
        let index_path = format!(
            "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
            datasets_root
        );

        println!("\n=== Testing stock_prices Index.db ===");
        println!("Path: {}", index_path);

        // Load the raw bytes first so the on-disk layout can be inspected by eye.
        let file_data = std::fs::read(&index_path).expect("Failed to read Index.db");
        let preview_len = file_data.len().min(56);
        println!("File size: {} bytes", file_data.len());
        println!("First 56 bytes (hex): {:02x?}", &file_data[..preview_len]);

        // The first two bytes are the big-endian length prefix of the first key.
        println!("\n=== Format Analysis ===");
        let leading_u16 = u16::from_be_bytes([file_data[0], file_data[1]]);
        println!(
            "First 2 bytes: {:#06x} (expected 0x0010 for digest format)",
            leading_u16
        );

        // Run the real parser; a parse failure is reported and then fails the test.
        println!("\n=== Parsing with parse_all_partition_keys_with_summary ===");
        let (remaining, entries) = match parse_all_partition_keys_with_summary(&file_data, None) {
            Ok(parsed) => parsed,
            Err(e) => {
                println!("FAILED: {:?}", e);
                panic!("Failed to parse stock_prices Index.db: {:?}", e);
            }
        };

        println!("SUCCESS: Parsed {} entries", entries.len());
        println!("Remaining bytes: {}", remaining.len());

        entries.iter().enumerate().for_each(|(i, entry)| {
            println!(
                "  Entry {}: offset={}, size={}, key_digest={:02x?}",
                i,
                entry.data_offset,
                entry.data_size,
                &entry.key_digest[..]
            );
        });

        // Index.db lists 3 entries while Data.db.jsonl lists 2, so only a lower bound
        // is enforced here.
        assert!(
            entries.len() >= 2,
            "Expected at least 2 partition entries for stock_prices (found {})",
            entries.len()
        );
    }

    /// Test stock_prices Index.db via IndexReader (Issue #208)
    ///
    /// Builds a default `Platform`, opens `nb-1-big-Index.db` through the high-level
    /// `IndexReader` API, prints every partition entry plus the reader statistics, and
    /// requires at least 2 partition entries (Index.db has 3 total).
    ///
    /// **Note:** This test requires test data files and is ignored in minimal CI builds.
    /// Run with: `cargo test --package cqlite-core -- --ignored`
    #[tokio::test]
    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
    async fn test_stock_prices_index_reader() {
        let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
            "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string()
        });

        let index_path = std::path::PathBuf::from(format!(
            "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
            datasets_root
        ));

        println!("\n=== Testing IndexReader::open ===");
        println!("Path: {:?}", index_path);

        // Create platform
        let config = crate::Config::default();
        let platform = Arc::new(
            crate::Platform::new(&config)
                .await
                .expect("Failed to create platform"),
        );

        // Try to open with IndexReader
        match IndexReader::open(&index_path, platform.clone()).await {
            Ok(reader) => {
                let entries = reader.get_partition_entries();
                println!(
                    "SUCCESS: IndexReader found {} partition entries",
                    entries.len()
                );

                for (i, entry) in entries.iter().enumerate() {
                    // Preview at most 8 key bytes. Keys are raw partition keys of
                    // variable length, so an unconditional `[..8]` slice would panic on
                    // any key shorter than 8 bytes (e.g. short text symbols).
                    let preview_len = entry.key_digest.len().min(8);
                    println!(
                        "  Entry {}: offset={}, size={}, key_digest={:02x?}",
                        i,
                        entry.data_offset,
                        entry.data_size,
                        &entry.key_digest[..preview_len]
                    );
                }

                let stats = reader.get_statistics();
                println!(
                    "Statistics: total_partitions={}, file_size={}",
                    stats.total_partitions, stats.file_size
                );

                // Verify parser works correctly (Index.db has 3 entries, Data.db.jsonl has 2)
                assert!(
                    entries.len() >= 2,
                    "Expected at least 2 partition entries for stock_prices (found {})",
                    entries.len()
                );
            }
            Err(e) => {
                println!("FAILED: {:?}", e);
                panic!("Failed to open stock_prices Index.db: {:?}", e);
            }
        }
    }

    /// Test stock_prices via SSTableReader integration (Issue #208)
    ///
    /// This test verifies that SSTableReader correctly loads the Index.db
    /// and can access partition entries (at least 2, Index.db has 3 total).
    ///
    /// **Note:** This test requires test data files and is ignored in minimal CI builds.
    /// Run with: `cargo test --package cqlite-core -- --ignored`
    #[tokio::test]
    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
    async fn test_stock_prices_sstable_reader_integration() {
        let datasets_root = env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
            "/Users/patrick/local_projects/cqlite/test-data/datasets".to_string()
        });

        let data_path = std::path::PathBuf::from(format!(
            "{}/sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9/nb-1-big-Data.db",
            datasets_root
        ));

        println!("\n=== Testing SSTableReader with stock_prices ===");
        println!("Data.db path: {:?}", data_path);

        // Create platform
        let config = crate::Config::default();
        let platform = Arc::new(
            crate::Platform::new(&config)
                .await
                .expect("Failed to create platform"),
        );

        // Try to open with SSTableReader
        use crate::storage::sstable::reader::SSTableReader;
        match SSTableReader::open(&data_path, &config, platform.clone()).await {
            Ok(reader) => {
                println!("SUCCESS: SSTableReader opened");

                // Check if index_reader was loaded (it's a public field)
                if let Some(ref index_reader) = reader.index_reader {
                    let entries = index_reader.get_partition_entries();
                    println!("Index loaded with {} partition entries", entries.len());

                    for (i, entry) in entries.iter().enumerate() {
                        println!(
                            "  Entry {}: offset={}, size={}",
                            i, entry.data_offset, entry.data_size
                        );
                    }

                    // Verify Index.db was parsed correctly (has at least 2 entries, actually has 3)
                    assert!(
                        entries.len() >= 2,
                        "Expected at least 2 partition entries for stock_prices (found {})",
                        entries.len()
                    );
                } else {
                    println!("WARNING: Index.db was not loaded by SSTableReader");
                    panic!("SSTableReader did not load Index.db");
                }
            }
            Err(e) => {
                println!("FAILED: {:?}", e);
                panic!("Failed to open stock_prices SSTable: {:?}", e);
            }
        }
    }

    /// Issue #552: run the BIG-format parser over REAL Cassandra 5.0 Index.db files.
    ///
    /// Two fixtures are checked:
    /// - `multi_partition_table`: 38-byte composite partition key (entries start 0x0026).
    /// - `simple_table`: single 16-byte UUID partition key (entries start 0x0010).
    ///
    /// Both files must be consumed completely, the first entry must sit at offset 0, and
    /// the composite-key table must additionally show strictly increasing offsets.
    #[tokio::test]
    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
    async fn test_real_index_db_big_format() {
        let datasets_root = match env::var("CQLITE_DATASETS_ROOT") {
            Ok(root) => root,
            Err(_) => {
                "/Users/patrickmcfadin/local_projects/cqlite/test-data/datasets".to_string()
            }
        };

        // Big-endian u16 key-length prefix of the first entry in a file.
        let leading_key_len = |raw: &[u8]| u16::from_be_bytes([raw[0], raw[1]]);

        // --- Composite-key table (38-byte keys, entries start 0x0026) ---
        let multi_dir = format!(
            "{}/sstables/test_basic/multi_partition_table-6ac52100a25111f0a3fef1a551383fb9",
            datasets_root
        );
        let multi_index = format!("{}/nb-1-big-Index.db", multi_dir);
        let bytes = std::fs::read(&multi_index).expect("read multi_partition_table Index.db");
        assert_eq!(
            leading_key_len(&bytes),
            38,
            "Composite key length should be 38 (0x0026)"
        );

        let (rest, entries) = parse_all_partition_keys(&bytes).expect("parse composite Index.db");
        assert!(rest.is_empty(), "Should consume all Index.db bytes");
        assert!(
            entries.len() >= 2,
            "multi_partition_table should have multiple partitions (got {})",
            entries.len()
        );

        let first = &entries[0];
        assert_eq!(first.key_digest.len(), 38, "First key should be 38 bytes");
        assert_eq!(first.data_offset, 0, "First partition offset should be 0");

        // Each adjacent pair of entries must have a strictly larger offset on the right.
        for (prev_idx, pair) in entries.windows(2).enumerate() {
            let (prev, next) = (&pair[0], &pair[1]);
            assert!(
                next.data_offset > prev.data_offset,
                "Offsets must increase: entry {} ({}) <= entry {} ({})",
                prev_idx + 1,
                next.data_offset,
                prev_idx,
                prev.data_offset
            );
        }

        // --- Single-UUID-key table (16-byte keys, entries start 0x0010) ---
        let simple_index = format!(
            "{}/sstables/test_basic/simple_table-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
            datasets_root
        );
        let bytes = std::fs::read(&simple_index).expect("read simple_table Index.db");
        assert_eq!(
            leading_key_len(&bytes),
            16,
            "UUID key length should be 16 (0x0010)"
        );

        let (rest, entries) = parse_all_partition_keys(&bytes).expect("parse simple Index.db");
        assert!(rest.is_empty(), "Should consume all Index.db bytes");
        assert!(
            entries.len() > 3,
            "simple_table should have many partitions (got {})",
            entries.len()
        );

        let first = &entries[0];
        assert_eq!(first.key_digest.len(), 16, "First key should be 16 bytes");
        assert_eq!(first.data_offset, 0, "First partition offset should be 0");
    }

    /// A single BIG-format entry with a 16-byte key and a 2-byte VInt offset parses into
    /// the expected key, offset 256, zero size and no promoted index.
    #[test]
    fn test_simple_partition_key_parsing() {
        // Entry layout: key_len(2) + raw_key(key_len) + vint_offset(1-9) + vint_promoted_size(1-9)
        // 256 encodes as the 2-byte VInt 0x81 0x00 (10xxxxxx prefix form).
        let data: &[u8] = &[
            0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // raw key, bytes 1-8
            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // raw key, bytes 9-16
            0x81, 0x00, // VInt offset = 256
            0x00, // VInt promoted_size = 0 (no promoted index)
        ];
        let expected_key: Vec<u8> = (1..=16).collect();

        let (_, entry) = parse_simple_partition_key(data).unwrap();

        assert_eq!(&entry.key_digest[..], expected_key.as_slice());
        // The offset is the raw value from Index.db (relative to the data section start);
        // SSTableReader adds actual_header_size later to obtain the absolute file offset.
        assert_eq!(entry.data_offset, 256);
        // Size is not stored in Index.db (Issue #149).
        assert_eq!(entry.data_size, 0);
        assert!(entry.promoted_index.is_none());
    }

    /// A single BIG-format entry parses without any summary input: the key is exposed
    /// through both `key_digest` and `raw_key`, and the inline VInt offset (4096) is read.
    #[test]
    fn test_partition_key_parsing_without_summary() {
        // Entry layout: key_len(2) + raw key(key_len) + vint_offset + vint_promoted_size
        // 4096 (0x1000) encodes as the 2-byte VInt 0x90 0x00 (10xxxxxx prefix form):
        //   byte0 = 0x80 | ((4096 >> 8) & 0x3F) = 0x80 | 0x10 = 0x90
        //   byte1 = 4096 & 0xFF = 0x00
        let data: &[u8] = &[
            0x00, 0x10, // key_len = 16
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // raw key, bytes 1-8
            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // raw key, bytes 9-16
            0x90, 0x00, // VInt offset = 4096
            0x00, // VInt promoted_size = 0
        ];
        let expected_key: Vec<u8> = (1..=16).collect();

        let (_, entry) = parse_simple_partition_key(data).unwrap();

        assert_eq!(&entry.key_digest[..], expected_key.as_slice());
        assert_eq!(
            entry.raw_key.as_deref(),
            Some(expected_key.as_slice()),
            "raw_key should mirror the raw partition key"
        );

        // Raw offset from Index.db (relative to the data section start).
        assert_eq!(entry.data_offset, 4096);
    }

    /// Issue #552: keys that are not 16 bytes long (composite/int/text) must parse too.
    ///
    /// Feeds two back-to-back entries with a 4-byte and a 1-byte key and checks that
    /// both are returned with their offsets and that no input is left over.
    #[test]
    fn test_variable_length_keys_parse_all_entries() {
        let data: &[u8] = &[
            // Entry 1: 4-byte int key (42), offset 100, no promoted index
            0x00, 0x04, // key_len = 4
            0x00, 0x00, 0x00, 0x2A, // raw key (int 42)
            0x64, // vint offset = 100
            0x00, // vint promoted_size = 0
            // Entry 2: 1-byte key, offset 500, no promoted index
            0x00, 0x01, // key_len = 1
            0x07, // raw key
            0x81, 0xF4, // vint offset = 500 (2-byte vint)
            0x00, // vint promoted_size = 0
        ];

        let (rest, entries) = parse_all_partition_keys(data).unwrap();
        assert!(rest.is_empty(), "All bytes should be consumed");
        assert_eq!(entries.len(), 2, "Both variable-length entries must parse");

        let (int_entry, byte_entry) = (&entries[0], &entries[1]);

        assert_eq!(&int_entry.key_digest[..], &[0x00, 0x00, 0x00, 0x2A]);
        assert_eq!(int_entry.data_offset, 100);

        assert_eq!(&byte_entry.key_digest[..], &[0x07]);
        assert_eq!(byte_entry.data_offset, 500);
    }

    // REMOVED: test_enhanced_partition_entry_parsing
    // Enhanced format parsing removed per Issue #92

    /// Two consecutive 16-byte-key entries (NB format, VInt offsets) are both parsed,
    /// in order, with their keys and raw offsets intact.
    #[test]
    fn test_multiple_partition_keys_parsing() {
        // Entry layout: key_len(2) + raw_key(key_len) + vint_offset + vint_promoted_size
        // 100 (0x64) encodes as the 1-byte VInt 0x64 (value < 128).
        // 500 (0x1F4) encodes as the 2-byte VInt 0x81 0xF4 (10xxxxxx prefix form):
        //   byte0 = 0x80 | ((500 >> 8) & 0x3F) = 0x80 | 1 = 0x81
        //   byte1 = 500 & 0xFF = 0xF4
        let data: &[u8] = &[
            // Entry 1
            0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // key 1, bytes 1-8
            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key 1, bytes 9-16
            0x64, // VInt offset = 100
            0x00, // VInt promoted_size = 0
            // Entry 2
            0x00, 0x10, // key_len = 16 (e.g. a 16-byte UUID partition key)
            0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, // key 2, bytes 1-8
            0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20, // key 2, bytes 9-16
            0x81, 0xF4, // VInt offset = 500
            0x00, // VInt promoted_size = 0
        ];
        let first_key: Vec<u8> = (0x01..=0x10).collect();
        let second_key: Vec<u8> = (0x11..=0x20).collect();

        let (_, entries) = parse_all_partition_keys(data).unwrap();

        // The length check comes first, so indexing entries 0 and 1 below cannot go out
        // of bounds.
        assert_eq!(entries.len(), 2);

        assert_eq!(&entries[0].key_digest[..], first_key.as_slice());
        assert_eq!(&entries[1].key_digest[..], second_key.as_slice());

        // Raw offsets from Index.db (relative to the data section start).
        assert_eq!(entries[0].data_offset, 100);
        assert_eq!(entries[1].data_offset, 500);
    }

    // REMOVED: test_data_offset_estimation_algorithm
    // This test validated the old heuristic estimation function, which was removed
    // in favor of spec-accurate Summary.db correlation (Issue #92). That correlation has
    // itself since become obsolete: offsets are now read inline from Index.db.

    /// Issue #107: `key_lookup` must be queryable with a plain `&[u8]`.
    ///
    /// Parses two entries, then looks keys up by slice (never constructing an `Arc`) and
    /// checks that each key maps to the index of its entry in `partition_entries`.
    #[test]
    fn test_borrow_trait_zero_allocation_lookup() {
        // Serialises one NB-format entry:
        // key_len(2) + raw_key(key_len) + vint_offset + vint_promoted_size (always 0 here).
        let encode_entry = |key: &[u8], offset_vint: &[u8]| -> Vec<u8> {
            let key_len = u16::try_from(key.len()).expect("test key length fits in u16");
            let mut bytes = Vec::with_capacity(2 + key.len() + offset_vint.len() + 1);
            bytes.extend_from_slice(&key_len.to_be_bytes());
            bytes.extend_from_slice(key);
            bytes.extend_from_slice(offset_vint);
            bytes.push(0x00); // VInt promoted_size = 0
            bytes
        };

        // 16-byte keys (e.g. UUID partition keys), held as plain byte buffers, not Arc.
        let first_key: Vec<u8> = (0x01..=0x10).collect();
        let second_key: Vec<u8> = (0x11..=0x20).collect();
        let absent_key = [0xFFu8; 16];

        // Entry 1 uses the 1-byte VInt for 100 (0x64); entry 2 the 2-byte VInt for 500
        // (0x81, 0xF4).
        let mut data = encode_entry(&first_key, &[0x64]);
        data.extend(encode_entry(&second_key, &[0x81, 0xF4]));

        let (_, index_data) = parse_index_data(&data).unwrap();

        // The map is keyed by Arc<[u8]> but `get` accepts a borrowed &[u8], so no Arc
        // has to be built for a lookup.
        let first_hit = index_data.key_lookup.get(first_key.as_slice());
        let second_hit = index_data.key_lookup.get(second_key.as_slice());
        let miss = index_data.key_lookup.get(&absent_key[..]);

        assert!(first_hit.is_some(), "Should find first key");
        assert!(second_hit.is_some(), "Should find second key");
        assert!(miss.is_none(), "Should not find non-existent key");

        assert_eq!(*first_hit.unwrap(), 0, "First key should map to index 0");
        assert_eq!(*second_hit.unwrap(), 1, "Second key should map to index 1");

        // The entries stored at those indices carry the same keys.
        let stored = &index_data.partition_entries;
        assert_eq!(stored[0].key_digest.as_ref(), first_key.as_slice());
        assert_eq!(stored[1].key_digest.as_ref(), second_key.as_slice());
    }
}