1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
48use std::io::{Cursor, Read};
49
/// Nominal size in bytes of one data block (64 KiB).
pub const DATA_BLOCK_SIZE: usize = 64 << 10;

/// Minimum number of serialized block-index bytes between two consecutive fence
/// pointers (1 MiB).
pub const FENCE_INTERVAL_BYTES: usize = 1 << 20;

/// Serialized size of a `BlockIndexEntry`: two keys (u64 timestamp + u128 edge id
/// each), a u64 offset and a u32 length — 60 bytes.
pub const BLOCK_INDEX_ENTRY_SIZE: usize = 2 * (8 + 16) + 8 + 4;

/// Serialized size of a `FencePointer`: one key (u64 + u128) and a u64 block-index
/// offset — 32 bytes.
pub const FENCE_POINTER_SIZE: usize = (8 + 16) + 8;
61
/// Composite `(timestamp, edge id)` sort key.
///
/// The derived `Ord` compares fields in declaration order, so keys sort by
/// `timestamp_us` first and use `edge_id` only to break ties.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct TemporalKey {
    /// Primary sort component (the `_us` suffix suggests microseconds).
    pub timestamp_us: u64,
    /// Secondary sort component, breaking ties between equal timestamps.
    pub edge_id: u128,
}

impl TemporalKey {
    /// Builds a key from its two components.
    pub fn new(timestamp_us: u64, edge_id: u128) -> Self {
        Self { timestamp_us, edge_id }
    }

    /// The smallest possible key; every key compares `>=` to it.
    pub fn min() -> Self {
        Self::new(0, 0)
    }

