Skip to main content

sochdb_storage/
two_level_index.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Two-Level Index for SSTables
19//!
20//! Implements a hierarchical index structure to reduce memory usage and
21//! SSTable open time for large files.
22//!
23//! ## jj.md Task 1: Two-Level Index Structure
24//!
25//! Goals:
26//! - Reduce SSTable open time from O(index_size) to O(1)
27//! - Reduce memory usage by 10-50x for large SSTables
28//! - Improve cold-start latency from seconds to milliseconds
29//!
30//! ## Architecture
31//!
32//! ```text
33//! Two-Level Index Structure:
34//! ├── Data Blocks (64KB each)
35//! │   └── Sorted edges
36//! ├── Block Index (loaded on-demand)
37//! │   └── [min_key, max_key, offset] per block
38//! └── Top-Level Index (in footer, always loaded)
39//!     └── Fence pointers: every 1MB of block index
40//!
41//! Lookup: O(log(N/B)) where B = block size
//! Memory: O(fence_count), where one fence covers 1MB of block index (~17K blocks, ~1GB of data at 64KB per block), instead of O(N/B) for a fully resident block index
43//! ```
44//!
45//! Reference: RocksDB BlockBasedTableFormat - https://github.com/facebook/rocksdb/wiki/BlockBasedTable-Format
46
47use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
48use std::io::{Cursor, Read};
49
/// Target size of a single data block in bytes (64 KiB, cache friendly).
pub const DATA_BLOCK_SIZE: usize = 64 << 10;

/// Spacing between consecutive fence pointers, measured in bytes of serialized
/// block-index data (1 MiB).
pub const FENCE_INTERVAL_BYTES: usize = 1 << 20;

/// Serialized size of one block index entry:
/// `min_ts (8) + min_edge_id (16) + max_ts (8) + max_edge_id (16) + offset (8) + length (4)`.
pub const BLOCK_INDEX_ENTRY_SIZE: usize = 8 + 16 + 8 + 16 + 8 + 4;

/// Serialized size of one fence pointer:
/// `min_ts (8) + min_edge_id (16) + block_index_offset (8)`.
pub const FENCE_POINTER_SIZE: usize = 8 + 16 + 8;
61
/// A key in the temporal index: a timestamp plus an edge id that keeps keys unique.
///
/// The derived ordering compares fields in declaration order, so keys sort by
/// `timestamp_us` first and by `edge_id` only to break ties.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct TemporalKey {
    /// Timestamp component; compared first.
    pub timestamp_us: u64,
    /// Edge id; breaks ties between equal timestamps.
    pub edge_id: u128,
}

impl TemporalKey {
    /// Creates a key from its timestamp and edge id.
    pub fn new(timestamp_us: u64, edge_id: u128) -> Self {
        TemporalKey { timestamp_us, edge_id }
    }

    /// Smallest representable key, `(0, 0)`; nothing compares below it.
    pub fn min() -> Self {
        Self::new(u64::MIN, u128::MIN)
    }

