1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
45use std::io::{Cursor, Read};
46
/// Size of a data block in bytes (64 KiB).
///
/// NOTE(review): within this file the value is only used by the unit tests;
/// presumably the block writer elsewhere is what actually honours it.
pub const DATA_BLOCK_SIZE: usize = 64 << 10;

/// Minimum distance, in bytes of serialized block index, between two
/// consecutive fence pointers emitted by `TwoLevelIndex::build` (1 MiB).
pub const FENCE_INTERVAL_BYTES: usize = 1 << 20;

/// Serialized size of a `BlockIndexEntry`: two keys (8-byte timestamp plus
/// 16-byte edge id each), an 8-byte offset and a 4-byte length.
pub const BLOCK_INDEX_ENTRY_SIZE: usize = 2 * (8 + 16) + 8 + 4;

/// Serialized size of a `FencePointer`: one key (8-byte timestamp plus
/// 16-byte edge id) followed by an 8-byte block index offset.
pub const FENCE_POINTER_SIZE: usize = (8 + 16) + 8;
58
/// Composite key made of a timestamp and an edge identifier.
///
/// The derived ordering follows field declaration order: keys compare by
/// `timestamp_us` first and by `edge_id` only when the timestamps are equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct TemporalKey {
    /// Timestamp component (presumably microseconds, going by the `_us` suffix).
    pub timestamp_us: u64,
    /// Edge identifier; breaks ties between keys with the same timestamp.
    pub edge_id: u128,
}

impl TemporalKey {
    /// Creates a key from its two components.
    pub fn new(timestamp_us: u64, edge_id: u128) -> Self {
        TemporalKey {
            timestamp_us,
            edge_id,
        }
    }

    /// Smallest representable key: both components are zero.
    pub fn min() -> Self {
        Self::new(u64::MIN, u128::MIN)
    }

