1use crate::{
42 error::{Error, Result},
43 platform::Platform,
44};
45
46use nom::{
47 bytes::complete::take,
48 error::Error as NomError,
49 multi::count,
50 number::complete::{be_u32, be_u64, le_u32},
51 IResult,
52};
53use serde::{Deserialize, Serialize};
54use std::path::{Path, PathBuf};
55use std::sync::Arc;
56use tokio::fs::File;
57use tokio::io::AsyncReadExt;
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct SummaryHeader {
62 pub min_index_interval: u32,
64 pub entries_count: u32,
66 pub summary_entries_size: u64,
68 pub sampling_level: u32,
70 pub size_at_full_sampling: u32,
72}
73
/// Size in bytes of the fixed header: three `u32` fields, one `u64` field and
/// one more `u32` pair, i.e. `u32 + u32 + u64 + u32 + u32`.
const SUMMARY_HEADER_SIZE: usize = 4 + 4 + 8 + 4 + 4;

/// Largest entry count accepted from a header; anything above this is
/// reported as corruption instead of being used to size buffers.
const MAX_REASONABLE_ENTRIES: u32 = 100_000_000;
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct SummaryEntry {
83 pub partition_key: Vec<u8>,
85 pub position: u64,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct SummaryData {
92 pub header: SummaryHeader,
94 pub entries: Vec<SummaryEntry>,
96 pub first_key: Vec<u8>,
98 pub last_key: Vec<u8>,
100}
101
102#[allow(dead_code)]
104pub struct SummaryReader {
105 file_path: PathBuf,
107 summary_data: SummaryData,
109 platform: Arc<Platform>,
111}
112
113impl SummaryReader {
114 pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
116 if !platform.fs().exists(path).await? {
117 return Err(Error::not_found(format!(
118 "Summary.db file not found: {}",
119 path.display()
120 )));
121 }
122
123 let mut file = File::open(path).await?;
125 let mut buffer = Vec::new();
126 file.read_to_end(&mut buffer).await?;
127
128 let summary_data = parse_summary_data(&buffer)
130 .map_err(|e| Error::corruption(format!("Failed to parse Summary.db: {:?}", e)))?;
131
132 Ok(Self {
133 file_path: path.to_path_buf(),
134 summary_data,
135 platform,
136 })
137 }
138
139 pub fn get_entries(&self) -> &[SummaryEntry] {
141 &self.summary_data.entries
142 }
143
144 pub fn get_header(&self) -> &SummaryHeader {
146 &self.summary_data.header
147 }
148
149 pub fn get_first_key(&self) -> &[u8] {
151 &self.summary_data.first_key
152 }
153
154 pub fn get_last_key(&self) -> &[u8] {
156 &self.summary_data.last_key
157 }
158
159 pub fn find_entry_for_position(&self, target_position: u64) -> Option<&SummaryEntry> {
165 let mut left = 0;
166 let mut right = self.summary_data.entries.len();
167 let mut best_entry = None;
168
169 while left < right {
170 let mid = left + (right - left) / 2;
171 let entry = &self.summary_data.entries[mid];
172
173 if entry.position <= target_position {
174 best_entry = Some(entry);
175 left = mid + 1;
176 } else {
177 right = mid;
178 }
179 }
180
181 best_entry
182 }
183
184 pub fn get_entry_at(&self, index: usize) -> Option<&SummaryEntry> {
186 self.summary_data.entries.get(index)
187 }
188
189 pub fn get_statistics(&self) -> SummaryStatistics {
191 let header = &self.summary_data.header;
192 let entries = &self.summary_data.entries;
193
194 let avg_key_size = if !entries.is_empty() {
195 entries.iter().map(|e| e.partition_key.len()).sum::<usize>() as f64
196 / entries.len() as f64
197 } else {
198 0.0
199 };
200
201 SummaryStatistics {
202 total_entries: entries.len(),
203 min_index_interval: header.min_index_interval,
204 sampling_level: header.sampling_level,
205 size_at_full_sampling: header.size_at_full_sampling,
206 average_key_size: avg_key_size,
207 file_size: std::fs::metadata(&self.file_path)
208 .map(|m| m.len())
209 .unwrap_or(0),
210 }
211 }
212
213 pub async fn validate_integrity(&self) -> Result<Vec<String>> {
215 let mut issues = Vec::new();
216
217 for i in 1..self.summary_data.entries.len() {
219 let prev_pos = self.summary_data.entries[i - 1].position;
220 let curr_pos = self.summary_data.entries[i].position;
221
222 if prev_pos > curr_pos {
223 issues.push(format!(
224 "Entries not sorted by position: entry {} has position {}, entry {} has position {}",
225 i - 1, prev_pos, i, curr_pos
226 ));
227 }
228 }
229
230 if self.summary_data.entries.len() != self.summary_data.header.entries_count as usize {
232 issues.push(format!(
233 "Entry count mismatch: header says {}, but found {}",
234 self.summary_data.header.entries_count,
235 self.summary_data.entries.len()
236 ));
237 }
238
239 Ok(issues)
240 }
241}
242
243#[derive(Debug, Clone, Serialize, Deserialize)]
245pub struct SummaryStatistics {
246 pub total_entries: usize,
248 pub min_index_interval: u32,
250 pub sampling_level: u32,
252 pub size_at_full_sampling: u32,
254 pub average_key_size: f64,
256 pub file_size: u64,
258}
259
260fn parse_summary_data(input: &[u8]) -> Result<SummaryData> {
262 if input.len() < SUMMARY_HEADER_SIZE {
263 return Err(Error::corruption(format!(
264 "Summary.db too small: {} bytes, need at least {} for header",
265 input.len(),
266 SUMMARY_HEADER_SIZE
267 )));
268 }
269
270 let (remaining, header) = parse_summary_header(input)
272 .map_err(|e| Error::corruption(format!("Failed to parse Summary.db header: {:?}", e)))?;
273
274 if header.entries_count > MAX_REASONABLE_ENTRIES {
276 return Err(Error::corruption(format!(
277 "Summary.db entry count {} exceeds maximum {}",
278 header.entries_count, MAX_REASONABLE_ENTRIES
279 )));
280 }
281
282 let offset_table_size = header.entries_count as usize * 4;
289
290 if remaining.len() < offset_table_size {
291 return Err(Error::corruption(format!(
292 "Summary.db insufficient data for offset table: need {} bytes, have {}",
293 offset_table_size,
294 remaining.len()
295 )));
296 }
297
298 let (after_offsets, offsets) = count(le_u32::<_, NomError<_>>, header.entries_count as usize)(
300 remaining,
301 )
302 .map_err(|e: nom::Err<NomError<_>>| {
303 Error::corruption(format!("Failed to parse offset table: {:?}", e))
304 })?;
305
306 let entry_data_size = header.summary_entries_size as usize - offset_table_size;
308
309 if after_offsets.len() < entry_data_size {
310 return Err(Error::corruption(format!(
311 "Summary.db insufficient entry data: need {} bytes, have {}",
312 entry_data_size,
313 after_offsets.len()
314 )));
315 }
316
317 let entry_data = &after_offsets[..entry_data_size];
318 let after_entries = &after_offsets[entry_data_size..];
319
320 let entries = parse_entries_from_offsets(
322 entry_data,
323 &offsets,
324 offset_table_size,
325 header.summary_entries_size as usize,
326 )?;
327
328 let (after_first, first_key) = parse_serialized_key(after_entries)
330 .map_err(|e| Error::corruption(format!("Failed to parse first key: {:?}", e)))?;
331
332 let (_, last_key) = parse_serialized_key(after_first)
333 .map_err(|e| Error::corruption(format!("Failed to parse last key: {:?}", e)))?;
334
335 Ok(SummaryData {
336 header,
337 entries,
338 first_key,
339 last_key,
340 })
341}
342
343pub(crate) fn parse_summary_header(input: &[u8]) -> IResult<&[u8], SummaryHeader> {
345 let (input, min_index_interval) = be_u32(input)?;
346 let (input, entries_count) = be_u32(input)?;
347 let (input, summary_entries_size) = be_u64(input)?;
348 let (input, sampling_level) = be_u32(input)?;
349 let (input, size_at_full_sampling) = be_u32(input)?;
350
351 Ok((
352 input,
353 SummaryHeader {
354 min_index_interval,
355 entries_count,
356 summary_entries_size,
357 sampling_level,
358 size_at_full_sampling,
359 },
360 ))
361}
362
363fn parse_entries_from_offsets(
368 entry_data: &[u8],
369 offsets: &[u32],
370 offset_table_size: usize,
371 summary_entries_size: usize,
372) -> Result<Vec<SummaryEntry>> {
373 let offsets = normalize_entry_offsets(
374 offsets,
375 entry_data.len(),
376 offset_table_size,
377 summary_entries_size,
378 )?;
379 let mut entries = Vec::with_capacity(offsets.len());
380
381 for i in 0..offsets.len() {
382 let start = offsets[i];
383
384 let end = if i + 1 < offsets.len() {
386 offsets[i + 1]
387 } else {
388 entry_data.len()
389 };
390
391 if start >= end {
392 return Err(Error::corruption(format!(
393 "Invalid offset at index {}: start {} >= end {}",
394 i, start, end
395 )));
396 }
397
398 if end > entry_data.len() {
399 return Err(Error::corruption(format!(
400 "Offset {} points beyond entry data (size {})",
401 end,
402 entry_data.len()
403 )));
404 }
405
406 let entry_bytes = &entry_data[start..end];
407
408 if entry_bytes.len() < 8 {
411 return Err(Error::corruption(format!(
412 "Entry {} too small: {} bytes, need at least 8 for position",
413 i,
414 entry_bytes.len()
415 )));
416 }
417
418 let key_len = entry_bytes.len() - 8;
419 let partition_key = entry_bytes[..key_len].to_vec();
420
421 let position_bytes = &entry_bytes[key_len..];
423 let position = u64::from_be_bytes([
424 position_bytes[0],
425 position_bytes[1],
426 position_bytes[2],
427 position_bytes[3],
428 position_bytes[4],
429 position_bytes[5],
430 position_bytes[6],
431 position_bytes[7],
432 ]);
433
434 entries.push(SummaryEntry {
435 partition_key,
436 position,
437 });
438 }
439
440 Ok(entries)
441}
442
443fn normalize_entry_offsets(
444 offsets: &[u32],
445 entry_data_size: usize,
446 offset_table_size: usize,
447 summary_entries_size: usize,
448) -> Result<Vec<usize>> {
449 if offsets.is_empty() {
450 return Ok(Vec::new());
451 }
452
453 let usize_offsets: Vec<usize> = offsets.iter().map(|offset| *offset as usize).collect();
454
455 if usize_offsets[0] == 0 && usize_offsets.iter().all(|offset| *offset < entry_data_size) {
457 return Ok(usize_offsets);
458 }
459
460 if usize_offsets
462 .iter()
463 .all(|offset| *offset >= offset_table_size && *offset < summary_entries_size)
464 {
465 return Ok(usize_offsets
466 .into_iter()
467 .map(|offset| offset - offset_table_size)
468 .collect());
469 }
470
471 Err(Error::corruption(format!(
472 "Summary.db offsets are invalid for both relative and absolute layouts: offsets={offsets:?}, entry_data_size={entry_data_size}, offset_table_size={offset_table_size}, summary_entries_size={summary_entries_size}"
473 )))
474}
475
476fn parse_serialized_key(input: &[u8]) -> IResult<&[u8], Vec<u8>> {
478 let (input, size) = be_u32(input)?;
479 let (input, key_data) = take(size)(input)?;
480 Ok((input, key_data.to_vec()))
481}
482
#[cfg(test)]
mod tests {
    use super::*;