    /// The largest possible key; every key compares `<=` to it.
    pub fn max() -> Self {
        Self::new(u64::MAX, u128::MAX)
    }
}
93
94#[derive(Debug, Clone, Copy)]
98pub struct BlockIndexEntry {
99 pub min_key: TemporalKey,
101 pub max_key: TemporalKey,
103 pub offset: u64,
105 pub length: u32,
107}
108
109impl BlockIndexEntry {
110 pub fn contains_key(&self, key: &TemporalKey) -> bool {
112 *key >= self.min_key && *key <= self.max_key
113 }
114
115 pub fn overlaps_range(&self, start_ts: u64, end_ts: u64) -> bool {
117 self.max_key.timestamp_us >= start_ts && self.min_key.timestamp_us <= end_ts
118 }
119
120 pub fn to_bytes(&self) -> [u8; BLOCK_INDEX_ENTRY_SIZE] {
122 let mut buf = [0u8; BLOCK_INDEX_ENTRY_SIZE];
123 let mut cursor = Cursor::new(&mut buf[..]);
124
125 cursor
126 .write_u64::<LittleEndian>(self.min_key.timestamp_us)
127 .unwrap();
128 cursor
129 .write_u128::<LittleEndian>(self.min_key.edge_id)
130 .unwrap();
131 cursor
132 .write_u64::<LittleEndian>(self.max_key.timestamp_us)
133 .unwrap();
134 cursor
135 .write_u128::<LittleEndian>(self.max_key.edge_id)
136 .unwrap();
137 cursor.write_u64::<LittleEndian>(self.offset).unwrap();
138 cursor.write_u32::<LittleEndian>(self.length).unwrap();
139
140 buf
141 }
142
143 pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
145 if bytes.len() < BLOCK_INDEX_ENTRY_SIZE {
146 return Err(std::io::Error::new(
147 std::io::ErrorKind::InvalidData,
148 "Block index entry too short",
149 ));
150 }
151
152 let mut cursor = Cursor::new(bytes);
153
154 let min_timestamp_us = cursor.read_u64::<LittleEndian>()?;
155 let min_edge_id = cursor.read_u128::<LittleEndian>()?;
156 let max_timestamp_us = cursor.read_u64::<LittleEndian>()?;
157 let max_edge_id = cursor.read_u128::<LittleEndian>()?;
158 let offset = cursor.read_u64::<LittleEndian>()?;
159 let length = cursor.read_u32::<LittleEndian>()?;
160
161 Ok(Self {
162 min_key: TemporalKey::new(min_timestamp_us, min_edge_id),
163 max_key: TemporalKey::new(max_timestamp_us, max_edge_id),
164 offset,
165 length,
166 })
167 }
168}
169
170#[derive(Debug, Clone, Copy)]
175pub struct FencePointer {
176 pub key: TemporalKey,
178 pub block_index_offset: u64,
180}
181
182impl FencePointer {
183 pub fn to_bytes(&self) -> [u8; FENCE_POINTER_SIZE] {
185 let mut buf = [0u8; FENCE_POINTER_SIZE];
186 let mut cursor = Cursor::new(&mut buf[..]);
187
188 cursor
189 .write_u64::<LittleEndian>(self.key.timestamp_us)
190 .unwrap();
191 cursor.write_u128::<LittleEndian>(self.key.edge_id).unwrap();
192 cursor
193 .write_u64::<LittleEndian>(self.block_index_offset)
194 .unwrap();
195
196 buf
197 }
198
199 pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
201 if bytes.len() < FENCE_POINTER_SIZE {
202 return Err(std::io::Error::new(
203 std::io::ErrorKind::InvalidData,
204 "Fence pointer too short",
205 ));
206 }
207
208 let mut cursor = Cursor::new(bytes);
209
210 let timestamp_us = cursor.read_u64::<LittleEndian>()?;
211 let edge_id = cursor.read_u128::<LittleEndian>()?;
212 let block_index_offset = cursor.read_u64::<LittleEndian>()?;
213
214 Ok(Self {
215 key: TemporalKey::new(timestamp_us, edge_id),
216 block_index_offset,
217 })
218 }
219}
220
221#[derive(Debug)]
226pub struct TwoLevelIndex {
227 pub fence_pointers: Vec<FencePointer>,
229
230 pub total_blocks: u32,
232
233 pub block_index_offset: u64,
235
236 pub block_index_length: u64,
238}
239
240impl TwoLevelIndex {
241 pub fn build(blocks: &[BlockIndexEntry], block_index_offset: u64) -> Self {
245 let mut fence_pointers = Vec::new();
246 let mut current_offset = 0u64;
247
248 for (i, block) in blocks.iter().enumerate() {
250 let entry_offset = (i * BLOCK_INDEX_ENTRY_SIZE) as u64;
251
252 if i == 0 || (entry_offset - current_offset) >= FENCE_INTERVAL_BYTES as u64 {
254 fence_pointers.push(FencePointer {
255 key: block.min_key,
256 block_index_offset: entry_offset,
257 });
258 current_offset = entry_offset;
259 }
260 }
261
262 let block_index_length = (blocks.len() * BLOCK_INDEX_ENTRY_SIZE) as u64;
263
264 Self {
265 fence_pointers,
266 total_blocks: blocks.len() as u32,
267 block_index_offset,
268 block_index_length,
269 }
270 }
271
272 pub fn find_fence_range(&self, key: &TemporalKey) -> (u64, u64) {
276 if self.fence_pointers.is_empty() {
277 return (0, self.block_index_length);
278 }
279
280 let idx = match self.fence_pointers.binary_search_by(|fp| fp.key.cmp(key)) {
282 Ok(i) => i, Err(i) => {
284 if i == 0 {
285 0
286 } else {
287 i - 1 }
289 }
290 };
291
292 let start_offset = self.fence_pointers[idx].block_index_offset;
293 let end_offset = if idx + 1 < self.fence_pointers.len() {
294 self.fence_pointers[idx + 1].block_index_offset
295 } else {
296 self.block_index_length
297 };
298
299 (start_offset, end_offset)
300 }
301
302 pub fn find_fence_range_for_timestamps(&self, start_ts: u64, end_ts: u64) -> (u64, u64) {
306 if self.fence_pointers.is_empty() {
307 return (0, self.block_index_length);
308 }
309
310 let start_key = TemporalKey::new(start_ts, 0);
312 let start_idx = match self
313 .fence_pointers
314 .binary_search_by(|fp| fp.key.cmp(&start_key))
315 {
316 Ok(i) => i,
317 Err(i) => {
318 if i == 0 {
319 0
320 } else {
321 i - 1
322 }
323 }
324 };
325
326 let end_key = TemporalKey::new(end_ts, u128::MAX);
328 let end_idx = match self
329 .fence_pointers
330 .binary_search_by(|fp| fp.key.cmp(&end_key))
331 {
332 Ok(i) => i + 1,
333 Err(i) => i,
334 };
335
336 let start_offset = self.fence_pointers[start_idx].block_index_offset;
337 let end_offset = if end_idx < self.fence_pointers.len() {
338 self.fence_pointers[end_idx].block_index_offset
339 } else {
340 self.block_index_length
341 };
342
343 (start_offset, end_offset)
344 }
345
346 pub fn fence_pointers_to_bytes(&self) -> Vec<u8> {
348 let mut buf = Vec::with_capacity(self.fence_pointers.len() * FENCE_POINTER_SIZE + 8);
349
350 buf.write_u32::<LittleEndian>(self.fence_pointers.len() as u32)
352 .unwrap();
353 buf.write_u32::<LittleEndian>(self.total_blocks).unwrap();
354
355 for fp in &self.fence_pointers {
357 buf.extend_from_slice(&fp.to_bytes());
358 }
359
360 buf
361 }
362
363 pub fn fence_pointers_from_bytes(
365 bytes: &[u8],
366 block_index_offset: u64,
367 block_index_length: u64,
368 ) -> std::io::Result<Self> {
369 if bytes.len() < 8 {
370 return Err(std::io::Error::new(
371 std::io::ErrorKind::InvalidData,
372 "Fence pointer section too short",
373 ));
374 }
375
376 let mut cursor = Cursor::new(bytes);
377 let count = cursor.read_u32::<LittleEndian>()? as usize;
378 let total_blocks = cursor.read_u32::<LittleEndian>()?;
379
380 let expected_size = 8 + count * FENCE_POINTER_SIZE;
381 if bytes.len() < expected_size {
382 return Err(std::io::Error::new(
383 std::io::ErrorKind::InvalidData,
384 format!(
385 "Fence pointer section too short: {} < {}",
386 bytes.len(),
387 expected_size
388 ),
389 ));
390 }
391
392 let mut fence_pointers = Vec::with_capacity(count);
393 for _ in 0..count {
394 let mut buf = [0u8; FENCE_POINTER_SIZE];
395 cursor.read_exact(&mut buf)?;
396 fence_pointers.push(FencePointer::from_bytes(&buf)?);
397 }
398
399 Ok(Self {
400 fence_pointers,
401 total_blocks,
402 block_index_offset,
403 block_index_length,
404 })
405 }
406
407 pub fn memory_usage(&self) -> usize {
409 std::mem::size_of::<Self>()
410 + self.fence_pointers.len() * std::mem::size_of::<FencePointer>()
411 }
412
413 pub fn fence_count(&self) -> usize {
415 self.fence_pointers.len()
416 }
417}
418
419pub struct BlockIndexReader<'a> {
421 data: &'a [u8],
423}
424
425impl<'a> BlockIndexReader<'a> {
426 pub fn new(data: &'a [u8]) -> Self {
428 Self { data }
429 }
430
431 pub fn read_entry(&self, offset: usize) -> std::io::Result<BlockIndexEntry> {
433 if offset + BLOCK_INDEX_ENTRY_SIZE > self.data.len() {
434 return Err(std::io::Error::new(
435 std::io::ErrorKind::InvalidData,
436 "Block index offset out of bounds",
437 ));
438 }
439
440 BlockIndexEntry::from_bytes(&self.data[offset..offset + BLOCK_INDEX_ENTRY_SIZE])
441 }
442
443 pub fn read_range(&self, start: usize, end: usize) -> std::io::Result<Vec<BlockIndexEntry>> {
445 let start = start.min(self.data.len());
446 let end = end.min(self.data.len());
447
448 let mut entries = Vec::new();
449 let mut offset = start;
450
451 while offset + BLOCK_INDEX_ENTRY_SIZE <= end {
452 entries.push(self.read_entry(offset)?);
453 offset += BLOCK_INDEX_ENTRY_SIZE;
454 }
455
456 Ok(entries)
457 }
458
459 pub fn find_block_for_key(
461 &self,
462 key: &TemporalKey,
463 start_offset: usize,
464 end_offset: usize,
465 ) -> std::io::Result<Option<BlockIndexEntry>> {
466 let entries = self.read_range(start_offset, end_offset)?;
467
468 let idx = entries.partition_point(|e| e.max_key < *key);
470
471 if idx < entries.len() && entries[idx].contains_key(key) {
472 Ok(Some(entries[idx]))
473 } else {
474 Ok(None)
475 }
476 }
477
478 pub fn find_blocks_for_range(
480 &self,
481 start_ts: u64,
482 end_ts: u64,
483 start_offset: usize,
484 end_offset: usize,
485 ) -> std::io::Result<Vec<BlockIndexEntry>> {
486 let entries = self.read_range(start_offset, end_offset)?;
487
488 Ok(entries
489 .into_iter()
490 .filter(|e| e.overlaps_range(start_ts, end_ts))
491 .collect())
492 }
493
494 pub fn entry_count(&self) -> usize {
496 self.data.len() / BLOCK_INDEX_ENTRY_SIZE
497 }
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` contiguous, non-overlapping blocks. Block `i` covers timestamps
    /// `i * 1000 ..= (i + 1) * 1000 - 1`, uses edge id `i` for both bounds, and points
    /// at the `i`-th `DATA_BLOCK_SIZE` slot.
    fn create_test_blocks(count: usize) -> Vec<BlockIndexEntry> {
        (0..count)
            .map(|i| BlockIndexEntry {
                min_key: TemporalKey::new(i as u64 * 1000, i as u128),
                max_key: TemporalKey::new((i + 1) as u64 * 1000 - 1, i as u128),
                offset: (i * DATA_BLOCK_SIZE) as u64,
                length: DATA_BLOCK_SIZE as u32,
            })
            .collect()
    }

    /// Serializes `blocks` back-to-back, as a block index section is laid out.
    fn serialize_blocks(blocks: &[BlockIndexEntry]) -> Vec<u8> {
        let mut data = Vec::with_capacity(blocks.len() * BLOCK_INDEX_ENTRY_SIZE);
        for block in blocks {
            data.extend_from_slice(&block.to_bytes());
        }
        data
    }

    #[test]
    fn test_temporal_key_ordering() {
        let k1 = TemporalKey::new(100, 1);
        let k2 = TemporalKey::new(100, 2);
        let k3 = TemporalKey::new(200, 1);

        // Timestamp dominates; the edge id only breaks ties.
        assert!(k1 < k2);
        assert!(k2 < k3);
        assert!(k1 < k3);
        assert!(TemporalKey::min() <= k1);
        assert!(k3 <= TemporalKey::max());
    }

    #[test]
    fn test_block_index_entry_serialization() {
        let entry = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 42),
            max_key: TemporalKey::new(2000, 100),
            offset: 65536,
            length: 64000,
        };

        let bytes = entry.to_bytes();
        let restored = BlockIndexEntry::from_bytes(&bytes).unwrap();

        assert_eq!(restored.min_key, entry.min_key);
        assert_eq!(restored.max_key, entry.max_key);
        assert_eq!(restored.offset, entry.offset);
        assert_eq!(restored.length, entry.length);

        // One byte short must be rejected rather than misread.
        assert!(BlockIndexEntry::from_bytes(&bytes[..BLOCK_INDEX_ENTRY_SIZE - 1]).is_err());
    }

    #[test]
    fn test_fence_pointer_serialization() {
        let fp = FencePointer {
            key: TemporalKey::new(5000, 123),
            block_index_offset: 1024 * 1024,
        };

        let bytes = fp.to_bytes();
        let restored = FencePointer::from_bytes(&bytes).unwrap();

        assert_eq!(restored.key, fp.key);
        assert_eq!(restored.block_index_offset, fp.block_index_offset);

        assert!(FencePointer::from_bytes(&bytes[..FENCE_POINTER_SIZE - 1]).is_err());
    }

    #[test]
    fn test_two_level_index_build() {
        let blocks = create_test_blocks(100);
        let index = TwoLevelIndex::build(&blocks, 0);

        // 100 entries (6000 bytes) fit well inside a single fence interval.
        assert_eq!(index.fence_count(), 1);
        assert_eq!(index.total_blocks, 100);
        assert_eq!(index.block_index_length, (100 * BLOCK_INDEX_ENTRY_SIZE) as u64);
        assert_eq!(index.fence_pointers[0].block_index_offset, 0);
        assert_eq!(index.fence_pointers[0].key, blocks[0].min_key);
    }

    #[test]
    fn test_two_level_index_fence_range() {
        let blocks = create_test_blocks(100);
        let index = TwoLevelIndex::build(&blocks, 0);
        let whole = (0, index.block_index_length);

        // With a single fence, every lookup maps to the whole block index.
        assert_eq!(index.find_fence_range(&TemporalKey::new(500, 0)), whole);
        assert_eq!(index.find_fence_range(&TemporalKey::new(99000, 99)), whole);
        assert_eq!(index.find_fence_range_for_timestamps(2500, 4500), whole);
    }

    #[test]
    fn test_two_level_index_multiple_fences() {
        // Entries per fence interval, rounded up: the second fence lands on this index.
        let per_fence =
            (FENCE_INTERVAL_BYTES + BLOCK_INDEX_ENTRY_SIZE - 1) / BLOCK_INDEX_ENTRY_SIZE;
        // Crosses the fence interval once, but not twice.
        let blocks = create_test_blocks(per_fence + per_fence / 2);
        let index = TwoLevelIndex::build(&blocks, 0);

        let second = (per_fence * BLOCK_INDEX_ENTRY_SIZE) as u64;
        let len = index.block_index_length;
        let fence_ts = per_fence as u64 * 1000;

        assert_eq!(index.fence_count(), 2);
        assert_eq!(index.fence_pointers[1].block_index_offset, second);
        assert_eq!(index.fence_pointers[1].key, blocks[per_fence].min_key);

        // Point lookups land in the fence range that holds the key.
        assert_eq!(index.find_fence_range(&TemporalKey::new(100_500, 100)), (0, second));
        assert_eq!(index.find_fence_range(&blocks[per_fence + 10].min_key), (second, len));

        // Time windows before, across and after the second fence.
        assert_eq!(index.find_fence_range_for_timestamps(0, 1_000), (0, second));
        assert_eq!(
            index.find_fence_range_for_timestamps(fence_ts - 1_000, fence_ts + 1_000),
            (0, len)
        );
        assert_eq!(
            index.find_fence_range_for_timestamps(fence_ts + 1_000, fence_ts + 2_000),
            (second, len)
        );
    }

    #[test]
    fn test_two_level_index_serialization() {
        let blocks = create_test_blocks(50);
        let index = TwoLevelIndex::build(&blocks, 1024);

        let bytes = index.fence_pointers_to_bytes();
        let restored =
            TwoLevelIndex::fence_pointers_from_bytes(&bytes, 1024, index.block_index_length)
                .unwrap();

        assert_eq!(restored.fence_pointers.len(), index.fence_pointers.len());
        assert_eq!(restored.total_blocks, index.total_blocks);
        assert_eq!(restored.block_index_offset, 1024);
        assert_eq!(restored.block_index_length, index.block_index_length);
        for (got, want) in restored.fence_pointers.iter().zip(&index.fence_pointers) {
            assert_eq!(got.key, want.key);
            assert_eq!(got.block_index_offset, want.block_index_offset);
        }

        // Truncated header and truncated body are both rejected.
        assert!(TwoLevelIndex::fence_pointers_from_bytes(&bytes[..4], 1024, 0).is_err());
        assert!(
            TwoLevelIndex::fence_pointers_from_bytes(&bytes[..bytes.len() - 1], 1024, 0).is_err()
        );
    }

    #[test]
    fn test_block_index_reader() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        assert_eq!(reader.entry_count(), 10);

        let entry = reader.read_entry(0).unwrap();
        assert_eq!(entry.min_key, blocks[0].min_key);
        // An entry running past the end is an error, not a panic.
        assert!(reader.read_entry(data.len() - BLOCK_INDEX_ENTRY_SIZE + 1).is_err());

        assert_eq!(reader.read_range(0, data.len()).unwrap().len(), 10);

        // Sub-range covering entries 1 and 2.
        let sub = reader
            .read_range(BLOCK_INDEX_ENTRY_SIZE, 3 * BLOCK_INDEX_ENTRY_SIZE)
            .unwrap();
        assert_eq!(sub.len(), 2);
        assert_eq!(sub[0].min_key, blocks[1].min_key);
        assert_eq!(sub[1].min_key, blocks[2].min_key);

        // Out-of-bounds ends are clamped; inverted ranges are empty.
        assert_eq!(reader.read_range(0, usize::MAX).unwrap().len(), 10);
        assert!(reader
            .read_range(3 * BLOCK_INDEX_ENTRY_SIZE, BLOCK_INDEX_ENTRY_SIZE)
            .unwrap()
            .is_empty());
    }

    #[test]
    fn test_block_index_find_block_for_key() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        let key = TemporalKey::new(5500, 5);
        let block = reader
            .find_block_for_key(&key, 0, data.len())
            .unwrap()
            .expect("key lies in block 5");
        assert!(block.contains_key(&key));
        assert_eq!(block.min_key, blocks[5].min_key);

        // Between block 4's max (4999, 4) and block 5's min (5000, 5): no block holds it.
        assert!(reader
            .find_block_for_key(&TemporalKey::new(5000, 0), 0, data.len())
            .unwrap()
            .is_none());
        // Past the last block.
        assert!(reader
            .find_block_for_key(&TemporalKey::new(10_000, 0), 0, data.len())
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_block_index_find_blocks_for_range() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // [2500, 4500] touches exactly blocks 2, 3 and 4.
        let found = reader
            .find_blocks_for_range(2500, 4500, 0, data.len())
            .unwrap();
        let starts: Vec<u64> = found.iter().map(|b| b.min_key.timestamp_us).collect();
        assert_eq!(starts, vec![2000, 3000, 4000]);

        // A window after the last block matches nothing.
        assert!(reader
            .find_blocks_for_range(20_000, 30_000, 0, data.len())
            .unwrap()
            .is_empty());
    }

    #[test]
    fn test_memory_usage() {
        let blocks = create_test_blocks(1000);
        let index = TwoLevelIndex::build(&blocks, 0);

        let memory = index.memory_usage();

        // The in-memory fence layer should be far smaller than the full block index.
        let full_index_size = blocks.len() * BLOCK_INDEX_ENTRY_SIZE;
        assert!(memory < full_index_size / 10);
    }

    #[test]
    fn test_block_contains_key() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };

        assert!(block.contains_key(&TemporalKey::new(1500, 50)));
        // Both bounds are inclusive.
        assert!(block.contains_key(&TemporalKey::new(1000, 0)));
        assert!(block.contains_key(&TemporalKey::new(2000, 100)));
        assert!(!block.contains_key(&TemporalKey::new(999, 0)));
        assert!(!block.contains_key(&TemporalKey::new(2001, 0)));
    }

    #[test]
    fn test_block_overlaps_range() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };

        assert!(block.overlaps_range(500, 1500));
        assert!(block.overlaps_range(1500, 2500));
        assert!(block.overlaps_range(1200, 1800));
        assert!(block.overlaps_range(500, 2500));
        assert!(!block.overlaps_range(100, 500));
        assert!(!block.overlaps_range(2500, 3000));
    }
}