sochdb_storage/
two_level_index.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Two-Level Index for SSTables
16//!
17//! Implements a hierarchical index structure to reduce memory usage and
18//! SSTable open time for large files.
19//!
20//! ## jj.md Task 1: Two-Level Index Structure
21//!
22//! Goals:
//! - Reduce SSTable open time from O(index_size) to O(fence_count): only the fence pointers are read at open
24//! - Reduce memory usage by 10-50x for large SSTables
25//! - Improve cold-start latency from seconds to milliseconds
26//!
27//! ## Architecture
28//!
29//! ```text
30//! Two-Level Index Structure:
31//! ├── Data Blocks (64KB each)
32//! │   └── Sorted edges
33//! ├── Block Index (loaded on-demand)
//! │   └── [min_key, max_key, offset, length] per block
35//! └── Top-Level Index (in footer, always loaded)
36//!     └── Fence pointers: every 1MB of block index
37//!
38//! Lookup: O(log(N/B)) where B = block size
//! Memory: O(fence_count) ≈ O(block_index_size / 1MB) instead of O(N / block_size)
40//! ```
41//!
42//! Reference: RocksDB BlockBasedTableFormat - https://github.com/facebook/rocksdb/wiki/BlockBasedTable-Format
43
44use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
45use std::io::{Cursor, Read};
46
/// Size of each data block in bytes: 64 KiB (cache friendly).
pub const DATA_BLOCK_SIZE: usize = 1 << 16;

/// Spacing between consecutive fence pointers, measured in bytes of block
/// index data (1 MiB).
pub const FENCE_INTERVAL_BYTES: usize = 1 << 20;

/// Encoded size of one block index entry, written as the sum of its field
/// widths: min timestamp (u64), min edge id (u128), max timestamp (u64),
/// max edge id (u128), block offset (u64) and block length (u32).
pub const BLOCK_INDEX_ENTRY_SIZE: usize = 8 + 16 + 8 + 16 + 8 + 4;

/// Encoded size of one fence pointer, written as the sum of its field widths:
/// timestamp (u64), edge id (u128) and block index offset (u64).
pub const FENCE_POINTER_SIZE: usize = 8 + 16 + 8;
58
/// Composite key of the temporal index.
///
/// The derived ordering compares `timestamp_us` first and `edge_id` second
/// (field declaration order), so `edge_id` breaks ties between entries that
/// share a timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct TemporalKey {
    pub timestamp_us: u64,
    pub edge_id: u128,
}

impl TemporalKey {
    /// Builds a key from a timestamp (the `_us` suffix suggests microseconds)
    /// and an edge id.
    pub fn new(timestamp_us: u64, edge_id: u128) -> Self {
        TemporalKey {
            timestamp_us,
            edge_id,
        }
    }

    /// Smallest representable key; orders before every other key.
    pub fn min() -> Self {
        Self::new(u64::MIN, u128::MIN)
    }