    /// Largest representable key, `(u64::MAX, u128::MAX)`; nothing compares above it.
    pub fn max() -> Self {
        Self::new(u64::MAX, u128::MAX)
    }
}
93
94/// Entry in the block-level index (Level 2)
95///
96/// Each entry describes a data block's key range and location.
97#[derive(Debug, Clone, Copy)]
98pub struct BlockIndexEntry {
99    /// Minimum key in this block
100    pub min_key: TemporalKey,
101    /// Maximum key in this block
102    pub max_key: TemporalKey,
103    /// Offset of the block in the data section
104    pub offset: u64,
105    /// Length of the block in bytes
106    pub length: u32,
107}
108
109impl BlockIndexEntry {
110    /// Check if a key falls within this block's range
111    pub fn contains_key(&self, key: &TemporalKey) -> bool {
112        *key >= self.min_key && *key <= self.max_key
113    }
114
115    /// Check if a timestamp range overlaps with this block
116    pub fn overlaps_range(&self, start_ts: u64, end_ts: u64) -> bool {
117        self.max_key.timestamp_us >= start_ts && self.min_key.timestamp_us <= end_ts
118    }
119
120    /// Serialize to bytes
121    pub fn to_bytes(&self) -> [u8; BLOCK_INDEX_ENTRY_SIZE] {
122        let mut buf = [0u8; BLOCK_INDEX_ENTRY_SIZE];
123        let mut cursor = Cursor::new(&mut buf[..]);
124
125        cursor
126            .write_u64::<LittleEndian>(self.min_key.timestamp_us)
127            .unwrap();
128        cursor
129            .write_u128::<LittleEndian>(self.min_key.edge_id)
130            .unwrap();
131        cursor
132            .write_u64::<LittleEndian>(self.max_key.timestamp_us)
133            .unwrap();
134        cursor
135            .write_u128::<LittleEndian>(self.max_key.edge_id)
136            .unwrap();
137        cursor.write_u64::<LittleEndian>(self.offset).unwrap();
138        cursor.write_u32::<LittleEndian>(self.length).unwrap();
139
140        buf
141    }
142
143    /// Deserialize from bytes
144    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
145        if bytes.len() < BLOCK_INDEX_ENTRY_SIZE {
146            return Err(std::io::Error::new(
147                std::io::ErrorKind::InvalidData,
148                "Block index entry too short",
149            ));
150        }
151
152        let mut cursor = Cursor::new(bytes);
153
154        let min_timestamp_us = cursor.read_u64::<LittleEndian>()?;
155        let min_edge_id = cursor.read_u128::<LittleEndian>()?;
156        let max_timestamp_us = cursor.read_u64::<LittleEndian>()?;
157        let max_edge_id = cursor.read_u128::<LittleEndian>()?;
158        let offset = cursor.read_u64::<LittleEndian>()?;
159        let length = cursor.read_u32::<LittleEndian>()?;
160
161        Ok(Self {
162            min_key: TemporalKey::new(min_timestamp_us, min_edge_id),
163            max_key: TemporalKey::new(max_timestamp_us, max_edge_id),
164            offset,
165            length,
166        })
167    }
168}
169
170/// Fence pointer in the top-level index (Level 1)
171///
172/// Points to a position in the block index. Used to quickly narrow
173/// down which section of the block index to load.
174#[derive(Debug, Clone, Copy)]
175pub struct FencePointer {
176    /// First key at this fence position
177    pub key: TemporalKey,
178    /// Offset within the block index section
179    pub block_index_offset: u64,
180}
181
182impl FencePointer {
183    /// Serialize to bytes
184    pub fn to_bytes(&self) -> [u8; FENCE_POINTER_SIZE] {
185        let mut buf = [0u8; FENCE_POINTER_SIZE];
186        let mut cursor = Cursor::new(&mut buf[..]);
187
188        cursor
189            .write_u64::<LittleEndian>(self.key.timestamp_us)
190            .unwrap();
191        cursor.write_u128::<LittleEndian>(self.key.edge_id).unwrap();
192        cursor
193            .write_u64::<LittleEndian>(self.block_index_offset)
194            .unwrap();
195
196        buf
197    }
198
199    /// Deserialize from bytes
200    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
201        if bytes.len() < FENCE_POINTER_SIZE {
202            return Err(std::io::Error::new(
203                std::io::ErrorKind::InvalidData,
204                "Fence pointer too short",
205            ));
206        }
207
208        let mut cursor = Cursor::new(bytes);
209
210        let timestamp_us = cursor.read_u64::<LittleEndian>()?;
211        let edge_id = cursor.read_u128::<LittleEndian>()?;
212        let block_index_offset = cursor.read_u64::<LittleEndian>()?;
213
214        Ok(Self {
215            key: TemporalKey::new(timestamp_us, edge_id),
216            block_index_offset,
217        })
218    }
219}
220
221/// Two-level index for efficient lookups with minimal memory usage
222///
223/// The top-level fence pointers are always in memory (~1KB per GB of data).
224/// Block index entries are loaded on-demand.
225#[derive(Debug)]
226pub struct TwoLevelIndex {
227    /// Top-level fence pointers (always in memory)
228    pub fence_pointers: Vec<FencePointer>,
229
230    /// Total number of blocks in the SSTable
231    pub total_blocks: u32,
232
233    /// Offset of the block index section in the file
234    pub block_index_offset: u64,
235
236    /// Length of the block index section
237    pub block_index_length: u64,
238}
239
240impl TwoLevelIndex {
241    /// Create a new two-level index from block entries
242    ///
243    /// This is called during SSTable creation to build the index structure.
244    pub fn build(blocks: &[BlockIndexEntry], block_index_offset: u64) -> Self {
245        let mut fence_pointers = Vec::new();
246        let mut current_offset = 0u64;
247
248        // Create fence pointers at regular intervals
249        for (i, block) in blocks.iter().enumerate() {
250            let entry_offset = (i * BLOCK_INDEX_ENTRY_SIZE) as u64;
251
252            // Add fence pointer at start and every FENCE_INTERVAL_BYTES
253            if i == 0 || (entry_offset - current_offset) >= FENCE_INTERVAL_BYTES as u64 {
254                fence_pointers.push(FencePointer {
255                    key: block.min_key,
256                    block_index_offset: entry_offset,
257                });
258                current_offset = entry_offset;
259            }
260        }
261
262        let block_index_length = (blocks.len() * BLOCK_INDEX_ENTRY_SIZE) as u64;
263
264        Self {
265            fence_pointers,
266            total_blocks: blocks.len() as u32,
267            block_index_offset,
268            block_index_length,
269        }
270    }
271
272    /// Find the fence pointer range that may contain the target key
273    ///
274    /// Returns (start_offset, end_offset) within the block index section.
275    pub fn find_fence_range(&self, key: &TemporalKey) -> (u64, u64) {
276        if self.fence_pointers.is_empty() {
277            return (0, self.block_index_length);
278        }
279
280        // Binary search for the fence pointer
281        let idx = match self.fence_pointers.binary_search_by(|fp| fp.key.cmp(key)) {
282            Ok(i) => i, // Exact match
283            Err(i) => {
284                if i == 0 {
285                    0
286                } else {
287                    i - 1 // Key is between fence_pointers[i-1] and fence_pointers[i]
288                }
289            }
290        };
291
292        let start_offset = self.fence_pointers[idx].block_index_offset;
293        let end_offset = if idx + 1 < self.fence_pointers.len() {
294            self.fence_pointers[idx + 1].block_index_offset
295        } else {
296            self.block_index_length
297        };
298
299        (start_offset, end_offset)
300    }
301
302    /// Find fence pointer range for a timestamp range query
303    ///
304    /// Returns (start_offset, end_offset) within the block index section.
305    pub fn find_fence_range_for_timestamps(&self, start_ts: u64, end_ts: u64) -> (u64, u64) {
306        if self.fence_pointers.is_empty() {
307            return (0, self.block_index_length);
308        }
309
310        // Find first fence that could contain start_ts
311        let start_key = TemporalKey::new(start_ts, 0);
312        let start_idx = match self
313            .fence_pointers
314            .binary_search_by(|fp| fp.key.cmp(&start_key))
315        {
316            Ok(i) => i,
317            Err(i) => {
318                if i == 0 {
319                    0
320                } else {
321                    i - 1
322                }
323            }
324        };
325
326        // Find last fence that could contain end_ts
327        let end_key = TemporalKey::new(end_ts, u128::MAX);
328        let end_idx = match self
329            .fence_pointers
330            .binary_search_by(|fp| fp.key.cmp(&end_key))
331        {
332            Ok(i) => i + 1,
333            Err(i) => i,
334        };
335
336        let start_offset = self.fence_pointers[start_idx].block_index_offset;
337        let end_offset = if end_idx < self.fence_pointers.len() {
338            self.fence_pointers[end_idx].block_index_offset
339        } else {
340            self.block_index_length
341        };
342
343        (start_offset, end_offset)
344    }
345
346    /// Serialize the fence pointers to bytes (stored in footer)
347    pub fn fence_pointers_to_bytes(&self) -> Vec<u8> {
348        let mut buf = Vec::with_capacity(self.fence_pointers.len() * FENCE_POINTER_SIZE + 8);
349
350        // Write count
351        buf.write_u32::<LittleEndian>(self.fence_pointers.len() as u32)
352            .unwrap();
353        buf.write_u32::<LittleEndian>(self.total_blocks).unwrap();
354
355        // Write fence pointers
356        for fp in &self.fence_pointers {
357            buf.extend_from_slice(&fp.to_bytes());
358        }
359
360        buf
361    }
362
363    /// Deserialize fence pointers from bytes
364    pub fn fence_pointers_from_bytes(
365        bytes: &[u8],
366        block_index_offset: u64,
367        block_index_length: u64,
368    ) -> std::io::Result<Self> {
369        if bytes.len() < 8 {
370            return Err(std::io::Error::new(
371                std::io::ErrorKind::InvalidData,
372                "Fence pointer section too short",
373            ));
374        }
375
376        let mut cursor = Cursor::new(bytes);
377        let count = cursor.read_u32::<LittleEndian>()? as usize;
378        let total_blocks = cursor.read_u32::<LittleEndian>()?;
379
380        let expected_size = 8 + count * FENCE_POINTER_SIZE;
381        if bytes.len() < expected_size {
382            return Err(std::io::Error::new(
383                std::io::ErrorKind::InvalidData,
384                format!(
385                    "Fence pointer section too short: {} < {}",
386                    bytes.len(),
387                    expected_size
388                ),
389            ));
390        }
391
392        let mut fence_pointers = Vec::with_capacity(count);
393        for _ in 0..count {
394            let mut buf = [0u8; FENCE_POINTER_SIZE];
395            cursor.read_exact(&mut buf)?;
396            fence_pointers.push(FencePointer::from_bytes(&buf)?);
397        }
398
399        Ok(Self {
400            fence_pointers,
401            total_blocks,
402            block_index_offset,
403            block_index_length,
404        })
405    }
406
407    /// Estimate memory usage of this index in bytes
408    pub fn memory_usage(&self) -> usize {
409        std::mem::size_of::<Self>()
410            + self.fence_pointers.len() * std::mem::size_of::<FencePointer>()
411    }
412
413    /// Get the number of fence pointers
414    pub fn fence_count(&self) -> usize {
415        self.fence_pointers.len()
416    }
417}
418
419/// Block index reader for loading block index entries on-demand
420pub struct BlockIndexReader<'a> {
421    /// Reference to the mmap'd block index section
422    data: &'a [u8],
423}
424
425impl<'a> BlockIndexReader<'a> {
426    /// Create a new block index reader from the block index section
427    pub fn new(data: &'a [u8]) -> Self {
428        Self { data }
429    }
430
431    /// Read a single block index entry at the given offset
432    pub fn read_entry(&self, offset: usize) -> std::io::Result<BlockIndexEntry> {
433        if offset + BLOCK_INDEX_ENTRY_SIZE > self.data.len() {
434            return Err(std::io::Error::new(
435                std::io::ErrorKind::InvalidData,
436                "Block index offset out of bounds",
437            ));
438        }
439
440        BlockIndexEntry::from_bytes(&self.data[offset..offset + BLOCK_INDEX_ENTRY_SIZE])
441    }
442
443    /// Read a range of block index entries
444    pub fn read_range(&self, start: usize, end: usize) -> std::io::Result<Vec<BlockIndexEntry>> {
445        let start = start.min(self.data.len());
446        let end = end.min(self.data.len());
447
448        let mut entries = Vec::new();
449        let mut offset = start;
450
451        while offset + BLOCK_INDEX_ENTRY_SIZE <= end {
452            entries.push(self.read_entry(offset)?);
453            offset += BLOCK_INDEX_ENTRY_SIZE;
454        }
455
456        Ok(entries)
457    }
458
459    /// Binary search for a key in a range of block index entries
460    pub fn find_block_for_key(
461        &self,
462        key: &TemporalKey,
463        start_offset: usize,
464        end_offset: usize,
465    ) -> std::io::Result<Option<BlockIndexEntry>> {
466        let entries = self.read_range(start_offset, end_offset)?;
467
468        // Binary search for the block containing this key
469        let idx = entries.partition_point(|e| e.max_key < *key);
470
471        if idx < entries.len() && entries[idx].contains_key(key) {
472            Ok(Some(entries[idx]))
473        } else {
474            Ok(None)
475        }
476    }
477
478    /// Find all blocks that overlap with a timestamp range
479    pub fn find_blocks_for_range(
480        &self,
481        start_ts: u64,
482        end_ts: u64,
483        start_offset: usize,
484        end_offset: usize,
485    ) -> std::io::Result<Vec<BlockIndexEntry>> {
486        let entries = self.read_range(start_offset, end_offset)?;
487
488        Ok(entries
489            .into_iter()
490            .filter(|e| e.overlaps_range(start_ts, end_ts))
491            .collect())
492    }
493
494    /// Get total number of entries in this reader's range
495    pub fn entry_count(&self) -> usize {
496        self.data.len() / BLOCK_INDEX_ENTRY_SIZE
497    }
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` sorted, non-overlapping blocks: block `i` spans timestamps
    /// `[i * 1000, (i + 1) * 1000 - 1]` and uses edge id `i` for both bounds.
    fn create_test_blocks(count: usize) -> Vec<BlockIndexEntry> {
        (0..count)
            .map(|i| BlockIndexEntry {
                min_key: TemporalKey::new(i as u64 * 1000, i as u128),
                max_key: TemporalKey::new((i + 1) as u64 * 1000 - 1, i as u128),
                offset: (i * DATA_BLOCK_SIZE) as u64,
                length: DATA_BLOCK_SIZE as u32,
            })
            .collect()
    }

    /// Serializes entries back to back, as they are laid out in the block index section.
    fn serialize_blocks(blocks: &[BlockIndexEntry]) -> Vec<u8> {
        let mut data = Vec::with_capacity(blocks.len() * BLOCK_INDEX_ENTRY_SIZE);
        for block in blocks {
            data.extend_from_slice(&block.to_bytes());
        }
        data
    }

    #[test]
    fn test_temporal_key_ordering() {
        let k1 = TemporalKey::new(100, 1);
        let k2 = TemporalKey::new(100, 2);
        let k3 = TemporalKey::new(200, 1);

        assert!(k1 < k2); // Same timestamp, lower edge_id comes first
        assert!(k2 < k3); // Lower timestamp comes first, regardless of edge_id
        assert!(k1 < k3);
        assert!(TemporalKey::min() <= k1);
        assert!(k3 <= TemporalKey::max());
    }

    #[test]
    fn test_block_index_entry_serialization() {
        let entry = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 42),
            max_key: TemporalKey::new(2000, 100),
            offset: 65536,
            length: 64000,
        };

        let bytes = entry.to_bytes();
        assert_eq!(bytes.len(), BLOCK_INDEX_ENTRY_SIZE);
        // The record starts with the little-endian min timestamp.
        assert_eq!(&bytes[..8], &1000u64.to_le_bytes());

        let restored = BlockIndexEntry::from_bytes(&bytes).unwrap();
        assert_eq!(restored.min_key, entry.min_key);
        assert_eq!(restored.max_key, entry.max_key);
        assert_eq!(restored.offset, entry.offset);
        assert_eq!(restored.length, entry.length);

        // Extreme values survive the round trip unchanged.
        let extreme = BlockIndexEntry {
            min_key: TemporalKey::min(),
            max_key: TemporalKey::max(),
            offset: u64::MAX,
            length: u32::MAX,
        };
        let restored = BlockIndexEntry::from_bytes(&extreme.to_bytes()).unwrap();
        assert_eq!(restored.min_key, extreme.min_key);
        assert_eq!(restored.max_key, extreme.max_key);
        assert_eq!(restored.offset, extreme.offset);
        assert_eq!(restored.length, extreme.length);

        // Truncated input is rejected rather than partially decoded.
        assert!(BlockIndexEntry::from_bytes(&bytes[..BLOCK_INDEX_ENTRY_SIZE - 1]).is_err());
    }

    #[test]
    fn test_fence_pointer_serialization() {
        let fp = FencePointer {
            key: TemporalKey::new(5000, 123),
            block_index_offset: 1024 * 1024,
        };

        let bytes = fp.to_bytes();
        assert_eq!(bytes.len(), FENCE_POINTER_SIZE);

        let restored = FencePointer::from_bytes(&bytes).unwrap();
        assert_eq!(restored.key, fp.key);
        assert_eq!(restored.block_index_offset, fp.block_index_offset);

        assert!(FencePointer::from_bytes(&bytes[..FENCE_POINTER_SIZE - 1]).is_err());
    }

    #[test]
    fn test_two_level_index_build() {
        let blocks = create_test_blocks(100);
        let index = TwoLevelIndex::build(&blocks, 0);

        // 100 entries (6000 bytes) fit well inside a single fence interval.
        assert_eq!(index.fence_count(), 1);
        assert_eq!(index.total_blocks, 100);
        assert_eq!(index.block_index_length, (100 * BLOCK_INDEX_ENTRY_SIZE) as u64);

        // First fence points at the first entry and carries its min key.
        assert_eq!(index.fence_pointers[0].block_index_offset, 0);
        assert_eq!(index.fence_pointers[0].key, blocks[0].min_key);
    }

    #[test]
    fn test_two_level_index_empty() {
        let index = TwoLevelIndex::build(&[], 0);

        assert_eq!(index.fence_count(), 0);
        assert_eq!(index.total_blocks, 0);
        assert_eq!(index.find_fence_range(&TemporalKey::new(1, 1)), (0, 0));
        assert_eq!(index.find_fence_range_for_timestamps(0, u64::MAX), (0, 0));
    }

    #[test]
    fn test_two_level_index_fence_range() {
        let blocks = create_test_blocks(100);
        let index = TwoLevelIndex::build(&blocks, 0);

        // Key in first block
        let key = TemporalKey::new(500, 0);
        let (start, end) = index.find_fence_range(&key);
        assert_eq!(start, 0);
        assert!(end > start);

        // Key at end
        let key = TemporalKey::new(99000, 99);
        let (start, end) = index.find_fence_range(&key);
        assert!(start < end);
        assert_eq!(end, index.block_index_length);
    }

    #[test]
    fn test_two_level_index_multiple_fences() {
        // ceil(1 MiB / 60 B): a new fence starts every 17_477 entries.
        const ENTRIES_PER_FENCE: usize = 17_477;
        let fence_offset = |n: usize| (n * ENTRIES_PER_FENCE * BLOCK_INDEX_ENTRY_SIZE) as u64;

        let blocks = create_test_blocks(40_000);
        let index = TwoLevelIndex::build(&blocks, 0);

        assert_eq!(index.fence_count(), 3);
        for (n, fp) in index.fence_pointers.iter().enumerate() {
            assert_eq!(fp.block_index_offset, fence_offset(n));
            assert_eq!(fp.key, blocks[n * ENTRIES_PER_FENCE].min_key);
        }

        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // A key inside block 20_000 lives in the second section.
        let key = TemporalKey::new(20_000_500, 20_000);
        let (start, end) = index.find_fence_range(&key);
        assert_eq!((start, end), (fence_offset(1), fence_offset(2)));
        let found = reader
            .find_block_for_key(&key, start as usize, end as usize)
            .unwrap()
            .expect("key should be found in its fence section");
        assert_eq!(found.min_key, blocks[20_000].min_key);

        // A timestamp window inside block 20_000 maps to the same section and one block.
        let (start, end) = index.find_fence_range_for_timestamps(20_000_000, 20_000_999);
        assert_eq!((start, end), (fence_offset(1), fence_offset(2)));
        let hits = reader
            .find_blocks_for_range(20_000_000, 20_000_999, start as usize, end as usize)
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].min_key, blocks[20_000].min_key);

        // A window straddling the first fence boundary covers both sections.
        let (start, end) = index.find_fence_range_for_timestamps(17_476_500, 17_477_500);
        assert_eq!((start, end), (fence_offset(0), fence_offset(2)));
        let hits = reader
            .find_blocks_for_range(17_476_500, 17_477_500, start as usize, end as usize)
            .unwrap();
        let starts: Vec<u64> = hits.iter().map(|b| b.min_key.timestamp_us).collect();
        assert_eq!(starts, vec![17_476_000, 17_477_000]);

        // A key in the last section runs to the end of the block index.
        let (start, end) = index.find_fence_range(&blocks[39_999].max_key);
        assert_eq!((start, end), (fence_offset(2), index.block_index_length));
    }

    #[test]
    fn test_two_level_index_serialization() {
        let blocks = create_test_blocks(50);
        let index = TwoLevelIndex::build(&blocks, 1024);

        let bytes = index.fence_pointers_to_bytes();
        assert_eq!(bytes.len(), 8 + index.fence_count() * FENCE_POINTER_SIZE);

        let restored =
            TwoLevelIndex::fence_pointers_from_bytes(&bytes, 1024, index.block_index_length)
                .unwrap();

        assert_eq!(restored.fence_pointers.len(), index.fence_pointers.len());
        assert_eq!(restored.total_blocks, index.total_blocks);
        assert_eq!(restored.block_index_offset, 1024);
        assert_eq!(restored.block_index_length, index.block_index_length);
        for (got, want) in restored.fence_pointers.iter().zip(&index.fence_pointers) {
            assert_eq!(got.key, want.key);
            assert_eq!(got.block_index_offset, want.block_index_offset);
        }

        // Truncated sections are rejected.
        assert!(TwoLevelIndex::fence_pointers_from_bytes(
            &bytes[..bytes.len() - 1],
            1024,
            index.block_index_length
        )
        .is_err());
        assert!(TwoLevelIndex::fence_pointers_from_bytes(&bytes[..4], 1024, 0).is_err());
    }

    #[test]
    fn test_block_index_reader() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        assert_eq!(reader.entry_count(), 10);

        // Read first and last entries
        let entry = reader.read_entry(0).unwrap();
        assert_eq!(entry.min_key, blocks[0].min_key);
        let last = reader.read_entry(9 * BLOCK_INDEX_ENTRY_SIZE).unwrap();
        assert_eq!(last.max_key, blocks[9].max_key);

        // Reading past the end is an error, not a panic.
        assert!(reader.read_entry(data.len()).is_err());

        // Read range
        let range = reader.read_range(0, data.len()).unwrap();
        assert_eq!(range.len(), 10);

        // Trailing partial entries are skipped; out-of-bounds ends are clamped.
        assert_eq!(
            reader
                .read_range(0, 2 * BLOCK_INDEX_ENTRY_SIZE - 1)
                .unwrap()
                .len(),
            1
        );
        assert_eq!(reader.read_range(0, usize::MAX).unwrap().len(), 10);
    }

    #[test]
    fn test_block_index_find_block_for_key() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // Find key in block 5
        let key = TemporalKey::new(5500, 5);
        let block = reader
            .find_block_for_key(&key, 0, data.len())
            .unwrap()
            .expect("key should be in block 5");
        assert!(block.contains_key(&key));
        assert_eq!(block.min_key, blocks[5].min_key);

        // Between block 5's max (5999, 5) and block 6's min (6000, 6): no block.
        let gap = TemporalKey::new(5999, 6);
        assert!(reader
            .find_block_for_key(&gap, 0, data.len())
            .unwrap()
            .is_none());

        // Past the last block.
        assert!(reader
            .find_block_for_key(&TemporalKey::max(), 0, data.len())
            .unwrap()
            .is_none());

        // Empty search range.
        assert!(reader.find_block_for_key(&key, 0, 0).unwrap().is_none());
    }

    #[test]
    fn test_block_index_find_blocks_for_range() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // Timestamp range 2500-4500 overlaps exactly blocks 2, 3 and 4.
        let found = reader
            .find_blocks_for_range(2500, 4500, 0, data.len())
            .unwrap();

        let starts: Vec<u64> = found.iter().map(|b| b.min_key.timestamp_us).collect();
        assert_eq!(starts, vec![2000, 3000, 4000]);
    }

    #[test]
    fn test_memory_usage() {
        let blocks = create_test_blocks(1000);
        let index = TwoLevelIndex::build(&blocks, 0);

        let memory = index.memory_usage();

        // Should be much less than full block index
        let full_index_size = blocks.len() * BLOCK_INDEX_ENTRY_SIZE;
        assert!(memory < full_index_size / 10); // At least 10x reduction
    }

    #[test]
    fn test_block_contains_key() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };

        assert!(block.contains_key(&TemporalKey::new(1500, 50)));
        assert!(block.contains_key(&TemporalKey::new(1000, 0))); // Min edge
        assert!(block.contains_key(&TemporalKey::new(2000, 100))); // Max edge
        assert!(!block.contains_key(&TemporalKey::new(999, 0))); // Below range
        assert!(!block.contains_key(&TemporalKey::new(2001, 0))); // Above range
        assert!(!block.contains_key(&TemporalKey::new(2000, 101))); // Just past max edge id
    }

    #[test]
    fn test_block_overlaps_range() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };

        assert!(block.overlaps_range(500, 1500)); // Overlaps start
        assert!(block.overlaps_range(1500, 2500)); // Overlaps end
        assert!(block.overlaps_range(1200, 1800)); // Fully inside
        assert!(block.overlaps_range(500, 2500)); // Fully covering
        assert!(block.overlaps_range(2000, 2000)); // Touches max timestamp
        assert!(!block.overlaps_range(100, 500)); // Before
        assert!(!block.overlaps_range(2500, 3000)); // After
    }
}