    /// 16-byte key shared by several fixtures.
    const SAMPLE_KEY: [u8; 16] = [
        0xdc, 0x67, 0x26, 0xa6, 0x05, 0xc6, 0x48, 0x50, 0x86, 0xcd, 0x0f, 0xe3, 0x1b, 0x67, 0x57,
        0xaf,
    ];

    /// Encodes the five header fields big-endian, in on-disk order.
    fn encode_header(
        min_index_interval: u32,
        entries_count: u32,
        summary_entries_size: u64,
        sampling_level: u32,
        size_at_full_sampling: u32,
    ) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(SUMMARY_HEADER_SIZE);
        bytes.extend_from_slice(&min_index_interval.to_be_bytes());
        bytes.extend_from_slice(&entries_count.to_be_bytes());
        bytes.extend_from_slice(&summary_entries_size.to_be_bytes());
        bytes.extend_from_slice(&sampling_level.to_be_bytes());
        bytes.extend_from_slice(&size_at_full_sampling.to_be_bytes());
        bytes
    }

    /// Encodes one entry: the key bytes followed by a big-endian position.
    fn encode_entry(key: &[u8], position: u64) -> Vec<u8> {
        let mut bytes = key.to_vec();
        bytes.extend_from_slice(&position.to_be_bytes());
        bytes
    }

    /// Encodes a key with its big-endian `u32` length prefix.
    fn encode_serialized_key(key: &[u8]) -> Vec<u8> {
        let mut bytes = (key.len() as u32).to_be_bytes().to_vec();
        bytes.extend_from_slice(key);
        bytes
    }

    #[test]
    fn test_summary_header_parsing() {
        // Literal bytes on purpose: this test pins the wire layout itself.
        let raw: [u8; 24] = [
            0x00, 0x00, 0x00, 0x80, // min_index_interval = 128
            0x00, 0x00, 0x00, 0x01, // entries_count = 1
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1c, // summary_entries_size = 28
            0x00, 0x00, 0x00, 0x80, // sampling_level = 128
            0x00, 0x00, 0x00, 0x01, // size_at_full_sampling = 1
        ];

        let (rest, header) = parse_summary_header(&raw).unwrap();

        assert_eq!(header.min_index_interval, 128);
        assert_eq!(header.entries_count, 1);
        assert_eq!(header.summary_entries_size, 28);
        assert_eq!(header.sampling_level, 128);
        assert_eq!(header.size_at_full_sampling, 1);
        assert!(rest.is_empty());
    }

    #[test]
    fn test_offset_table_little_endian() {
        // Two little-endian u32 offsets: 0 and 24.
        let table: [u8; 8] = [0x00, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00];

        let (_, offsets) = count(le_u32::<_, NomError<_>>, 2usize)(&table[..]).unwrap();

        assert_eq!(offsets[0], 0);
        assert_eq!(offsets[1], 24);
    }

    #[test]
    fn test_entry_parsing_from_offsets() {
        let entry_data = encode_entry(&SAMPLE_KEY, 0);
        let offsets = [0u32];

        let entries =
            parse_entries_from_offsets(&entry_data, &offsets, 4, 4 + entry_data.len()).unwrap();

        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].partition_key, SAMPLE_KEY.to_vec());
        assert_eq!(entries[0].position, 0);
    }

    #[test]
    fn test_entry_parsing_from_absolute_offsets() {
        let first_key = vec![0xAA; 16];
        let second_key = vec![0xBB; 16];

        let mut entry_data = encode_entry(&first_key, 0);
        entry_data.extend(encode_entry(&second_key, 128));

        // Offsets measured from the start of the 8-byte offset table.
        let offsets = [8u32, 32u32];
        let entries = parse_entries_from_offsets(&entry_data, &offsets, 8, 56).unwrap();

        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].partition_key, first_key);
        assert_eq!(entries[0].position, 0);
        assert_eq!(entries[1].partition_key, second_key);
        assert_eq!(entries[1].position, 128);
    }

    #[test]
    fn test_serialized_key_parsing() {
        let raw = encode_serialized_key(&SAMPLE_KEY);

        let (rest, key) = parse_serialized_key(&raw).unwrap();

        assert_eq!(key.len(), 16);
        assert_eq!(key, SAMPLE_KEY.to_vec());
        assert!(rest.is_empty());
    }

    #[test]
    fn test_complete_summary_parsing() {
        // 4-byte offset table + one 24-byte entry = 28 bytes of entries.
        let mut data = encode_header(128, 1, 28, 128, 1);
        data.extend_from_slice(&0u32.to_le_bytes());
        data.extend(encode_entry(&SAMPLE_KEY, 0));
        data.extend(encode_serialized_key(&SAMPLE_KEY)); // first key
        data.extend(encode_serialized_key(&SAMPLE_KEY)); // last key

        let summary = parse_summary_data(&data).unwrap();

        assert_eq!(summary.header.min_index_interval, 128);
        assert_eq!(summary.header.entries_count, 1);
        assert_eq!(summary.entries.len(), 1);
        assert_eq!(summary.entries[0].partition_key, SAMPLE_KEY.to_vec());
        assert_eq!(summary.entries[0].position, 0);
        assert_eq!(summary.first_key, SAMPLE_KEY.to_vec());
        assert_eq!(summary.last_key, SAMPLE_KEY.to_vec());
    }

    #[test]
    fn test_entry_position_sorted() {
        let low_key = [0x01u8; 16];
        let high_key = [0x02u8; 16];

        // 8-byte offset table + two 24-byte entries = 56 bytes of entries.
        let mut data = encode_header(128, 2, 56, 128, 2);
        data.extend_from_slice(&0u32.to_le_bytes());
        data.extend_from_slice(&24u32.to_le_bytes());
        data.extend(encode_entry(&low_key, 0));
        data.extend(encode_entry(&high_key, 100));
        data.extend(encode_serialized_key(&low_key)); // first key
        data.extend(encode_serialized_key(&high_key)); // last key

        let summary = parse_summary_data(&data).unwrap();

        assert_eq!(summary.entries.len(), 2);
        assert_eq!(summary.entries[0].position, 0);
        assert_eq!(summary.entries[1].position, 100);
    }
}