    /// Largest representable key: both components are at their maximum.
    pub fn max() -> Self {
        Self::new(u64::MAX, u128::MAX)
    }
}
90
91#[derive(Debug, Clone, Copy)]
95pub struct BlockIndexEntry {
96 pub min_key: TemporalKey,
98 pub max_key: TemporalKey,
100 pub offset: u64,
102 pub length: u32,
104}
105
106impl BlockIndexEntry {
107 pub fn contains_key(&self, key: &TemporalKey) -> bool {
109 *key >= self.min_key && *key <= self.max_key
110 }
111
112 pub fn overlaps_range(&self, start_ts: u64, end_ts: u64) -> bool {
114 self.max_key.timestamp_us >= start_ts && self.min_key.timestamp_us <= end_ts
115 }
116
117 pub fn to_bytes(&self) -> [u8; BLOCK_INDEX_ENTRY_SIZE] {
119 let mut buf = [0u8; BLOCK_INDEX_ENTRY_SIZE];
120 let mut cursor = Cursor::new(&mut buf[..]);
121
122 cursor
123 .write_u64::<LittleEndian>(self.min_key.timestamp_us)
124 .unwrap();
125 cursor
126 .write_u128::<LittleEndian>(self.min_key.edge_id)
127 .unwrap();
128 cursor
129 .write_u64::<LittleEndian>(self.max_key.timestamp_us)
130 .unwrap();
131 cursor
132 .write_u128::<LittleEndian>(self.max_key.edge_id)
133 .unwrap();
134 cursor.write_u64::<LittleEndian>(self.offset).unwrap();
135 cursor.write_u32::<LittleEndian>(self.length).unwrap();
136
137 buf
138 }
139
140 pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
142 if bytes.len() < BLOCK_INDEX_ENTRY_SIZE {
143 return Err(std::io::Error::new(
144 std::io::ErrorKind::InvalidData,
145 "Block index entry too short",
146 ));
147 }
148
149 let mut cursor = Cursor::new(bytes);
150
151 let min_timestamp_us = cursor.read_u64::<LittleEndian>()?;
152 let min_edge_id = cursor.read_u128::<LittleEndian>()?;
153 let max_timestamp_us = cursor.read_u64::<LittleEndian>()?;
154 let max_edge_id = cursor.read_u128::<LittleEndian>()?;
155 let offset = cursor.read_u64::<LittleEndian>()?;
156 let length = cursor.read_u32::<LittleEndian>()?;
157
158 Ok(Self {
159 min_key: TemporalKey::new(min_timestamp_us, min_edge_id),
160 max_key: TemporalKey::new(max_timestamp_us, max_edge_id),
161 offset,
162 length,
163 })
164 }
165}
166
167#[derive(Debug, Clone, Copy)]
172pub struct FencePointer {
173 pub key: TemporalKey,
175 pub block_index_offset: u64,
177}
178
179impl FencePointer {
180 pub fn to_bytes(&self) -> [u8; FENCE_POINTER_SIZE] {
182 let mut buf = [0u8; FENCE_POINTER_SIZE];
183 let mut cursor = Cursor::new(&mut buf[..]);
184
185 cursor
186 .write_u64::<LittleEndian>(self.key.timestamp_us)
187 .unwrap();
188 cursor.write_u128::<LittleEndian>(self.key.edge_id).unwrap();
189 cursor
190 .write_u64::<LittleEndian>(self.block_index_offset)
191 .unwrap();
192
193 buf
194 }
195
196 pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
198 if bytes.len() < FENCE_POINTER_SIZE {
199 return Err(std::io::Error::new(
200 std::io::ErrorKind::InvalidData,
201 "Fence pointer too short",
202 ));
203 }
204
205 let mut cursor = Cursor::new(bytes);
206
207 let timestamp_us = cursor.read_u64::<LittleEndian>()?;
208 let edge_id = cursor.read_u128::<LittleEndian>()?;
209 let block_index_offset = cursor.read_u64::<LittleEndian>()?;
210
211 Ok(Self {
212 key: TemporalKey::new(timestamp_us, edge_id),
213 block_index_offset,
214 })
215 }
216}
217
218#[derive(Debug)]
223pub struct TwoLevelIndex {
224 pub fence_pointers: Vec<FencePointer>,
226
227 pub total_blocks: u32,
229
230 pub block_index_offset: u64,
232
233 pub block_index_length: u64,
235}
236
237impl TwoLevelIndex {
238 pub fn build(blocks: &[BlockIndexEntry], block_index_offset: u64) -> Self {
242 let mut fence_pointers = Vec::new();
243 let mut current_offset = 0u64;
244
245 for (i, block) in blocks.iter().enumerate() {
247 let entry_offset = (i * BLOCK_INDEX_ENTRY_SIZE) as u64;
248
249 if i == 0 || (entry_offset - current_offset) >= FENCE_INTERVAL_BYTES as u64 {
251 fence_pointers.push(FencePointer {
252 key: block.min_key,
253 block_index_offset: entry_offset,
254 });
255 current_offset = entry_offset;
256 }
257 }
258
259 let block_index_length = (blocks.len() * BLOCK_INDEX_ENTRY_SIZE) as u64;
260
261 Self {
262 fence_pointers,
263 total_blocks: blocks.len() as u32,
264 block_index_offset,
265 block_index_length,
266 }
267 }
268
269 pub fn find_fence_range(&self, key: &TemporalKey) -> (u64, u64) {
273 if self.fence_pointers.is_empty() {
274 return (0, self.block_index_length);
275 }
276
277 let idx = match self.fence_pointers.binary_search_by(|fp| fp.key.cmp(key)) {
279 Ok(i) => i, Err(i) => {
281 if i == 0 {
282 0
283 } else {
284 i - 1 }
286 }
287 };
288
289 let start_offset = self.fence_pointers[idx].block_index_offset;
290 let end_offset = if idx + 1 < self.fence_pointers.len() {
291 self.fence_pointers[idx + 1].block_index_offset
292 } else {
293 self.block_index_length
294 };
295
296 (start_offset, end_offset)
297 }
298
299 pub fn find_fence_range_for_timestamps(&self, start_ts: u64, end_ts: u64) -> (u64, u64) {
303 if self.fence_pointers.is_empty() {
304 return (0, self.block_index_length);
305 }
306
307 let start_key = TemporalKey::new(start_ts, 0);
309 let start_idx = match self
310 .fence_pointers
311 .binary_search_by(|fp| fp.key.cmp(&start_key))
312 {
313 Ok(i) => i,
314 Err(i) => {
315 if i == 0 {
316 0
317 } else {
318 i - 1
319 }
320 }
321 };
322
323 let end_key = TemporalKey::new(end_ts, u128::MAX);
325 let end_idx = match self
326 .fence_pointers
327 .binary_search_by(|fp| fp.key.cmp(&end_key))
328 {
329 Ok(i) => i + 1,
330 Err(i) => i,
331 };
332
333 let start_offset = self.fence_pointers[start_idx].block_index_offset;
334 let end_offset = if end_idx < self.fence_pointers.len() {
335 self.fence_pointers[end_idx].block_index_offset
336 } else {
337 self.block_index_length
338 };
339
340 (start_offset, end_offset)
341 }
342
343 pub fn fence_pointers_to_bytes(&self) -> Vec<u8> {
345 let mut buf = Vec::with_capacity(self.fence_pointers.len() * FENCE_POINTER_SIZE + 8);
346
347 buf.write_u32::<LittleEndian>(self.fence_pointers.len() as u32)
349 .unwrap();
350 buf.write_u32::<LittleEndian>(self.total_blocks).unwrap();
351
352 for fp in &self.fence_pointers {
354 buf.extend_from_slice(&fp.to_bytes());
355 }
356
357 buf
358 }
359
360 pub fn fence_pointers_from_bytes(
362 bytes: &[u8],
363 block_index_offset: u64,
364 block_index_length: u64,
365 ) -> std::io::Result<Self> {
366 if bytes.len() < 8 {
367 return Err(std::io::Error::new(
368 std::io::ErrorKind::InvalidData,
369 "Fence pointer section too short",
370 ));
371 }
372
373 let mut cursor = Cursor::new(bytes);
374 let count = cursor.read_u32::<LittleEndian>()? as usize;
375 let total_blocks = cursor.read_u32::<LittleEndian>()?;
376
377 let expected_size = 8 + count * FENCE_POINTER_SIZE;
378 if bytes.len() < expected_size {
379 return Err(std::io::Error::new(
380 std::io::ErrorKind::InvalidData,
381 format!(
382 "Fence pointer section too short: {} < {}",
383 bytes.len(),
384 expected_size
385 ),
386 ));
387 }
388
389 let mut fence_pointers = Vec::with_capacity(count);
390 for _ in 0..count {
391 let mut buf = [0u8; FENCE_POINTER_SIZE];
392 cursor.read_exact(&mut buf)?;
393 fence_pointers.push(FencePointer::from_bytes(&buf)?);
394 }
395
396 Ok(Self {
397 fence_pointers,
398 total_blocks,
399 block_index_offset,
400 block_index_length,
401 })
402 }
403
404 pub fn memory_usage(&self) -> usize {
406 std::mem::size_of::<Self>()
407 + self.fence_pointers.len() * std::mem::size_of::<FencePointer>()
408 }
409
410 pub fn fence_count(&self) -> usize {
412 self.fence_pointers.len()
413 }
414}
415
416pub struct BlockIndexReader<'a> {
418 data: &'a [u8],
420}
421
422impl<'a> BlockIndexReader<'a> {
423 pub fn new(data: &'a [u8]) -> Self {
425 Self { data }
426 }
427
428 pub fn read_entry(&self, offset: usize) -> std::io::Result<BlockIndexEntry> {
430 if offset + BLOCK_INDEX_ENTRY_SIZE > self.data.len() {
431 return Err(std::io::Error::new(
432 std::io::ErrorKind::InvalidData,
433 "Block index offset out of bounds",
434 ));
435 }
436
437 BlockIndexEntry::from_bytes(&self.data[offset..offset + BLOCK_INDEX_ENTRY_SIZE])
438 }
439
440 pub fn read_range(&self, start: usize, end: usize) -> std::io::Result<Vec<BlockIndexEntry>> {
442 let start = start.min(self.data.len());
443 let end = end.min(self.data.len());
444
445 let mut entries = Vec::new();
446 let mut offset = start;
447
448 while offset + BLOCK_INDEX_ENTRY_SIZE <= end {
449 entries.push(self.read_entry(offset)?);
450 offset += BLOCK_INDEX_ENTRY_SIZE;
451 }
452
453 Ok(entries)
454 }
455
456 pub fn find_block_for_key(
458 &self,
459 key: &TemporalKey,
460 start_offset: usize,
461 end_offset: usize,
462 ) -> std::io::Result<Option<BlockIndexEntry>> {
463 let entries = self.read_range(start_offset, end_offset)?;
464
465 let idx = entries.partition_point(|e| e.max_key < *key);
467
468 if idx < entries.len() && entries[idx].contains_key(key) {
469 Ok(Some(entries[idx]))
470 } else {
471 Ok(None)
472 }
473 }
474
475 pub fn find_blocks_for_range(
477 &self,
478 start_ts: u64,
479 end_ts: u64,
480 start_offset: usize,
481 end_offset: usize,
482 ) -> std::io::Result<Vec<BlockIndexEntry>> {
483 let entries = self.read_range(start_offset, end_offset)?;
484
485 Ok(entries
486 .into_iter()
487 .filter(|e| e.overlaps_range(start_ts, end_ts))
488 .collect())
489 }
490
491 pub fn entry_count(&self) -> usize {
493 self.data.len() / BLOCK_INDEX_ENTRY_SIZE
494 }
495}
496
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` blocks where block `i` spans timestamps
    /// `[i * 1000, i * 1000 + 999]` and uses `i` as the edge id of both bounds.
    fn create_test_blocks(count: usize) -> Vec<BlockIndexEntry> {
        let mut blocks = Vec::with_capacity(count);
        for i in 0..count {
            let first_ts = i as u64 * 1000;
            blocks.push(BlockIndexEntry {
                min_key: TemporalKey::new(first_ts, i as u128),
                max_key: TemporalKey::new(first_ts + 999, i as u128),
                offset: (i * DATA_BLOCK_SIZE) as u64,
                length: DATA_BLOCK_SIZE as u32,
            });
        }
        blocks
    }

    /// Concatenates the serialized form of every block, in order.
    fn serialize_blocks(blocks: &[BlockIndexEntry]) -> Vec<u8> {
        let mut raw = Vec::new();
        for block in blocks {
            raw.extend_from_slice(&block.to_bytes());
        }
        raw
    }

    #[test]
    fn test_temporal_key_ordering() {
        let same_ts_low_edge = TemporalKey::new(100, 1);
        let same_ts_high_edge = TemporalKey::new(100, 2);
        let later_ts = TemporalKey::new(200, 1);

        // Edge id breaks ties within a timestamp.
        assert!(same_ts_low_edge < same_ts_high_edge);
        // Timestamp takes precedence over edge id.
        assert!(same_ts_high_edge < later_ts);
        assert!(same_ts_low_edge < later_ts);
    }

    #[test]
    fn test_block_index_entry_serialization() {
        let original = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 42),
            max_key: TemporalKey::new(2000, 100),
            offset: 65536,
            length: 64000,
        };

        let decoded = BlockIndexEntry::from_bytes(&original.to_bytes()).unwrap();

        assert_eq!(decoded.min_key, original.min_key);
        assert_eq!(decoded.max_key, original.max_key);
        assert_eq!(decoded.offset, original.offset);
        assert_eq!(decoded.length, original.length);
    }

    #[test]
    fn test_fence_pointer_serialization() {
        let original = FencePointer {
            key: TemporalKey::new(5000, 123),
            block_index_offset: 1024 * 1024,
        };

        let decoded = FencePointer::from_bytes(&original.to_bytes()).unwrap();

        assert_eq!(decoded.key, original.key);
        assert_eq!(decoded.block_index_offset, original.block_index_offset);
    }

    #[test]
    fn test_two_level_index_build() {
        let index = TwoLevelIndex::build(&create_test_blocks(100), 0);

        assert!(!index.fence_pointers.is_empty());
        assert_eq!(index.total_blocks, 100);

        // The first fence always points at the start of the block index.
        assert_eq!(index.fence_pointers[0].block_index_offset, 0);
    }

    #[test]
    fn test_two_level_index_fence_range() {
        let index = TwoLevelIndex::build(&create_test_blocks(100), 0);

        // A key inside the first block.
        let (start, end) = index.find_fence_range(&TemporalKey::new(500, 0));
        assert_eq!(start, 0);
        assert!(end > start);

        // A key inside the last block.
        let (start, end) = index.find_fence_range(&TemporalKey::new(99000, 99));
        assert!(start < end);
        assert_eq!(end, index.block_index_length);
    }

    #[test]
    fn test_two_level_index_serialization() {
        let index = TwoLevelIndex::build(&create_test_blocks(50), 1024);

        let encoded = index.fence_pointers_to_bytes();
        let decoded =
            TwoLevelIndex::fence_pointers_from_bytes(&encoded, 1024, index.block_index_length)
                .unwrap();

        assert_eq!(decoded.fence_pointers.len(), index.fence_pointers.len());
        assert_eq!(decoded.total_blocks, index.total_blocks);
    }

    #[test]
    fn test_block_index_reader() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        let first = reader.read_entry(0).unwrap();
        assert_eq!(first.min_key, blocks[0].min_key);

        let everything = reader.read_range(0, data.len()).unwrap();
        assert_eq!(everything.len(), 10);
    }

    #[test]
    fn test_block_index_find_block_for_key() {
        let data = serialize_blocks(&create_test_blocks(10));
        let reader = BlockIndexReader::new(&data);

        let key = TemporalKey::new(5500, 5);
        let found = reader.find_block_for_key(&key, 0, data.len()).unwrap();

        assert!(found.is_some());
        let block = found.unwrap();
        assert!(block.contains_key(&key));
    }

    #[test]
    fn test_block_index_find_blocks_for_range() {
        let data = serialize_blocks(&create_test_blocks(10));
        let reader = BlockIndexReader::new(&data);

        let found = reader
            .find_blocks_for_range(2500, 4500, 0, data.len())
            .unwrap();

        assert!(found.len() >= 2);
    }

    #[test]
    fn test_memory_usage() {
        let blocks = create_test_blocks(1000);
        let index = TwoLevelIndex::build(&blocks, 0);

        let memory = index.memory_usage();
        let full_index_size = blocks.len() * BLOCK_INDEX_ENTRY_SIZE;

        // The sparse index must be far smaller than the block index it covers.
        assert!(memory < full_index_size / 10);
    }

    #[test]
    fn test_block_contains_key() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };

        // Interior key.
        assert!(block.contains_key(&TemporalKey::new(1500, 50)));
        // Both bounds are inclusive.
        assert!(block.contains_key(&TemporalKey::new(1000, 0)));
        assert!(block.contains_key(&TemporalKey::new(2000, 100)));
        // Just outside on either side.
        assert!(!block.contains_key(&TemporalKey::new(999, 0)));
        assert!(!block.contains_key(&TemporalKey::new(2001, 0)));
    }

    #[test]
    fn test_block_overlaps_range() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };

        // Partial overlap at the start and at the end.
        assert!(block.overlaps_range(500, 1500));
        assert!(block.overlaps_range(1500, 2500));
        // Range inside the block, and block inside the range.
        assert!(block.overlaps_range(1200, 1800));
        assert!(block.overlaps_range(500, 2500));
        // Entirely before and entirely after.
        assert!(!block.overlaps_range(100, 500));
        assert!(!block.overlaps_range(2500, 3000));
    }
}