    /// Largest representable key; orders after every other key.
    pub fn max() -> Self {
        Self::new(u64::MAX, u128::MAX)
    }
}
90
91/// Entry in the block-level index (Level 2)
92///
93/// Each entry describes a data block's key range and location.
94#[derive(Debug, Clone, Copy)]
95pub struct BlockIndexEntry {
96    /// Minimum key in this block
97    pub min_key: TemporalKey,
98    /// Maximum key in this block
99    pub max_key: TemporalKey,
100    /// Offset of the block in the data section
101    pub offset: u64,
102    /// Length of the block in bytes
103    pub length: u32,
104}
105
106impl BlockIndexEntry {
107    /// Check if a key falls within this block's range
108    pub fn contains_key(&self, key: &TemporalKey) -> bool {
109        *key >= self.min_key && *key <= self.max_key
110    }
111
112    /// Check if a timestamp range overlaps with this block
113    pub fn overlaps_range(&self, start_ts: u64, end_ts: u64) -> bool {
114        self.max_key.timestamp_us >= start_ts && self.min_key.timestamp_us <= end_ts
115    }
116
117    /// Serialize to bytes
118    pub fn to_bytes(&self) -> [u8; BLOCK_INDEX_ENTRY_SIZE] {
119        let mut buf = [0u8; BLOCK_INDEX_ENTRY_SIZE];
120        let mut cursor = Cursor::new(&mut buf[..]);
121
122        cursor
123            .write_u64::<LittleEndian>(self.min_key.timestamp_us)
124            .unwrap();
125        cursor
126            .write_u128::<LittleEndian>(self.min_key.edge_id)
127            .unwrap();
128        cursor
129            .write_u64::<LittleEndian>(self.max_key.timestamp_us)
130            .unwrap();
131        cursor
132            .write_u128::<LittleEndian>(self.max_key.edge_id)
133            .unwrap();
134        cursor.write_u64::<LittleEndian>(self.offset).unwrap();
135        cursor.write_u32::<LittleEndian>(self.length).unwrap();
136
137        buf
138    }
139
140    /// Deserialize from bytes
141    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
142        if bytes.len() < BLOCK_INDEX_ENTRY_SIZE {
143            return Err(std::io::Error::new(
144                std::io::ErrorKind::InvalidData,
145                "Block index entry too short",
146            ));
147        }
148
149        let mut cursor = Cursor::new(bytes);
150
151        let min_timestamp_us = cursor.read_u64::<LittleEndian>()?;
152        let min_edge_id = cursor.read_u128::<LittleEndian>()?;
153        let max_timestamp_us = cursor.read_u64::<LittleEndian>()?;
154        let max_edge_id = cursor.read_u128::<LittleEndian>()?;
155        let offset = cursor.read_u64::<LittleEndian>()?;
156        let length = cursor.read_u32::<LittleEndian>()?;
157
158        Ok(Self {
159            min_key: TemporalKey::new(min_timestamp_us, min_edge_id),
160            max_key: TemporalKey::new(max_timestamp_us, max_edge_id),
161            offset,
162            length,
163        })
164    }
165}
166
167/// Fence pointer in the top-level index (Level 1)
168///
169/// Points to a position in the block index. Used to quickly narrow
170/// down which section of the block index to load.
171#[derive(Debug, Clone, Copy)]
172pub struct FencePointer {
173    /// First key at this fence position
174    pub key: TemporalKey,
175    /// Offset within the block index section
176    pub block_index_offset: u64,
177}
178
179impl FencePointer {
180    /// Serialize to bytes
181    pub fn to_bytes(&self) -> [u8; FENCE_POINTER_SIZE] {
182        let mut buf = [0u8; FENCE_POINTER_SIZE];
183        let mut cursor = Cursor::new(&mut buf[..]);
184
185        cursor
186            .write_u64::<LittleEndian>(self.key.timestamp_us)
187            .unwrap();
188        cursor.write_u128::<LittleEndian>(self.key.edge_id).unwrap();
189        cursor
190            .write_u64::<LittleEndian>(self.block_index_offset)
191            .unwrap();
192
193        buf
194    }
195
196    /// Deserialize from bytes
197    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
198        if bytes.len() < FENCE_POINTER_SIZE {
199            return Err(std::io::Error::new(
200                std::io::ErrorKind::InvalidData,
201                "Fence pointer too short",
202            ));
203        }
204
205        let mut cursor = Cursor::new(bytes);
206
207        let timestamp_us = cursor.read_u64::<LittleEndian>()?;
208        let edge_id = cursor.read_u128::<LittleEndian>()?;
209        let block_index_offset = cursor.read_u64::<LittleEndian>()?;
210
211        Ok(Self {
212            key: TemporalKey::new(timestamp_us, edge_id),
213            block_index_offset,
214        })
215    }
216}
217
218/// Two-level index for efficient lookups with minimal memory usage
219///
220/// The top-level fence pointers are always in memory (~1KB per GB of data).
221/// Block index entries are loaded on-demand.
222#[derive(Debug)]
223pub struct TwoLevelIndex {
224    /// Top-level fence pointers (always in memory)
225    pub fence_pointers: Vec<FencePointer>,
226
227    /// Total number of blocks in the SSTable
228    pub total_blocks: u32,
229
230    /// Offset of the block index section in the file
231    pub block_index_offset: u64,
232
233    /// Length of the block index section
234    pub block_index_length: u64,
235}
236
237impl TwoLevelIndex {
238    /// Create a new two-level index from block entries
239    ///
240    /// This is called during SSTable creation to build the index structure.
241    pub fn build(blocks: &[BlockIndexEntry], block_index_offset: u64) -> Self {
242        let mut fence_pointers = Vec::new();
243        let mut current_offset = 0u64;
244
245        // Create fence pointers at regular intervals
246        for (i, block) in blocks.iter().enumerate() {
247            let entry_offset = (i * BLOCK_INDEX_ENTRY_SIZE) as u64;
248
249            // Add fence pointer at start and every FENCE_INTERVAL_BYTES
250            if i == 0 || (entry_offset - current_offset) >= FENCE_INTERVAL_BYTES as u64 {
251                fence_pointers.push(FencePointer {
252                    key: block.min_key,
253                    block_index_offset: entry_offset,
254                });
255                current_offset = entry_offset;
256            }
257        }
258
259        let block_index_length = (blocks.len() * BLOCK_INDEX_ENTRY_SIZE) as u64;
260
261        Self {
262            fence_pointers,
263            total_blocks: blocks.len() as u32,
264            block_index_offset,
265            block_index_length,
266        }
267    }
268
269    /// Find the fence pointer range that may contain the target key
270    ///
271    /// Returns (start_offset, end_offset) within the block index section.
272    pub fn find_fence_range(&self, key: &TemporalKey) -> (u64, u64) {
273        if self.fence_pointers.is_empty() {
274            return (0, self.block_index_length);
275        }
276
277        // Binary search for the fence pointer
278        let idx = match self.fence_pointers.binary_search_by(|fp| fp.key.cmp(key)) {
279            Ok(i) => i, // Exact match
280            Err(i) => {
281                if i == 0 {
282                    0
283                } else {
284                    i - 1 // Key is between fence_pointers[i-1] and fence_pointers[i]
285                }
286            }
287        };
288
289        let start_offset = self.fence_pointers[idx].block_index_offset;
290        let end_offset = if idx + 1 < self.fence_pointers.len() {
291            self.fence_pointers[idx + 1].block_index_offset
292        } else {
293            self.block_index_length
294        };
295
296        (start_offset, end_offset)
297    }
298
299    /// Find fence pointer range for a timestamp range query
300    ///
301    /// Returns (start_offset, end_offset) within the block index section.
302    pub fn find_fence_range_for_timestamps(&self, start_ts: u64, end_ts: u64) -> (u64, u64) {
303        if self.fence_pointers.is_empty() {
304            return (0, self.block_index_length);
305        }
306
307        // Find first fence that could contain start_ts
308        let start_key = TemporalKey::new(start_ts, 0);
309        let start_idx = match self
310            .fence_pointers
311            .binary_search_by(|fp| fp.key.cmp(&start_key))
312        {
313            Ok(i) => i,
314            Err(i) => {
315                if i == 0 {
316                    0
317                } else {
318                    i - 1
319                }
320            }
321        };
322
323        // Find last fence that could contain end_ts
324        let end_key = TemporalKey::new(end_ts, u128::MAX);
325        let end_idx = match self
326            .fence_pointers
327            .binary_search_by(|fp| fp.key.cmp(&end_key))
328        {
329            Ok(i) => i + 1,
330            Err(i) => i,
331        };
332
333        let start_offset = self.fence_pointers[start_idx].block_index_offset;
334        let end_offset = if end_idx < self.fence_pointers.len() {
335            self.fence_pointers[end_idx].block_index_offset
336        } else {
337            self.block_index_length
338        };
339
340        (start_offset, end_offset)
341    }
342
343    /// Serialize the fence pointers to bytes (stored in footer)
344    pub fn fence_pointers_to_bytes(&self) -> Vec<u8> {
345        let mut buf = Vec::with_capacity(self.fence_pointers.len() * FENCE_POINTER_SIZE + 8);
346
347        // Write count
348        buf.write_u32::<LittleEndian>(self.fence_pointers.len() as u32)
349            .unwrap();
350        buf.write_u32::<LittleEndian>(self.total_blocks).unwrap();
351
352        // Write fence pointers
353        for fp in &self.fence_pointers {
354            buf.extend_from_slice(&fp.to_bytes());
355        }
356
357        buf
358    }
359
360    /// Deserialize fence pointers from bytes
361    pub fn fence_pointers_from_bytes(
362        bytes: &[u8],
363        block_index_offset: u64,
364        block_index_length: u64,
365    ) -> std::io::Result<Self> {
366        if bytes.len() < 8 {
367            return Err(std::io::Error::new(
368                std::io::ErrorKind::InvalidData,
369                "Fence pointer section too short",
370            ));
371        }
372
373        let mut cursor = Cursor::new(bytes);
374        let count = cursor.read_u32::<LittleEndian>()? as usize;
375        let total_blocks = cursor.read_u32::<LittleEndian>()?;
376
377        let expected_size = 8 + count * FENCE_POINTER_SIZE;
378        if bytes.len() < expected_size {
379            return Err(std::io::Error::new(
380                std::io::ErrorKind::InvalidData,
381                format!(
382                    "Fence pointer section too short: {} < {}",
383                    bytes.len(),
384                    expected_size
385                ),
386            ));
387        }
388
389        let mut fence_pointers = Vec::with_capacity(count);
390        for _ in 0..count {
391            let mut buf = [0u8; FENCE_POINTER_SIZE];
392            cursor.read_exact(&mut buf)?;
393            fence_pointers.push(FencePointer::from_bytes(&buf)?);
394        }
395
396        Ok(Self {
397            fence_pointers,
398            total_blocks,
399            block_index_offset,
400            block_index_length,
401        })
402    }
403
404    /// Estimate memory usage of this index in bytes
405    pub fn memory_usage(&self) -> usize {
406        std::mem::size_of::<Self>()
407            + self.fence_pointers.len() * std::mem::size_of::<FencePointer>()
408    }
409
410    /// Get the number of fence pointers
411    pub fn fence_count(&self) -> usize {
412        self.fence_pointers.len()
413    }
414}
415
416/// Block index reader for loading block index entries on-demand
417pub struct BlockIndexReader<'a> {
418    /// Reference to the mmap'd block index section
419    data: &'a [u8],
420}
421
422impl<'a> BlockIndexReader<'a> {
423    /// Create a new block index reader from the block index section
424    pub fn new(data: &'a [u8]) -> Self {
425        Self { data }
426    }
427
428    /// Read a single block index entry at the given offset
429    pub fn read_entry(&self, offset: usize) -> std::io::Result<BlockIndexEntry> {
430        if offset + BLOCK_INDEX_ENTRY_SIZE > self.data.len() {
431            return Err(std::io::Error::new(
432                std::io::ErrorKind::InvalidData,
433                "Block index offset out of bounds",
434            ));
435        }
436
437        BlockIndexEntry::from_bytes(&self.data[offset..offset + BLOCK_INDEX_ENTRY_SIZE])
438    }
439
440    /// Read a range of block index entries
441    pub fn read_range(&self, start: usize, end: usize) -> std::io::Result<Vec<BlockIndexEntry>> {
442        let start = start.min(self.data.len());
443        let end = end.min(self.data.len());
444
445        let mut entries = Vec::new();
446        let mut offset = start;
447
448        while offset + BLOCK_INDEX_ENTRY_SIZE <= end {
449            entries.push(self.read_entry(offset)?);
450            offset += BLOCK_INDEX_ENTRY_SIZE;
451        }
452
453        Ok(entries)
454    }
455
456    /// Binary search for a key in a range of block index entries
457    pub fn find_block_for_key(
458        &self,
459        key: &TemporalKey,
460        start_offset: usize,
461        end_offset: usize,
462    ) -> std::io::Result<Option<BlockIndexEntry>> {
463        let entries = self.read_range(start_offset, end_offset)?;
464
465        // Binary search for the block containing this key
466        let idx = entries.partition_point(|e| e.max_key < *key);
467
468        if idx < entries.len() && entries[idx].contains_key(key) {
469            Ok(Some(entries[idx]))
470        } else {
471            Ok(None)
472        }
473    }
474
475    /// Find all blocks that overlap with a timestamp range
476    pub fn find_blocks_for_range(
477        &self,
478        start_ts: u64,
479        end_ts: u64,
480        start_offset: usize,
481        end_offset: usize,
482    ) -> std::io::Result<Vec<BlockIndexEntry>> {
483        let entries = self.read_range(start_offset, end_offset)?;
484
485        Ok(entries
486            .into_iter()
487            .filter(|e| e.overlaps_range(start_ts, end_ts))
488            .collect())
489    }
490
491    /// Get total number of entries in this reader's range
492    pub fn entry_count(&self) -> usize {
493        self.data.len() / BLOCK_INDEX_ENTRY_SIZE
494    }
495}
496
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` contiguous, non-overlapping block entries. Block `i` spans
    /// timestamps `[i * 1000, (i + 1) * 1000 - 1]` and uses edge id `i`.
    fn create_test_blocks(count: usize) -> Vec<BlockIndexEntry> {
        (0..count)
            .map(|i| BlockIndexEntry {
                min_key: TemporalKey::new(i as u64 * 1000, i as u128),
                max_key: TemporalKey::new((i + 1) as u64 * 1000 - 1, i as u128),
                offset: (i * DATA_BLOCK_SIZE) as u64,
                length: DATA_BLOCK_SIZE as u32,
            })
            .collect()
    }

    /// Serializes `blocks` into a contiguous block index section.
    fn serialize_blocks(blocks: &[BlockIndexEntry]) -> Vec<u8> {
        let mut data = Vec::with_capacity(blocks.len() * BLOCK_INDEX_ENTRY_SIZE);
        for block in blocks {
            data.extend_from_slice(&block.to_bytes());
        }
        data
    }

    /// Number of block index entries between consecutive fence pointers, i.e.
    /// ceil(FENCE_INTERVAL_BYTES / BLOCK_INDEX_ENTRY_SIZE).
    fn entries_per_fence() -> usize {
        (FENCE_INTERVAL_BYTES + BLOCK_INDEX_ENTRY_SIZE - 1) / BLOCK_INDEX_ENTRY_SIZE
    }

    #[test]
    fn test_temporal_key_ordering() {
        let k1 = TemporalKey::new(100, 1);
        let k2 = TemporalKey::new(100, 2);
        let k3 = TemporalKey::new(200, 1);

        assert!(k1 < k2); // Same timestamp, lower edge_id comes first
        assert!(k2 < k3); // Lower timestamp comes first
        assert!(k1 < k3);
        assert!(TemporalKey::min() <= k1 && k3 <= TemporalKey::max());
    }

    #[test]
    fn test_block_index_entry_serialization() {
        let entry = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 42),
            max_key: TemporalKey::new(2000, 100),
            offset: 65536,
            length: 64000,
        };

        let bytes = entry.to_bytes();
        let restored = BlockIndexEntry::from_bytes(&bytes).unwrap();

        assert_eq!(restored.min_key, entry.min_key);
        assert_eq!(restored.max_key, entry.max_key);
        assert_eq!(restored.offset, entry.offset);
        assert_eq!(restored.length, entry.length);

        // Truncated input is rejected.
        assert!(BlockIndexEntry::from_bytes(&bytes[..BLOCK_INDEX_ENTRY_SIZE - 1]).is_err());
    }

    #[test]
    fn test_fence_pointer_serialization() {
        let fp = FencePointer {
            key: TemporalKey::new(5000, 123),
            block_index_offset: 1024 * 1024,
        };

        let bytes = fp.to_bytes();
        let restored = FencePointer::from_bytes(&bytes).unwrap();

        assert_eq!(restored.key, fp.key);
        assert_eq!(restored.block_index_offset, fp.block_index_offset);

        // Truncated input is rejected.
        assert!(FencePointer::from_bytes(&bytes[..FENCE_POINTER_SIZE - 1]).is_err());
    }

    #[test]
    fn test_two_level_index_build() {
        let blocks = create_test_blocks(100);
        let index = TwoLevelIndex::build(&blocks, 0);

        // 100 entries (6000 bytes) fit inside a single fence interval.
        assert_eq!(index.fence_count(), 1);
        assert_eq!(index.total_blocks, 100);
        assert_eq!(
            index.block_index_length,
            (100 * BLOCK_INDEX_ENTRY_SIZE) as u64
        );

        // First fence points at the first entry.
        assert_eq!(index.fence_pointers[0].block_index_offset, 0);
        assert_eq!(index.fence_pointers[0].key, blocks[0].min_key);
    }

    #[test]
    fn test_two_level_index_empty() {
        let index = TwoLevelIndex::build(&[], 0);

        assert_eq!(index.fence_count(), 0);
        assert_eq!(index.total_blocks, 0);
        assert_eq!(index.block_index_length, 0);
        assert_eq!(index.find_fence_range(&TemporalKey::new(1, 1)), (0, 0));
        assert_eq!(index.find_fence_range_for_timestamps(0, u64::MAX), (0, 0));
    }

    #[test]
    fn test_two_level_index_multiple_fences() {
        let per_fence = entries_per_fence();
        let blocks = create_test_blocks(2 * per_fence + 100);
        let index = TwoLevelIndex::build(&blocks, 0);

        assert_eq!(index.fence_count(), 3);
        for (n, fp) in index.fence_pointers.iter().enumerate() {
            let first_entry = n * per_fence;
            assert_eq!(
                fp.block_index_offset,
                (first_entry * BLOCK_INDEX_ENTRY_SIZE) as u64
            );
            assert_eq!(fp.key, blocks[first_entry].min_key);
        }
    }

    #[test]
    fn test_two_level_index_fence_range() {
        let blocks = create_test_blocks(100);
        let index = TwoLevelIndex::build(&blocks, 0);

        // Key in first block
        let key = TemporalKey::new(500, 0);
        let (start, end) = index.find_fence_range(&key);
        assert_eq!(start, 0);
        assert!(end > start);

        // Key at end
        let key = TemporalKey::new(99000, 99);
        let (start, end) = index.find_fence_range(&key);
        assert!(start < end);
        assert_eq!(end, index.block_index_length);
    }

    #[test]
    fn test_two_level_lookup_across_fences() {
        let per_fence = entries_per_fence();
        let blocks = create_test_blocks(2 * per_fence + 100);
        let index = TwoLevelIndex::build(&blocks, 0);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // Targets on both sides of each fence boundary plus the extremes.
        for &target in &[0, per_fence - 1, per_fence, per_fence + 17, blocks.len() - 1] {
            let key = TemporalKey::new(target as u64 * 1000 + 500, target as u128);
            let (start, end) = index.find_fence_range(&key);
            let found = reader
                .find_block_for_key(&key, start as usize, end as usize)
                .unwrap()
                .expect("key must be found inside its fence segment");
            assert_eq!(found.offset, blocks[target].offset);
        }
    }

    #[test]
    fn test_two_level_index_timestamp_range() {
        let per_fence = entries_per_fence();
        let blocks = create_test_blocks(2 * per_fence + 100);
        let index = TwoLevelIndex::build(&blocks, 0);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // A range straddling the first fence boundary must cover both segments.
        let first = per_fence - 10;
        let last = per_fence + 10;
        let start_ts = first as u64 * 1000 + 500;
        let end_ts = last as u64 * 1000 + 500;

        let (start, end) = index.find_fence_range_for_timestamps(start_ts, end_ts);
        assert_eq!(start, 0);
        assert_eq!(end, index.fence_pointers[2].block_index_offset);

        let found = reader
            .find_blocks_for_range(start_ts, end_ts, start as usize, end as usize)
            .unwrap();
        let offsets: Vec<u64> = found.iter().map(|b| b.offset).collect();
        let expected: Vec<u64> = blocks[first..=last].iter().map(|b| b.offset).collect();
        assert_eq!(offsets, expected);
    }

    #[test]
    fn test_two_level_index_serialization() {
        let per_fence = entries_per_fence();
        let blocks = create_test_blocks(per_fence + 50); // produces two fences
        let index = TwoLevelIndex::build(&blocks, 1024);
        assert_eq!(index.fence_count(), 2);

        let bytes = index.fence_pointers_to_bytes();
        assert_eq!(bytes.len(), 8 + index.fence_count() * FENCE_POINTER_SIZE);

        let restored =
            TwoLevelIndex::fence_pointers_from_bytes(&bytes, 1024, index.block_index_length)
                .unwrap();

        assert_eq!(restored.total_blocks, index.total_blocks);
        assert_eq!(restored.block_index_offset, 1024);
        assert_eq!(restored.block_index_length, index.block_index_length);
        assert_eq!(restored.fence_count(), index.fence_count());
        for (got, want) in restored.fence_pointers.iter().zip(&index.fence_pointers) {
            assert_eq!(got.key, want.key);
            assert_eq!(got.block_index_offset, want.block_index_offset);
        }

        // Truncated sections are rejected instead of being partially decoded.
        assert!(TwoLevelIndex::fence_pointers_from_bytes(&bytes[..bytes.len() - 1], 1024, 0).is_err());
        assert!(TwoLevelIndex::fence_pointers_from_bytes(&bytes[..4], 1024, 0).is_err());
    }

    #[test]
    fn test_block_index_reader() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);

        let reader = BlockIndexReader::new(&data);
        assert_eq!(reader.entry_count(), 10);

        // Read first entry
        let entry = reader.read_entry(0).unwrap();
        assert_eq!(entry.min_key, blocks[0].min_key);

        // Read range
        let range = reader.read_range(0, data.len()).unwrap();
        assert_eq!(range.len(), 10);

        // Reading past the end is an error, not a panic.
        assert!(reader.read_entry(data.len()).is_err());
    }

    #[test]
    fn test_block_index_find_block_for_key() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // Find key in block 5
        let key = TemporalKey::new(5500, 5);
        let block = reader
            .find_block_for_key(&key, 0, data.len())
            .unwrap()
            .expect("key lies in block 5");
        assert!(block.contains_key(&key));
        assert_eq!(block.offset, blocks[5].offset);

        // Gap between block 5's max (5999, 5) and block 6's min (6000, 6).
        let gap = TemporalKey::new(5999, 6);
        assert!(reader.find_block_for_key(&gap, 0, data.len()).unwrap().is_none());
    }

    #[test]
    fn test_block_index_find_blocks_for_range() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // Timestamps 2500..=4500 overlap exactly blocks 2, 3 and 4.
        let found = reader
            .find_blocks_for_range(2500, 4500, 0, data.len())
            .unwrap();
        let offsets: Vec<u64> = found.iter().map(|b| b.offset).collect();
        let expected: Vec<u64> = blocks[2..=4].iter().map(|b| b.offset).collect();
        assert_eq!(offsets, expected);
    }

    #[test]
    fn test_memory_usage() {
        let blocks = create_test_blocks(1000);
        let index = TwoLevelIndex::build(&blocks, 0);

        let memory = index.memory_usage();

        // Should be much less than full block index
        let full_index_size = blocks.len() * BLOCK_INDEX_ENTRY_SIZE;
        assert!(memory < full_index_size / 10); // At least 10x reduction
    }

    #[test]
    fn test_block_contains_key() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };

        assert!(block.contains_key(&TemporalKey::new(1500, 50)));
        assert!(block.contains_key(&TemporalKey::new(1000, 0))); // Min edge
        assert!(block.contains_key(&TemporalKey::new(2000, 100))); // Max edge
        assert!(!block.contains_key(&TemporalKey::new(999, 0))); // Below range
        assert!(!block.contains_key(&TemporalKey::new(2001, 0))); // Above range
    }

    #[test]
    fn test_block_overlaps_range() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };

        assert!(block.overlaps_range(500, 1500)); // Overlaps start
        assert!(block.overlaps_range(1500, 2500)); // Overlaps end
        assert!(block.overlaps_range(1200, 1800)); // Fully inside
        assert!(block.overlaps_range(500, 2500)); // Fully outside
        assert!(block.overlaps_range(2000, 2000)); // Touches inclusive end
        assert!(!block.overlaps_range(100, 500)); // Before
        assert!(!block.overlaps_range(2500, 3000)); // After
    }
}