use crate::{
error::{Error, Result},
parser::vint::parse_vuint,
platform::Platform,
};
use super::header_spec::get_global_registry;
use nom::{bytes::complete::take, number::complete::be_u16, IResult};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use super::summary_reader::SummaryReader;
/// Header metadata for an Index.db file.
///
/// Built by `parse_index_data_with_summary`, in one of two ways:
/// - from the spec-driven header parser's field map, when that parser succeeds;
/// - as a synthetic header for a headerless file, when it fails.
///
/// In both cases `entry_count` is overwritten with the number of entries that were
/// actually parsed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexHeader {
    /// Format version; 1 when the header parser does not supply one or the file is headerless.
    pub version: u32,
    /// Number of partition entries actually parsed from the file.
    pub entry_count: u32,
    /// Header-reported data size; falls back to the length of the whole input buffer.
    pub data_size: u64,
    /// Header-reported checksum, or 0 when absent.
    /// NOTE(review): this value is not verified anywhere in this file.
    pub checksum: u32,
}
/// One partition's entry in Index.db.
#[derive(Debug, Clone)]
pub struct PartitionIndexEntry {
    /// Lookup key for this partition.
    ///
    /// Despite the name, `parse_big_index_entry` stores the raw partition-key bytes
    /// here; no hashing is performed. It shares its allocation with `raw_key`.
    pub key_digest: Arc<[u8]>,
    /// Raw serialized partition-key bytes, when available.
    pub raw_key: Option<Arc<[u8]>>,
    /// Partition offset as read from the index entry.
    /// Presumably this is the offset into Data.db — confirm.
    pub data_offset: u64,
    /// Partition size in bytes; always 0 for entries produced by `parse_big_index_entry`.
    pub data_size: u32,
    /// Decoded promoted index. `parse_big_index_entry` skips the promoted-index bytes
    /// and always stores `None`.
    pub promoted_index: Option<PromotedIndexData>,
}
/// Promoted index data attached to a partition entry.
///
/// NOTE(review): nothing in this file constructs this type, because the BIG parser
/// skips the promoted-index bytes. The field meanings below are inferred from their
/// names — confirm against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotedIndexData {
    /// Number of promoted-index entries; summed by `IndexReader::get_statistics`.
    pub entry_count: u32,
    /// The decoded promoted-index entries.
    pub entries: Vec<PromotedIndexEntry>,
}
/// A single entry of a [`PromotedIndexData`].
///
/// NOTE(review): not populated anywhere in this file; the semantics below are
/// assumptions based on the field names — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotedIndexEntry {
    /// Clustering-key bytes for this entry; presumably the block's boundary key.
    pub clustering_key: Vec<u8>,
    /// Presumably the offset of this block relative to the partition start.
    pub partition_offset: u32,
    /// Presumably the size in bytes of this block.
    pub section_size: u32,
}
/// Fully parsed, in-memory contents of an Index.db file.
#[derive(Debug, Clone)]
pub struct IndexData {
    /// Parsed or synthesized header; `entry_count` matches `partition_entries.len()`.
    pub header: IndexHeader,
    /// Partition entries in the order they appear in the file.
    pub partition_entries: Vec<PartitionIndexEntry>,
    /// Maps each entry's `key_digest` to its position in `partition_entries`.
    /// When keys are duplicated, the later entry wins.
    pub key_lookup: HashMap<Arc<[u8]>, usize>,
}
/// Reader for an Index.db file.
///
/// The whole file is read and parsed when the reader is opened; all queries are
/// then answered from the in-memory [`IndexData`].
// `dead_code` is allowed because the `platform` field is stored but does not appear
// to be read anywhere in this module.
#[allow(dead_code)]
pub struct IndexReader {
    /// Path the index was opened from; used to report the file size in statistics.
    file_path: PathBuf,
    /// Parsed index contents.
    index_data: IndexData,
    /// Platform handle supplied at open time.
    platform: Arc<Platform>,
}
impl IndexReader {
pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
Self::open_with_summary(path, platform, None).await
}
pub async fn open_with_summary(
path: &Path,
platform: Arc<Platform>,
summary_reader: Option<&SummaryReader>,
) -> Result<Self> {
if !platform.fs().exists(path).await? {
return Err(Error::not_found(format!(
"Index.db file not found: {}",
path.display()
)));
}
let mut file = File::open(path).await?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
if buffer.is_empty() {
return Err(Error::corruption(format!(
"Index.db file is empty: {}",
path.display()
)));
}
let index_data = match parse_index_data_with_summary(&buffer, summary_reader) {
Ok((_, data)) => data,
Err(e) => {
return Err(Error::corruption(format!(
"Failed to parse Index.db: {:?}",
e
)));
}
};
Ok(Self {
file_path: path.to_path_buf(),
index_data,
platform,
})
}
pub fn get_partition_entries(&self) -> &[PartitionIndexEntry] {
&self.index_data.partition_entries
}
pub fn lookup_partition(&self, key_digest: &[u8]) -> Option<&PartitionIndexEntry> {
self.index_data
.key_lookup
.get(key_digest)
.and_then(|&index| self.index_data.partition_entries.get(index))
}
pub fn get_statistics(&self) -> IndexStatistics {
let mut promoted_count = 0;
let mut total_promoted_entries = 0;
for entry in &self.index_data.partition_entries {
if let Some(ref promoted) = entry.promoted_index {
promoted_count += 1;
total_promoted_entries += promoted.entry_count as usize;
}
}
IndexStatistics {
total_partitions: self.index_data.partition_entries.len(),
partitions_with_promoted_index: promoted_count,
total_promoted_entries,
file_size: self.file_path.metadata().map(|m| m.len()).unwrap_or(0),
}
}
pub async fn validate_integrity(&self) -> Result<Vec<String>> {
let mut issues = Vec::new();
let mut offsets: Vec<_> = self
.index_data
.partition_entries
.iter()
.map(|e| (e.data_offset, e.data_size))
.collect();
offsets.sort_by_key(|&(offset, _)| offset);
for i in 1..offsets.len() {
let (prev_offset, prev_size) = offsets[i - 1];
let (curr_offset, _) = offsets[i];
if prev_offset + prev_size as u64 > curr_offset {
issues.push(format!(
"Overlapping partitions: offset {} + size {} overlaps with offset {}",
prev_offset, prev_size, curr_offset
));
}
}
Ok(issues)
}
}
/// Summary counters produced by `IndexReader::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStatistics {
    /// Number of parsed partition entries.
    pub total_partitions: usize,
    /// Number of entries whose `promoted_index` is `Some`.
    pub partitions_with_promoted_index: usize,
    /// Sum of `entry_count` over all present promoted indexes.
    pub total_promoted_entries: usize,
    /// On-disk size of the index file in bytes, or 0 if its metadata could not be read.
    pub file_size: u64,
}
fn parse_index_data_with_summary<'a>(
input: &'a [u8],
summary_reader: Option<&SummaryReader>,
) -> IResult<&'a [u8], IndexData> {
use nom::error::{Error as NomError, ErrorKind};
let registry = get_global_registry();
let (remaining, header) = match registry.parse_index_header(input) {
Ok(parsed_header) => {
log::debug!("Successfully parsed Index.db header using spec-driven approach");
let header = IndexHeader {
version: parsed_header
.fields
.get("version")
.and_then(|v| v.as_u32().ok())
.unwrap_or(1),
entry_count: parsed_header
.fields
.get("entry_count")
.and_then(|v| v.as_u32().ok())
.unwrap_or(0),
data_size: parsed_header
.fields
.get("data_size")
.and_then(|v| v.as_u64().ok())
.unwrap_or(input.len() as u64),
checksum: parsed_header
.fields
.get("checksum")
.and_then(|v| v.as_u32().ok())
.unwrap_or(0),
};
let header_size = parsed_header.header_size;
if input.len() < header_size {
return Err(nom::Err::Error(NomError::new(input, ErrorKind::Eof)));
}
(&input[header_size..], header)
}
Err(_) => {
log::debug!("Spec-driven header parsing failed, assuming headerless format");
let header = IndexHeader {
version: 1,
entry_count: 0, data_size: input.len() as u64,
checksum: 0,
};
(input, header)
}
};
let (remaining, partition_entries) =
parse_all_partition_keys_with_summary(remaining, summary_reader)?;
let mut key_lookup = HashMap::new();
for (index, entry) in partition_entries.iter().enumerate() {
key_lookup.insert(Arc::clone(&entry.key_digest), index);
}
let header = IndexHeader {
entry_count: partition_entries.len() as u32,
..header
};
Ok((
remaining,
IndexData {
header,
partition_entries,
key_lookup,
},
))
}
/// Parses consecutive BIG-format index entries until the input is exhausted or an
/// entry fails to parse.
///
/// Parsing is best-effort. The first entry that fails to parse ends the scan, and
/// the unparsed tail is returned as the remaining input instead of an error, so
/// this function never returns `Err`. `_summary_reader` is currently unused.
fn parse_all_partition_keys_with_summary<'a>(
    input: &'a [u8],
    _summary_reader: Option<&SummaryReader>,
) -> IResult<&'a [u8], Vec<PartitionIndexEntry>> {
    let mut cursor = input;
    let mut parsed: Vec<PartitionIndexEntry> = Vec::new();

    loop {
        if cursor.is_empty() {
            break;
        }
        let (rest, entry) = match parse_big_index_entry(cursor) {
            Ok(step) => step,
            Err(_) => {
                // `parsed.len()` is the zero-based index of the entry that failed.
                log::debug!(
                    "Stopped parsing Index.db at entry {} with {} bytes remaining",
                    parsed.len(),
                    cursor.len()
                );
                break;
            }
        };
        // Every entry consumes at least its 2-byte key length, so the loop terminates.
        debug_assert!(
            rest.len() < cursor.len(),
            "BIG Index.db parser must make forward progress"
        );
        cursor = rest;
        parsed.push(entry);
    }

    log::debug!("Parsed {} partition entries from Index.db", parsed.len());
    Ok((cursor, parsed))
}
/// Parses one BIG-format Index.db entry.
///
/// Layout consumed, in order:
/// 1. a big-endian `u16` key length;
/// 2. that many raw partition-key bytes;
/// 3. the partition offset, read with `parse_vuint`;
/// 4. the promoted-index length, read with `parse_vuint`, followed by that many bytes.
///
/// The promoted-index bytes are skipped, so `promoted_index` is always `None` and
/// `data_size` is always 0. `key_digest` and `raw_key` share one allocation holding
/// the raw key bytes; no hashing is performed.
///
/// # Errors
/// Any nom error from the underlying parsers, e.g. when the input is truncated.
pub(crate) fn parse_big_index_entry(input: &[u8]) -> IResult<&[u8], PartitionIndexEntry> {
    let (after_len, key_len) = be_u16(input)?;
    let (after_key, key_bytes) = take(key_len)(after_len)?;
    let (after_offset, data_offset) = parse_vuint(after_key)?;
    let (after_promoted_len, promoted_len_raw) = parse_vuint(after_offset)?;
    // A length that does not fit in usize can never be satisfied. Saturating makes
    // `take` fail cleanly instead of silently truncating the length.
    let promoted_len = usize::try_from(promoted_len_raw).unwrap_or(usize::MAX);
    let (rest, _promoted_bytes) = take(promoted_len)(after_promoted_len)?;

    log::trace!(
        "Index.db BIG entry: key_len={}, data_offset={}, promoted_len={}",
        key_len,
        data_offset,
        promoted_len
    );

    let key: Arc<[u8]> = Arc::from(key_bytes);
    let entry = PartitionIndexEntry {
        key_digest: Arc::clone(&key),
        raw_key: Some(key),
        data_offset,
        data_size: 0,
        promoted_index: None,
    };
    Ok((rest, entry))
}
#[allow(dead_code)]
fn parse_index_data(input: &[u8]) -> IResult<&[u8], IndexData> {
parse_index_data_with_summary(input, None)
}
#[allow(dead_code)]
pub(crate) fn parse_all_partition_keys(input: &[u8]) -> IResult<&[u8], Vec<PartitionIndexEntry>> {
parse_all_partition_keys_with_summary(input, None)
}
/// Legacy alias for `parse_big_index_entry`: parses a single Index.db entry.
// Within this file it is only called from the unit tests; `dead_code` is allowed so
// non-test builds stay warning-free.
#[allow(dead_code)]
fn parse_simple_partition_key(input: &[u8]) -> IResult<&[u8], PartitionIndexEntry> {
    let parsed = parse_big_index_entry(input);
    parsed
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Location of the stock_prices table directory, relative to the datasets root.
    const STOCK_PRICES_DIR: &str =
        "sstables/test_timeseries/stock_prices-6c9fad60a25111f0a3fef1a551383fb9";

    /// Developer-machine dataset roots used when `CQLITE_DATASETS_ROOT` is unset.
    ///
    /// The integration tests previously hard-coded different roots. Both are kept so
    /// neither existing setup regresses.
    const FALLBACK_DATASETS_ROOTS: [&str; 2] = [
        "/Users/patrick/local_projects/cqlite/test-data/datasets",
        "/Users/patrickmcfadin/local_projects/cqlite/test-data/datasets",
    ];

    /// Resolves the test datasets root.
    ///
    /// Uses `CQLITE_DATASETS_ROOT` if set. Otherwise returns the first fallback that
    /// exists on disk, or the first fallback if none exists.
    fn datasets_root() -> String {
        env::var("CQLITE_DATASETS_ROOT").unwrap_or_else(|_| {
            FALLBACK_DATASETS_ROOTS
                .iter()
                .copied()
                .find(|root| std::path::Path::new(root).exists())
                .unwrap_or(FALLBACK_DATASETS_ROOTS[0])
                .to_string()
        })
    }

    /// Path to a component file (e.g. `"Index.db"`) of the stock_prices SSTable.
    fn stock_prices_file(component: &str) -> std::path::PathBuf {
        std::path::PathBuf::from(format!(
            "{}/{}/nb-1-big-{}",
            datasets_root(),
            STOCK_PRICES_DIR,
            component
        ))
    }

    /// Returns at most the first 8 bytes of a key.
    ///
    /// Raw partition keys may be shorter than 8 bytes; a fixed `[..8]` would panic.
    fn key_prefix(key: &[u8]) -> &[u8] {
        &key[..key.len().min(8)]
    }

    #[tokio::test]
    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
    async fn test_stock_prices_index_db_parsing() {
        let index_path = stock_prices_file("Index.db");
        println!("\n=== Testing stock_prices Index.db ===");
        println!("Path: {}", index_path.display());

        let file_data = std::fs::read(&index_path).expect("Failed to read Index.db");
        println!("File size: {} bytes", file_data.len());
        println!(
            "First 56 bytes (hex): {:02x?}",
            &file_data[..file_data.len().min(56)]
        );
        assert!(
            file_data.len() >= 2,
            "Index.db must contain at least a 2-byte key length"
        );

        println!("\n=== Format Analysis ===");
        println!(
            "First 2 bytes: {:#06x} (expected 0x0010 for digest format)",
            u16::from_be_bytes([file_data[0], file_data[1]])
        );

        println!("\n=== Parsing with parse_all_partition_keys_with_summary ===");
        let (remaining, entries) = parse_all_partition_keys_with_summary(&file_data, None)
            .unwrap_or_else(|e| {
                println!("FAILED: {:?}", e);
                panic!("Failed to parse stock_prices Index.db: {:?}", e)
            });
        println!("SUCCESS: Parsed {} entries", entries.len());
        println!("Remaining bytes: {}", remaining.len());
        for (i, entry) in entries.iter().enumerate() {
            println!(
                "  Entry {}: offset={}, size={}, key_digest={:02x?}",
                i,
                entry.data_offset,
                entry.data_size,
                &entry.key_digest[..]
            );
        }
        assert!(
            entries.len() >= 2,
            "Expected at least 2 partition entries for stock_prices (found {})",
            entries.len()
        );
    }

    #[tokio::test]
    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
    async fn test_stock_prices_index_reader() {
        let index_path = stock_prices_file("Index.db");
        println!("\n=== Testing IndexReader::open ===");
        println!("Path: {:?}", index_path);

        let config = crate::Config::default();
        let platform = Arc::new(
            crate::Platform::new(&config)
                .await
                .expect("Failed to create platform"),
        );

        let reader = match IndexReader::open(&index_path, platform.clone()).await {
            Ok(reader) => reader,
            Err(e) => {
                println!("FAILED: {:?}", e);
                panic!("Failed to open stock_prices Index.db: {:?}", e);
            }
        };

        let entries = reader.get_partition_entries();
        println!(
            "SUCCESS: IndexReader found {} partition entries",
            entries.len()
        );
        for (i, entry) in entries.iter().enumerate() {
            println!(
                "  Entry {}: offset={}, size={}, key_digest={:02x?}",
                i,
                entry.data_offset,
                entry.data_size,
                key_prefix(&entry.key_digest)
            );
        }

        let stats = reader.get_statistics();
        println!(
            "Statistics: total_partitions={}, file_size={}",
            stats.total_partitions, stats.file_size
        );
        assert!(
            entries.len() >= 2,
            "Expected at least 2 partition entries for stock_prices (found {})",
            entries.len()
        );
    }

    #[tokio::test]
    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
    async fn test_stock_prices_sstable_reader_integration() {
        use crate::storage::sstable::reader::SSTableReader;

        let data_path = stock_prices_file("Data.db");
        println!("\n=== Testing SSTableReader with stock_prices ===");
        println!("Data.db path: {:?}", data_path);

        let config = crate::Config::default();
        let platform = Arc::new(
            crate::Platform::new(&config)
                .await
                .expect("Failed to create platform"),
        );

        let reader = match SSTableReader::open(&data_path, &config, platform.clone()).await {
            Ok(reader) => reader,
            Err(e) => {
                println!("FAILED: {:?}", e);
                panic!("Failed to open stock_prices SSTable: {:?}", e);
            }
        };
        println!("SUCCESS: SSTableReader opened");

        let Some(ref index_reader) = reader.index_reader else {
            println!("WARNING: Index.db was not loaded by SSTableReader");
            panic!("SSTableReader did not load Index.db");
        };
        let entries = index_reader.get_partition_entries();
        println!("Index loaded with {} partition entries", entries.len());
        for (i, entry) in entries.iter().enumerate() {
            println!(
                "  Entry {}: offset={}, size={}",
                i, entry.data_offset, entry.data_size
            );
        }
        assert!(
            entries.len() >= 2,
            "Expected at least 2 partition entries for stock_prices (found {})",
            entries.len()
        );
    }

    #[tokio::test]
    #[ignore = "Requires test data files (CQLITE_DATASETS_ROOT)"]
    async fn test_real_index_db_big_format() {
        let root = datasets_root();

        let multi_index = format!(
            "{}/sstables/test_basic/multi_partition_table-6ac52100a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
            root
        );
        let bytes = std::fs::read(&multi_index).expect("read multi_partition_table Index.db");
        assert!(bytes.len() >= 2, "Index.db must contain a key length");
        assert_eq!(
            u16::from_be_bytes([bytes[0], bytes[1]]),
            38,
            "Composite key length should be 38 (0x0026)"
        );
        let (rest, entries) = parse_all_partition_keys(&bytes).expect("parse composite Index.db");
        assert!(rest.is_empty(), "Should consume all Index.db bytes");
        assert!(
            entries.len() >= 2,
            "multi_partition_table should have multiple partitions (got {})",
            entries.len()
        );
        assert_eq!(
            entries[0].key_digest.len(),
            38,
            "First key should be 38 bytes"
        );
        assert_eq!(
            entries[0].data_offset, 0,
            "First partition offset should be 0"
        );
        for (i, pair) in entries.windows(2).enumerate() {
            assert!(
                pair[1].data_offset > pair[0].data_offset,
                "Offsets must increase: entry {} ({}) <= entry {} ({})",
                i + 1,
                pair[1].data_offset,
                i,
                pair[0].data_offset
            );
        }

        let simple_index = format!(
            "{}/sstables/test_basic/simple_table-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-Index.db",
            root
        );
        let bytes = std::fs::read(&simple_index).expect("read simple_table Index.db");
        assert!(bytes.len() >= 2, "Index.db must contain a key length");
        assert_eq!(
            u16::from_be_bytes([bytes[0], bytes[1]]),
            16,
            "UUID key length should be 16 (0x0010)"
        );
        let (rest, entries) = parse_all_partition_keys(&bytes).expect("parse simple Index.db");
        assert!(rest.is_empty(), "Should consume all Index.db bytes");
        assert!(
            entries.len() > 3,
            "simple_table should have many partitions (got {})",
            entries.len()
        );
        assert_eq!(
            entries[0].key_digest.len(),
            16,
            "First key should be 16 bytes"
        );
        assert_eq!(
            entries[0].data_offset, 0,
            "First partition offset should be 0"
        );
    }

    #[test]
    fn test_simple_partition_key_parsing() {
        let data: &[u8] = &[
            0x00, 0x10, // key length = 16
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // key bytes 1..=8
            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key bytes 9..=16
            0x81, 0x00, // vuint offset = 0x0100 = 256
            0x00, // promoted index length = 0
        ];
        let (_, entry) = parse_simple_partition_key(data).unwrap();
        assert_eq!(
            entry.key_digest.as_ref(),
            &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
        );
        assert_eq!(entry.data_offset, 256);
        assert_eq!(entry.data_size, 0);
        assert!(entry.promoted_index.is_none());
    }

    #[test]
    fn test_partition_key_parsing_without_summary() {
        let data: &[u8] = &[
            0x00, 0x10, // key length = 16
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // key bytes 1..=8
            0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, // key bytes 9..=16
            0x90, 0x00, // vuint offset = 0x1000 = 4096
            0x00, // promoted index length = 0
        ];
        let (_, entry) = parse_simple_partition_key(data).unwrap();
        assert_eq!(
            entry.key_digest.as_ref(),
            &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
        );
        assert_eq!(
            entry.raw_key.as_deref(),
            Some(&[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16][..]),
            "raw_key should mirror the raw partition key"
        );
        assert_eq!(entry.data_offset, 4096);
    }

    #[test]
    fn test_variable_length_keys_parse_all_entries() {
        let data: &[u8] = &[
            // Entry 0
            0x00, 0x04, // key length = 4
            0x00, 0x00, 0x00, 0x2A, // key bytes
            0x64, // vuint offset = 100
            0x00, // promoted index length = 0
            // Entry 1
            0x00, 0x01, // key length = 1
            0x07, // key byte
            0x81, 0xF4, // vuint offset = 0x01F4 = 500
            0x00, // promoted index length = 0
        ];
        let (rest, entries) = parse_all_partition_keys(data).unwrap();
        assert!(rest.is_empty(), "All bytes should be consumed");
        assert_eq!(entries.len(), 2, "Both variable-length entries must parse");
        assert_eq!(entries[0].key_digest.as_ref(), &[0x00, 0x00, 0x00, 0x2A]);
        assert_eq!(entries[0].data_offset, 100);
        assert_eq!(entries[1].key_digest.as_ref(), &[0x07]);
        assert_eq!(entries[1].data_offset, 500);
    }

    /// Two 16-byte-key entries at offsets 100 and 500, shared by the tests below.
    const TWO_UUID_ENTRIES: &[u8] = &[
        // Entry 0
        0x00, 0x10, // key length = 16
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, //
        0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, //
        0x64, // vuint offset = 100
        0x00, // promoted index length = 0
        // Entry 1
        0x00, 0x10, // key length = 16
        0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, //
        0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20, //
        0x81, 0xF4, // vuint offset = 0x01F4 = 500
        0x00, // promoted index length = 0
    ];

    const UUID_KEY_1: [u8; 16] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
    const UUID_KEY_2: [u8; 16] = [
        0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E,
        0x1F, 0x20,
    ];

    #[test]
    fn test_multiple_partition_keys_parsing() {
        let (_, entries) = parse_all_partition_keys(TWO_UUID_ENTRIES).unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].key_digest.as_ref(), &UUID_KEY_1);
        assert_eq!(entries[1].key_digest.as_ref(), &UUID_KEY_2);
        assert_eq!(entries[0].data_offset, 100);
        assert_eq!(entries[1].data_offset, 500);
    }

    #[test]
    fn test_borrow_trait_zero_allocation_lookup() {
        let (_, index_data) = parse_index_data(TWO_UUID_ENTRIES).unwrap();
        let key1: &[u8] = &UUID_KEY_1;
        let key2: &[u8] = &UUID_KEY_2;
        let key_not_found: &[u8] = &[0xFF; 16];

        // `Arc<[u8]>: Borrow<[u8]>` lets the map be queried with plain slices.
        let result1 = index_data.key_lookup.get(key1);
        let result2 = index_data.key_lookup.get(key2);
        let result3 = index_data.key_lookup.get(key_not_found);

        assert!(result1.is_some(), "Should find first key");
        assert!(result2.is_some(), "Should find second key");
        assert!(result3.is_none(), "Should not find non-existent key");
        assert_eq!(*result1.unwrap(), 0, "First key should map to index 0");
        assert_eq!(*result2.unwrap(), 1, "Second key should map to index 1");
        assert_eq!(index_data.partition_entries[0].key_digest.as_ref(), key1);
        assert_eq!(index_data.partition_entries[1].key_digest.as_ref(), key2);
    }
}