use crate::{
error::{Error, Result},
platform::Platform,
};
use nom::{
bytes::complete::take,
error::Error as NomError,
multi::count,
number::complete::{be_u32, be_u64, le_u32},
IResult,
};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
/// Fixed-size header found at the start of a Summary.db file.
///
/// The fields are listed in the order `parse_summary_header` reads them;
/// every field is decoded as a big-endian integer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SummaryHeader {
/// First header field (big-endian `u32`).
/// NOTE(review): presumably the minimum index sampling interval — confirm against the format spec.
pub min_index_interval: u32,
/// Number of summary entries declared by the file; this is also the number
/// of `u32` slots read from the offset table.
pub entries_count: u32,
/// Declared size in bytes of the offset table plus the entry data that follows it.
pub summary_entries_size: u64,
/// Fourth header field (big-endian `u32`); reported verbatim in `SummaryStatistics`.
pub sampling_level: u32,
/// Fifth header field (big-endian `u32`); reported verbatim in `SummaryStatistics`.
pub size_at_full_sampling: u32,
}
/// Size in bytes of the serialized header: 4 + 4 + 8 + 4 + 4, matching the
/// five fields read by `parse_summary_header`.
const SUMMARY_HEADER_SIZE: usize = 24;
/// Upper bound on `entries_count` accepted by `parse_summary_data`; a header
/// declaring more entries than this is rejected as corrupt.
const MAX_REASONABLE_ENTRIES: u32 = 100_000_000;
/// A single entry decoded from the entry-data section of Summary.db.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SummaryEntry {
/// Raw key bytes: everything in the entry except its trailing 8 bytes.
pub partition_key: Vec<u8>,
/// Trailing 8 bytes of the entry, decoded as a big-endian `u64`.
/// NOTE(review): presumably an offset into the companion index file — confirm.
pub position: u64,
}
/// Fully parsed contents of a Summary.db file, as produced by `parse_summary_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SummaryData {
/// The fixed-size file header.
pub header: SummaryHeader,
/// Entries in the order given by the offset table.
pub entries: Vec<SummaryEntry>,
/// First length-prefixed key stored after the entry data.
pub first_key: Vec<u8>,
/// Second length-prefixed key stored after the entry data.
pub last_key: Vec<u8>,
}
/// In-memory reader over a Summary.db file that was parsed eagerly by `open`.
// `dead_code` is allowed because not every field is read by the methods visible in this file.
#[allow(dead_code)]
pub struct SummaryReader {
// Path the summary was loaded from; also used by `get_statistics` to query the file size.
file_path: PathBuf,
// Parsed header, entries and first/last keys.
summary_data: SummaryData,
// Platform handle supplied to `open`; used there for the existence check.
platform: Arc<Platform>,
}
impl SummaryReader {
pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
if !platform.fs().exists(path).await? {
return Err(Error::not_found(format!(
"Summary.db file not found: {}",
path.display()
)));
}
let mut file = File::open(path).await?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
let summary_data = parse_summary_data(&buffer)
.map_err(|e| Error::corruption(format!("Failed to parse Summary.db: {:?}", e)))?;
Ok(Self {
file_path: path.to_path_buf(),
summary_data,
platform,
})
}
pub fn get_entries(&self) -> &[SummaryEntry] {
&self.summary_data.entries
}
pub fn get_header(&self) -> &SummaryHeader {
&self.summary_data.header
}
pub fn get_first_key(&self) -> &[u8] {
&self.summary_data.first_key
}
pub fn get_last_key(&self) -> &[u8] {
&self.summary_data.last_key
}
pub fn find_entry_for_position(&self, target_position: u64) -> Option<&SummaryEntry> {
let mut left = 0;
let mut right = self.summary_data.entries.len();
let mut best_entry = None;
while left < right {
let mid = left + (right - left) / 2;
let entry = &self.summary_data.entries[mid];
if entry.position <= target_position {
best_entry = Some(entry);
left = mid + 1;
} else {
right = mid;
}
}
best_entry
}
pub fn get_entry_at(&self, index: usize) -> Option<&SummaryEntry> {
self.summary_data.entries.get(index)
}
pub fn get_statistics(&self) -> SummaryStatistics {
let header = &self.summary_data.header;
let entries = &self.summary_data.entries;
let avg_key_size = if !entries.is_empty() {
entries.iter().map(|e| e.partition_key.len()).sum::<usize>() as f64
/ entries.len() as f64
} else {
0.0
};
SummaryStatistics {
total_entries: entries.len(),
min_index_interval: header.min_index_interval,
sampling_level: header.sampling_level,
size_at_full_sampling: header.size_at_full_sampling,
average_key_size: avg_key_size,
file_size: std::fs::metadata(&self.file_path)
.map(|m| m.len())
.unwrap_or(0),
}
}
pub async fn validate_integrity(&self) -> Result<Vec<String>> {
let mut issues = Vec::new();
for i in 1..self.summary_data.entries.len() {
let prev_pos = self.summary_data.entries[i - 1].position;
let curr_pos = self.summary_data.entries[i].position;
if prev_pos > curr_pos {
issues.push(format!(
"Entries not sorted by position: entry {} has position {}, entry {} has position {}",
i - 1, prev_pos, i, curr_pos
));
}
}
if self.summary_data.entries.len() != self.summary_data.header.entries_count as usize {
issues.push(format!(
"Entry count mismatch: header says {}, but found {}",
self.summary_data.header.entries_count,
self.summary_data.entries.len()
));
}
Ok(issues)
}
}
/// Snapshot of summary-level figures produced by `SummaryReader::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SummaryStatistics {
/// Number of entries actually parsed from the file.
pub total_entries: usize,
/// Copied from the header's `min_index_interval`.
pub min_index_interval: u32,
/// Copied from the header's `sampling_level`.
pub sampling_level: u32,
/// Copied from the header's `size_at_full_sampling`.
pub size_at_full_sampling: u32,
/// Mean length in bytes of the entries' partition keys; `0.0` when there are no entries.
pub average_key_size: f64,
/// Size of the file on disk in bytes, or `0` when its metadata could not be read.
pub file_size: u64,
}
/// Parses the complete contents of a Summary.db file.
///
/// Layout consumed, in order:
/// 1. a fixed header of `SUMMARY_HEADER_SIZE` bytes (see `parse_summary_header`),
/// 2. an offset table of `entries_count` little-endian `u32` values,
/// 3. the entry data, occupying `summary_entries_size` minus the offset-table
///    size bytes,
/// 4. two length-prefixed keys (first key, then last key).
///
/// # Errors
///
/// Returns a corruption error when the input is shorter than the header, the
/// declared entry count exceeds `MAX_REASONABLE_ENTRIES`, the declared
/// `summary_entries_size` is smaller than the offset table it must contain,
/// any section extends past the end of `input`, or an entry/key cannot be
/// decoded.
fn parse_summary_data(input: &[u8]) -> Result<SummaryData> {
    if input.len() < SUMMARY_HEADER_SIZE {
        return Err(Error::corruption(format!(
            "Summary.db too small: {} bytes, need at least {} for header",
            input.len(),
            SUMMARY_HEADER_SIZE
        )));
    }

    let (remaining, header) = parse_summary_header(input)
        .map_err(|e| Error::corruption(format!("Failed to parse Summary.db header: {:?}", e)))?;

    if header.entries_count > MAX_REASONABLE_ENTRIES {
        return Err(Error::corruption(format!(
            "Summary.db entry count {} exceeds maximum {}",
            header.entries_count, MAX_REASONABLE_ENTRIES
        )));
    }

    // Bounded by MAX_REASONABLE_ENTRIES above, so the multiplication cannot overflow.
    let entry_count = header.entries_count as usize;
    let offset_table_size = entry_count * 4;
    if remaining.len() < offset_table_size {
        return Err(Error::corruption(format!(
            "Summary.db insufficient data for offset table: need {} bytes, have {}",
            offset_table_size,
            remaining.len()
        )));
    }

    let (after_offsets, offsets) = count(le_u32::<_, NomError<_>>, entry_count)(remaining)
        .map_err(|e: nom::Err<NomError<_>>| {
            Error::corruption(format!("Failed to parse offset table: {:?}", e))
        })?;

    // `summary_entries_size` comes straight from the file. Do the size
    // arithmetic in u64 and reject values smaller than the offset table, so a
    // corrupt header yields an error instead of a subtraction underflow (or a
    // truncating cast on 32-bit targets).
    let offset_table_bytes = offset_table_size as u64;
    if header.summary_entries_size < offset_table_bytes {
        return Err(Error::corruption(format!(
            "Summary.db entries size {} is smaller than offset table size {}",
            header.summary_entries_size, offset_table_size
        )));
    }
    let entry_data_bytes = header.summary_entries_size - offset_table_bytes;
    if (after_offsets.len() as u64) < entry_data_bytes {
        return Err(Error::corruption(format!(
            "Summary.db insufficient entry data: need {} bytes, have {}",
            entry_data_bytes,
            after_offsets.len()
        )));
    }
    // Lossless: the value was just bounded by a slice length.
    let entry_data_size = entry_data_bytes as usize;

    let (entry_data, after_entries) = after_offsets.split_at(entry_data_size);

    // Equal to the header's `summary_entries_size`; both parts fit inside
    // `remaining`, so the sum cannot overflow.
    let summary_entries_size = offset_table_size + entry_data_size;
    let entries = parse_entries_from_offsets(
        entry_data,
        &offsets,
        offset_table_size,
        summary_entries_size,
    )?;

    let (after_first, first_key) = parse_serialized_key(after_entries)
        .map_err(|e| Error::corruption(format!("Failed to parse first key: {:?}", e)))?;
    let (_, last_key) = parse_serialized_key(after_first)
        .map_err(|e| Error::corruption(format!("Failed to parse last key: {:?}", e)))?;

    Ok(SummaryData {
        header,
        entries,
        first_key,
        last_key,
    })
}
/// Decodes the fixed-size Summary.db header from the front of `input`.
///
/// Reads, in order and all big-endian: `u32` min index interval, `u32`
/// entries count, `u64` summary entries size, `u32` sampling level and `u32`
/// size at full sampling. Returns the unconsumed remainder of `input`
/// together with the decoded header, or a nom error if `input` runs out.
pub(crate) fn parse_summary_header(input: &[u8]) -> IResult<&[u8], SummaryHeader> {
    let (rest, min_index_interval) = be_u32(input)?;
    let (rest, entries_count) = be_u32(rest)?;
    let (rest, summary_entries_size) = be_u64(rest)?;
    let (rest, sampling_level) = be_u32(rest)?;
    let (rest, size_at_full_sampling) = be_u32(rest)?;

    let header = SummaryHeader {
        min_index_interval,
        entries_count,
        summary_entries_size,
        sampling_level,
        size_at_full_sampling,
    };
    Ok((rest, header))
}
/// Slices `entry_data` into individual entries using the offset table.
///
/// The raw `offsets` are first converted to positions relative to the start
/// of `entry_data` (see `normalize_entry_offsets`). Each entry then spans
/// from its own offset to the next entry's offset, with the final entry
/// running to the end of `entry_data`. Within an entry the trailing 8 bytes
/// are a big-endian `u64` position and everything before them is the key.
///
/// # Errors
///
/// Returns a corruption error when the offsets cannot be normalized, are not
/// strictly increasing, reach past `entry_data`, or delimit an entry shorter
/// than the 8-byte position.
fn parse_entries_from_offsets(
    entry_data: &[u8],
    offsets: &[u32],
    offset_table_size: usize,
    summary_entries_size: usize,
) -> Result<Vec<SummaryEntry>> {
    let data_len = entry_data.len();
    let starts = normalize_entry_offsets(
        offsets,
        data_len,
        offset_table_size,
        summary_entries_size,
    )?;

    let mut parsed = Vec::with_capacity(starts.len());
    for (idx, &begin) in starts.iter().enumerate() {
        // The last entry has no successor, so it is bounded by the data itself.
        let finish = starts.get(idx + 1).copied().unwrap_or(data_len);

        if begin >= finish {
            return Err(Error::corruption(format!(
                "Invalid offset at index {}: start {} >= end {}",
                idx, begin, finish
            )));
        }
        if finish > data_len {
            return Err(Error::corruption(format!(
                "Offset {} points beyond entry data (size {})",
                finish, data_len
            )));
        }

        let record = &entry_data[begin..finish];
        if record.len() < 8 {
            return Err(Error::corruption(format!(
                "Entry {} too small: {} bytes, need at least 8 for position",
                idx,
                record.len()
            )));
        }

        let (key_part, position_part) = record.split_at(record.len() - 8);
        let mut raw_position = [0u8; 8];
        raw_position.copy_from_slice(position_part);

        parsed.push(SummaryEntry {
            partition_key: key_part.to_vec(),
            position: u64::from_be_bytes(raw_position),
        });
    }

    Ok(parsed)
}
/// Converts raw offset-table values into positions relative to the start of
/// the entry data.
///
/// Two layouts are accepted:
/// * **relative** — the first offset is `0` and every offset is below
///   `entry_data_size`; the values are returned unchanged;
/// * **absolute** — every offset lies in
///   `offset_table_size..summary_entries_size`, i.e. it also counts the
///   offset table itself; `offset_table_size` is subtracted from each value.
///
/// An empty table yields an empty vector.
///
/// # Errors
///
/// Returns a corruption error when the offsets fit neither layout.
fn normalize_entry_offsets(
    offsets: &[u32],
    entry_data_size: usize,
    offset_table_size: usize,
    summary_entries_size: usize,
) -> Result<Vec<usize>> {
    if offsets.is_empty() {
        return Ok(Vec::new());
    }

    let widened: Vec<usize> = offsets.iter().map(|&raw| raw as usize).collect();

    let starts_at_zero = widened[0] == 0;
    if starts_at_zero && widened.iter().all(|&pos| pos < entry_data_size) {
        return Ok(widened);
    }

    let absolute_span = offset_table_size..summary_entries_size;
    let fits_absolute = widened.iter().all(|pos| absolute_span.contains(pos));
    if fits_absolute {
        let rebased = widened
            .into_iter()
            .map(|pos| pos - offset_table_size)
            .collect();
        return Ok(rebased);
    }

    Err(Error::corruption(format!(
        "Summary.db offsets are invalid for both relative and absolute layouts: offsets={offsets:?}, entry_data_size={entry_data_size}, offset_table_size={offset_table_size}, summary_entries_size={summary_entries_size}"
    )))
}
/// Reads one length-prefixed key: a big-endian `u32` byte count followed by
/// that many key bytes. Returns the remaining input and an owned copy of the
/// key, or a nom error if the input is too short.
fn parse_serialized_key(input: &[u8]) -> IResult<&[u8], Vec<u8>> {
    let (after_len, key_len) = be_u32(input)?;
    take(key_len)(after_len).map(|(rest, key_bytes)| (rest, key_bytes.to_vec()))
}
// Unit tests for the Summary.db parsing helpers. All fixtures are hand-built
// byte sequences; multi-byte header fields are big-endian, offset-table
// values are little-endian.
#[cfg(test)]
mod tests {
use super::*;
// A bare 24-byte header must decode field-by-field and leave no input behind.
#[test]
fn test_summary_header_parsing() {
// Header bytes: min_index_interval=128, entries_count=1,
// summary_entries_size=28, sampling_level=128, size_at_full_sampling=1.
let data = vec![
0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1c, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x01, ];
let (remaining, header) = parse_summary_header(&data).unwrap();
assert_eq!(header.min_index_interval, 128);
assert_eq!(header.entries_count, 1);
assert_eq!(header.summary_entries_size, 28);
assert_eq!(header.sampling_level, 128);
assert_eq!(header.size_at_full_sampling, 1);
assert!(remaining.is_empty());
}
// Offset-table values are read as little-endian u32, unlike the header fields.
#[test]
fn test_offset_table_little_endian() {
// Two offsets: 0 and 0x18 (24), least-significant byte first.
let offset_data: [u8; 8] = [
0x00, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, ];
let (_, offsets) = count(le_u32::<_, NomError<_>>, 2usize)(&offset_data[..]).unwrap();
assert_eq!(offsets[0], 0);
assert_eq!(offsets[1], 24);
}
// Single entry addressed by a relative offset (0 = start of the entry data).
#[test]
fn test_entry_parsing_from_offsets() {
let key_bytes = vec![
0xdc, 0x67, 0x26, 0xa6, 0x05, 0xc6, 0x48, 0x50, 0x86, 0xcd, 0x0f, 0xe3, 0x1b, 0x67,
0x57, 0xaf,
];
let position_bytes = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
// Entry layout: 16 key bytes followed by the 8-byte big-endian position.
let mut entry_data = key_bytes.clone();
entry_data.extend_from_slice(&position_bytes);
let offsets = vec![0u32];
let entries =
parse_entries_from_offsets(&entry_data, &offsets, 4, 4 + entry_data.len()).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].partition_key, key_bytes);
assert_eq!(entries[0].position, 0);
}
// Two entries addressed by absolute offsets, i.e. offsets that also count
// the 8-byte offset table preceding the entry data.
#[test]
fn test_entry_parsing_from_absolute_offsets() {
let key0 = vec![0xAA; 16];
let key1 = vec![0xBB; 16];
let mut entry_data = key0.clone();
entry_data.extend_from_slice(&0u64.to_be_bytes());
entry_data.extend_from_slice(&key1);
entry_data.extend_from_slice(&128u64.to_be_bytes());
// With an offset table of 8 bytes, 8 and 32 map to relative offsets 0 and 24.
let offsets = vec![8u32, 32u32];
let entries = parse_entries_from_offsets(&entry_data, &offsets, 8, 56).unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].partition_key, key0);
assert_eq!(entries[0].position, 0);
assert_eq!(entries[1].partition_key, key1);
assert_eq!(entries[1].position, 128);
}
// A serialized key is a big-endian u32 length followed by that many bytes.
#[test]
fn test_serialized_key_parsing() {
// Length prefix 0x10 (16) followed by the 16 key bytes.
let data = vec![
0x00, 0x00, 0x00, 0x10, 0xdc, 0x67, 0x26, 0xa6, 0x05, 0xc6, 0x48, 0x50, 0x86, 0xcd, 0x0f, 0xe3, 0x1b, 0x67,
0x57, 0xaf, ];
let (remaining, key) = parse_serialized_key(&data).unwrap();
assert_eq!(key.len(), 16);
assert_eq!(
key,
vec![
0xdc, 0x67, 0x26, 0xa6, 0x05, 0xc6, 0x48, 0x50, 0x86, 0xcd, 0x0f, 0xe3, 0x1b, 0x67,
0x57, 0xaf
]
);
assert!(remaining.is_empty());
}
// End-to-end parse of a minimal file: header, one offset, one entry, first/last key.
#[test]
fn test_complete_summary_parsing() {
// Header: entries_count=1, summary_entries_size=0x1c (28 = 4-byte offset
// table + 24-byte entry).
let mut data = vec![
0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x1c, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x01, ];
// Offset table: a single relative offset of 0.
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]);
let entry_key: [u8; 16] = [
0xdc, 0x67, 0x26, 0xa6, 0x05, 0xc6, 0x48, 0x50, 0x86, 0xcd, 0x0f, 0xe3, 0x1b, 0x67,
0x57, 0xaf,
];
// Entry: 16-byte key followed by position 0.
data.extend_from_slice(&entry_key);
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
// First key, then last key, each as a 4-byte length (16) plus the key bytes.
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x10]); data.extend_from_slice(&entry_key);
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x10]); data.extend_from_slice(&entry_key);
let summary = parse_summary_data(&data).unwrap();
assert_eq!(summary.header.min_index_interval, 128);
assert_eq!(summary.header.entries_count, 1);
assert_eq!(summary.entries.len(), 1);
assert_eq!(summary.entries[0].partition_key, entry_key.to_vec());
assert_eq!(summary.entries[0].position, 0);
assert_eq!(summary.first_key, entry_key.to_vec());
assert_eq!(summary.last_key, entry_key.to_vec());
}
// Two-entry file: entries come back in offset-table order with their positions intact.
#[test]
fn test_entry_position_sorted() {
// Header: entries_count=2, summary_entries_size=0x38 (56 = 8-byte offset
// table + two 24-byte entries).
let mut data = vec![
0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x38, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x02, ];
// Offset table (little-endian): relative offsets 0 and 24.
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); data.extend_from_slice(&[0x18, 0x00, 0x00, 0x00]);
let key0: [u8; 16] = [0x01; 16];
data.extend_from_slice(&key0);
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
let key1: [u8; 16] = [0x02; 16];
data.extend_from_slice(&key1);
// Second entry's position: 0x64 = 100.
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64]);
// First key = key0, last key = key1, each length-prefixed.
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x10]);
data.extend_from_slice(&key0);
data.extend_from_slice(&[0x00, 0x00, 0x00, 0x10]);
data.extend_from_slice(&key1);
let summary = parse_summary_data(&data).unwrap();
assert_eq!(summary.entries.len(), 2);
assert_eq!(summary.entries[0].position, 0);
assert_eq!(summary.entries[1].position, 100);
}
}