use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Cursor, Read};
/// Data block size in bytes (64 KiB).
pub const DATA_BLOCK_SIZE: usize = 1 << 16;
/// Spacing between consecutive fence pointers, measured in bytes of
/// serialized block index (1 MiB).
pub const FENCE_INTERVAL_BYTES: usize = 1 << 20;
/// Serialized size of a `BlockIndexEntry`: two keys (u64 timestamp + u128
/// edge id each), a u64 offset and a u32 length.
pub const BLOCK_INDEX_ENTRY_SIZE: usize = 2 * (8 + 16) + 8 + 4;
/// Serialized size of a `FencePointer`: one key (u64 timestamp + u128 edge
/// id) and a u64 block-index offset.
pub const FENCE_POINTER_SIZE: usize = (8 + 16) + 8;
/// Composite sort key: a timestamp, tie-broken by an edge id.
///
/// Field order matters: the derived `Ord` compares `timestamp_us` first and
/// only falls back to `edge_id` when the timestamps are equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct TemporalKey {
    /// Primary sort component (microseconds, judging by the `_us` suffix).
    pub timestamp_us: u64,
    /// Secondary sort component.
    pub edge_id: u128,
}

impl TemporalKey {
    /// Creates a key from its two components.
    pub fn new(timestamp_us: u64, edge_id: u128) -> Self {
        TemporalKey {
            timestamp_us,
            edge_id,
        }
    }

    /// The smallest representable key (both components zero).
    pub fn min() -> Self {
        Self::new(0, 0)
    }

    /// The largest representable key (both components at their type maximum).
    pub fn max() -> Self {
        Self::new(u64::MAX, u128::MAX)
    }
}
#[derive(Debug, Clone, Copy)]
pub struct BlockIndexEntry {
pub min_key: TemporalKey,
pub max_key: TemporalKey,
pub offset: u64,
pub length: u32,
}
impl BlockIndexEntry {
pub fn contains_key(&self, key: &TemporalKey) -> bool {
*key >= self.min_key && *key <= self.max_key
}
pub fn overlaps_range(&self, start_ts: u64, end_ts: u64) -> bool {
self.max_key.timestamp_us >= start_ts && self.min_key.timestamp_us <= end_ts
}
pub fn to_bytes(&self) -> [u8; BLOCK_INDEX_ENTRY_SIZE] {
let mut buf = [0u8; BLOCK_INDEX_ENTRY_SIZE];
let mut cursor = Cursor::new(&mut buf[..]);
cursor
.write_u64::<LittleEndian>(self.min_key.timestamp_us)
.unwrap();
cursor
.write_u128::<LittleEndian>(self.min_key.edge_id)
.unwrap();
cursor
.write_u64::<LittleEndian>(self.max_key.timestamp_us)
.unwrap();
cursor
.write_u128::<LittleEndian>(self.max_key.edge_id)
.unwrap();
cursor.write_u64::<LittleEndian>(self.offset).unwrap();
cursor.write_u32::<LittleEndian>(self.length).unwrap();
buf
}
pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
if bytes.len() < BLOCK_INDEX_ENTRY_SIZE {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Block index entry too short",
));
}
let mut cursor = Cursor::new(bytes);
let min_timestamp_us = cursor.read_u64::<LittleEndian>()?;
let min_edge_id = cursor.read_u128::<LittleEndian>()?;
let max_timestamp_us = cursor.read_u64::<LittleEndian>()?;
let max_edge_id = cursor.read_u128::<LittleEndian>()?;
let offset = cursor.read_u64::<LittleEndian>()?;
let length = cursor.read_u32::<LittleEndian>()?;
Ok(Self {
min_key: TemporalKey::new(min_timestamp_us, min_edge_id),
max_key: TemporalKey::new(max_timestamp_us, max_edge_id),
offset,
length,
})
}
}
/// Sparse pointer into the block index: a key plus the byte position of the
/// block-index entry it marks (as produced by `TwoLevelIndex::build`, the key
/// is that entry's `min_key`).
///
/// Serialized as `FENCE_POINTER_SIZE` little-endian bytes, in order:
/// `key.timestamp_us` (u64), `key.edge_id` (u128), `block_index_offset` (u64).
#[derive(Debug, Clone, Copy)]
pub struct FencePointer {
    /// Key of the block-index entry this fence marks.
    pub key: TemporalKey,
    /// Byte offset of that entry, relative to the start of the block index.
    pub block_index_offset: u64,
}

impl FencePointer {
    /// Encodes the pointer into its fixed-size little-endian form.
    pub fn to_bytes(&self) -> [u8; FENCE_POINTER_SIZE] {
        let mut out = [0u8; FENCE_POINTER_SIZE];
        out[0..8].copy_from_slice(&self.key.timestamp_us.to_le_bytes());
        out[8..24].copy_from_slice(&self.key.edge_id.to_le_bytes());
        out[24..32].copy_from_slice(&self.block_index_offset.to_le_bytes());
        out
    }

    /// Decodes a pointer from the first `FENCE_POINTER_SIZE` bytes of `bytes`;
    /// trailing bytes are ignored.
    ///
    /// # Errors
    /// `InvalidData` when `bytes` is shorter than `FENCE_POINTER_SIZE`.
    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
        if bytes.len() < FENCE_POINTER_SIZE {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Fence pointer too short",
            ));
        }
        let mut rd = Cursor::new(bytes);
        // Left-to-right argument evaluation: timestamp first, then edge id.
        let key = TemporalKey::new(
            rd.read_u64::<LittleEndian>()?,
            rd.read_u128::<LittleEndian>()?,
        );
        let block_index_offset = rd.read_u64::<LittleEndian>()?;
        Ok(FencePointer {
            key,
            block_index_offset,
        })
    }
}
/// Two-level index over a sorted block index: a small in-memory array of
/// [`FencePointer`]s, each marking where a segment of the serialized block
/// index begins, so a lookup only needs to read one segment.
#[derive(Debug)]
pub struct TwoLevelIndex {
    /// One fence per segment, in block-index order. Each `block_index_offset`
    /// is relative to the start of the block index.
    pub fence_pointers: Vec<FencePointer>,
    /// Number of entries in the block index.
    pub total_blocks: u32,
    /// Where the block index starts, as supplied by the caller. The lookup
    /// methods return offsets relative to this position, not absolute ones.
    pub block_index_offset: u64,
    /// Serialized size of the block index in bytes.
    pub block_index_length: u64,
}

impl TwoLevelIndex {
    /// Builds fence pointers for `blocks`, which are assumed to be sorted by key
    /// and serialized back-to-back as `BLOCK_INDEX_ENTRY_SIZE`-byte entries.
    ///
    /// The first entry always gets a fence. After that, a fence is placed on the
    /// first entry at least `FENCE_INTERVAL_BYTES` past the previous fence.
    ///
    /// NOTE(review): `total_blocks` is stored (and serialized) as `u32`, so more
    /// than `u32::MAX` blocks would be silently truncated.
    pub fn build(blocks: &[BlockIndexEntry], block_index_offset: u64) -> Self {
        let mut fence_pointers = Vec::new();
        let mut last_fence_offset = 0u64;
        for (i, block) in blocks.iter().enumerate() {
            let entry_offset = (i * BLOCK_INDEX_ENTRY_SIZE) as u64;
            if i == 0 || entry_offset - last_fence_offset >= FENCE_INTERVAL_BYTES as u64 {
                fence_pointers.push(FencePointer {
                    key: block.min_key,
                    block_index_offset: entry_offset,
                });
                last_fence_offset = entry_offset;
            }
        }
        Self {
            fence_pointers,
            total_blocks: blocks.len() as u32,
            block_index_offset,
            block_index_length: (blocks.len() * BLOCK_INDEX_ENTRY_SIZE) as u64,
        }
    }

    /// Index of the last fence whose key is `<= key`, or 0 when `key` sorts
    /// before every fence. Callers must ensure `fence_pointers` is non-empty.
    ///
    /// `partition_point` (rather than `binary_search_by`, which may return any
    /// of several equal matches) keeps the result deterministic if fence keys
    /// repeat. For distinct keys the result is the same.
    fn fence_at_or_before(&self, key: &TemporalKey) -> usize {
        self.fence_pointers
            .partition_point(|fp| fp.key <= *key)
            .saturating_sub(1)
    }

    /// Block-index offset of fence `idx`, or `block_index_length` when `idx`
    /// is past the last fence (i.e. the end of the final segment).
    fn fence_offset_or_end(&self, idx: usize) -> u64 {
        self.fence_pointers
            .get(idx)
            .map_or(self.block_index_length, |fp| fp.block_index_offset)
    }

    /// Returns the `[start, end)` byte range of the block-index segment that
    /// may contain `key`, relative to the start of the block index.
    ///
    /// With no fences the whole block index is returned. A key smaller than
    /// every fence maps to the first segment.
    pub fn find_fence_range(&self, key: &TemporalKey) -> (u64, u64) {
        if self.fence_pointers.is_empty() {
            return (0, self.block_index_length);
        }
        let idx = self.fence_at_or_before(key);
        (
            self.fence_pointers[idx].block_index_offset,
            self.fence_offset_or_end(idx + 1),
        )
    }

    /// Returns the `[start, end)` byte range of block-index segments that may
    /// hold keys with timestamps in the inclusive range `[start_ts, end_ts]`.
    ///
    /// With no fences the whole block index is returned. If the range is
    /// inverted (`start_ts > end_ts`), the result never has `end < start`.
    /// It collapses to an empty range at `start`.
    pub fn find_fence_range_for_timestamps(&self, start_ts: u64, end_ts: u64) -> (u64, u64) {
        if self.fence_pointers.is_empty() {
            return (0, self.block_index_length);
        }
        // (start_ts, 0) is the smallest key with that timestamp.
        let start_idx = self.fence_at_or_before(&TemporalKey::new(start_ts, 0));
        // (end_ts, MAX) is the largest key with that timestamp. Every segment
        // whose first key is <= it may hold matching entries.
        let end_key = TemporalKey::new(end_ts, u128::MAX);
        let end_idx = self.fence_pointers.partition_point(|fp| fp.key <= end_key);

        let start_offset = self.fence_pointers[start_idx].block_index_offset;
        // Clamp so callers computing `end - start` can never underflow.
        let end_offset = self.fence_offset_or_end(end_idx).max(start_offset);
        (start_offset, end_offset)
    }

    /// Serializes the fence section: a u32 fence count, the u32 `total_blocks`,
    /// then each fence pointer, all little-endian.
    pub fn fence_pointers_to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(self.fence_pointers.len() * FENCE_POINTER_SIZE + 8);
        buf.write_u32::<LittleEndian>(self.fence_pointers.len() as u32)
            .unwrap();
        buf.write_u32::<LittleEndian>(self.total_blocks).unwrap();
        for fp in &self.fence_pointers {
            buf.extend_from_slice(&fp.to_bytes());
        }
        buf
    }

    /// Parses a fence section written by [`Self::fence_pointers_to_bytes`].
    ///
    /// `block_index_offset` and `block_index_length` are not part of the
    /// serialized section and are taken from the caller.
    ///
    /// # Errors
    /// `InvalidData` when the section is shorter than its 8-byte header, when
    /// the declared fence count does not fit in memory arithmetic, or when
    /// fewer bytes are present than the count requires.
    pub fn fence_pointers_from_bytes(
        bytes: &[u8],
        block_index_offset: u64,
        block_index_length: u64,
    ) -> std::io::Result<Self> {
        if bytes.len() < 8 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Fence pointer section too short",
            ));
        }
        let mut cursor = Cursor::new(bytes);
        let count = cursor.read_u32::<LittleEndian>()? as usize;
        let total_blocks = cursor.read_u32::<LittleEndian>()?;
        // `count` comes from the input bytes. On 32-bit targets the size
        // computation can overflow, which would panic (debug) or wrap and let a
        // bogus count reach `Vec::with_capacity` (release).
        let expected_size = count
            .checked_mul(FENCE_POINTER_SIZE)
            .and_then(|n| n.checked_add(8))
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "Fence pointer count too large",
                )
            })?;
        if bytes.len() < expected_size {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Fence pointer section too short: {} < {}",
                    bytes.len(),
                    expected_size
                ),
            ));
        }
        let mut fence_pointers = Vec::with_capacity(count);
        for _ in 0..count {
            let mut buf = [0u8; FENCE_POINTER_SIZE];
            cursor.read_exact(&mut buf)?;
            fence_pointers.push(FencePointer::from_bytes(&buf)?);
        }
        Ok(Self {
            fence_pointers,
            total_blocks,
            block_index_offset,
            block_index_length,
        })
    }

    /// Approximate memory held by this index in bytes: the struct itself plus
    /// the fence vector's allocated capacity (not just its length, since the
    /// whole capacity is reserved).
    pub fn memory_usage(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fence_pointers.capacity() * std::mem::size_of::<FencePointer>()
    }

    /// Number of fence pointers.
    pub fn fence_count(&self) -> usize {
        self.fence_pointers.len()
    }
}
/// Read-only view over a serialized block index: consecutive
/// `BLOCK_INDEX_ENTRY_SIZE`-byte [`BlockIndexEntry`] records.
///
/// Offsets passed to the methods are byte offsets into `data`. They are
/// expected to be entry-aligned; alignment is not checked.
pub struct BlockIndexReader<'a> {
    data: &'a [u8],
}

impl<'a> BlockIndexReader<'a> {
    /// Wraps `data`. Nothing is validated until entries are read.
    pub fn new(data: &'a [u8]) -> Self {
        Self { data }
    }

    /// Decodes the entry starting at byte `offset`.
    ///
    /// # Errors
    /// `InvalidData` if the entry would extend past the end of the data,
    /// including when `offset + BLOCK_INDEX_ENTRY_SIZE` overflows `usize`.
    pub fn read_entry(&self, offset: usize) -> std::io::Result<BlockIndexEntry> {
        // `checked_add` turns a huge offset (e.g. from corrupted fence data)
        // into an error instead of an arithmetic-overflow panic.
        let bytes = offset
            .checked_add(BLOCK_INDEX_ENTRY_SIZE)
            .and_then(|end| self.data.get(offset..end))
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "Block index offset out of bounds",
                )
            })?;
        BlockIndexEntry::from_bytes(bytes)
    }

    /// Clamps `[start, end)` to the data and returns `(start, n)`, where `n`
    /// is the number of whole entries in the clamped window. An inverted
    /// window yields `n == 0`.
    fn clamp_window(&self, start: usize, end: usize) -> (usize, usize) {
        let end = end.min(self.data.len());
        let start = start.min(end);
        (start, (end - start) / BLOCK_INDEX_ENTRY_SIZE)
    }

    /// Decodes every whole entry in `[start, end)`.
    ///
    /// Both bounds are clamped to the data. A trailing partial entry is
    /// ignored, and `start > end` yields an empty vector.
    pub fn read_range(&self, start: usize, end: usize) -> std::io::Result<Vec<BlockIndexEntry>> {
        let (start, count) = self.clamp_window(start, end);
        self.data[start..start + count * BLOCK_INDEX_ENTRY_SIZE]
            .chunks_exact(BLOCK_INDEX_ENTRY_SIZE)
            .map(BlockIndexEntry::from_bytes)
            .collect()
    }

    /// Finds the entry in `[start_offset, end_offset)` whose key range contains
    /// `key`, assuming entries in that window are sorted by `max_key`.
    ///
    /// This binary-searches the raw bytes and decodes only O(log n) entries. It
    /// never materializes the whole window, which can be a full
    /// `FENCE_INTERVAL_BYTES` segment. Returns `Ok(None)` when no entry
    /// contains `key`.
    pub fn find_block_for_key(
        &self,
        key: &TemporalKey,
        start_offset: usize,
        end_offset: usize,
    ) -> std::io::Result<Option<BlockIndexEntry>> {
        let (base, count) = self.clamp_window(start_offset, end_offset);
        let entry_at = |i: usize| self.read_entry(base + i * BLOCK_INDEX_ENTRY_SIZE);

        // Lower bound: the first entry whose max_key is >= key.
        let (mut lo, mut hi) = (0usize, count);
        while lo < hi {
            let mid = lo + (hi - lo) / 2;
            if entry_at(mid)?.max_key < *key {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        if lo == count {
            return Ok(None);
        }
        let candidate = entry_at(lo)?;
        Ok(Some(candidate).filter(|e| e.contains_key(key)))
    }

    /// Returns every entry in `[start_offset, end_offset)` whose timestamp span
    /// intersects the inclusive range `[start_ts, end_ts]`, in index order.
    pub fn find_blocks_for_range(
        &self,
        start_ts: u64,
        end_ts: u64,
        start_offset: usize,
        end_offset: usize,
    ) -> std::io::Result<Vec<BlockIndexEntry>> {
        let entries = self.read_range(start_offset, end_offset)?;
        Ok(entries
            .into_iter()
            .filter(|e| e.overlaps_range(start_ts, end_ts))
            .collect())
    }

    /// Number of whole entries in the data (a trailing partial entry is not counted).
    pub fn entry_count(&self) -> usize {
        self.data.len() / BLOCK_INDEX_ENTRY_SIZE
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` sorted, non-overlapping blocks: block `i` covers keys
    /// `(i * 1000, i)..=((i + 1) * 1000 - 1, i)`.
    fn create_test_blocks(count: usize) -> Vec<BlockIndexEntry> {
        (0..count)
            .map(|i| BlockIndexEntry {
                min_key: TemporalKey::new(i as u64 * 1000, i as u128),
                max_key: TemporalKey::new((i + 1) as u64 * 1000 - 1, i as u128),
                offset: (i * DATA_BLOCK_SIZE) as u64,
                length: DATA_BLOCK_SIZE as u32,
            })
            .collect()
    }

    /// Concatenates the serialized form of `blocks`, the layout
    /// `BlockIndexReader` decodes.
    fn serialize_blocks(blocks: &[BlockIndexEntry]) -> Vec<u8> {
        let mut data = Vec::with_capacity(blocks.len() * BLOCK_INDEX_ENTRY_SIZE);
        for block in blocks {
            data.extend_from_slice(&block.to_bytes());
        }
        data
    }

    #[test]
    fn test_temporal_key_ordering() {
        let k1 = TemporalKey::new(100, 1);
        let k2 = TemporalKey::new(100, 2);
        let k3 = TemporalKey::new(200, 1);
        assert!(k1 < k2); // same timestamp: edge_id breaks the tie
        assert!(k2 < k3); // timestamp dominates edge_id
        assert!(k1 < k3);
        assert!(TemporalKey::min() <= k1 && k3 <= TemporalKey::max());
    }

    #[test]
    fn test_block_index_entry_serialization() {
        let entry = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 42),
            max_key: TemporalKey::new(2000, 100),
            offset: 65536,
            length: 64000,
        };
        let bytes = entry.to_bytes();
        let restored = BlockIndexEntry::from_bytes(&bytes).unwrap();
        assert_eq!(restored.min_key, entry.min_key);
        assert_eq!(restored.max_key, entry.max_key);
        assert_eq!(restored.offset, entry.offset);
        assert_eq!(restored.length, entry.length);
        assert!(BlockIndexEntry::from_bytes(&bytes[..BLOCK_INDEX_ENTRY_SIZE - 1]).is_err());
    }

    #[test]
    fn test_fence_pointer_serialization() {
        let fp = FencePointer {
            key: TemporalKey::new(5000, 123),
            block_index_offset: 1024 * 1024,
        };
        let bytes = fp.to_bytes();
        let restored = FencePointer::from_bytes(&bytes).unwrap();
        assert_eq!(restored.key, fp.key);
        assert_eq!(restored.block_index_offset, fp.block_index_offset);
        assert!(FencePointer::from_bytes(&bytes[..FENCE_POINTER_SIZE - 1]).is_err());
    }

    #[test]
    fn test_two_level_index_build() {
        let blocks = create_test_blocks(100);
        let index = TwoLevelIndex::build(&blocks, 0);
        // 100 entries (6000 bytes) fit inside a single fence interval.
        assert_eq!(index.fence_count(), 1);
        assert_eq!(index.total_blocks, 100);
        assert_eq!(index.fence_pointers[0].block_index_offset, 0);
        assert_eq!(index.fence_pointers[0].key, blocks[0].min_key);
        assert_eq!(index.block_index_length, (100 * BLOCK_INDEX_ENTRY_SIZE) as u64);
    }

    #[test]
    fn test_two_level_index_build_multiple_fences() {
        let blocks = create_test_blocks(20_000);
        let index = TwoLevelIndex::build(&blocks, 0);
        // The second fence lands on the first entry at or past FENCE_INTERVAL_BYTES.
        let second = (FENCE_INTERVAL_BYTES + BLOCK_INDEX_ENTRY_SIZE - 1) / BLOCK_INDEX_ENTRY_SIZE;
        assert_eq!(index.fence_count(), 2);
        assert_eq!(
            index.fence_pointers[1].block_index_offset,
            (second * BLOCK_INDEX_ENTRY_SIZE) as u64
        );
        assert_eq!(index.fence_pointers[1].key, blocks[second].min_key);
    }

    #[test]
    fn test_two_level_index_fence_range() {
        let blocks = create_test_blocks(100);
        let index = TwoLevelIndex::build(&blocks, 0);
        let len = index.block_index_length;
        assert_eq!(index.find_fence_range(&TemporalKey::new(500, 0)), (0, len));
        assert_eq!(index.find_fence_range(&TemporalKey::new(99000, 99)), (0, len));
        assert_eq!(index.find_fence_range_for_timestamps(0, u64::MAX), (0, len));
    }

    #[test]
    fn test_fence_range_multiple_fences() {
        let blocks = create_test_blocks(20_000);
        let index = TwoLevelIndex::build(&blocks, 0);
        let split = index.fence_pointers[1].block_index_offset;
        let len = index.block_index_length;

        assert_eq!(index.find_fence_range(&TemporalKey::new(100_500, 100)), (0, split));
        // A key equal to a fence key belongs to that fence's segment.
        assert_eq!(index.find_fence_range(&index.fence_pointers[1].key), (split, len));
        assert_eq!(
            index.find_fence_range(&TemporalKey::new(18_000_500, 18_000)),
            (split, len)
        );

        assert_eq!(index.find_fence_range_for_timestamps(1000, 2000), (0, split));
        assert_eq!(
            index.find_fence_range_for_timestamps(18_000_000, 18_000_999),
            (split, len)
        );
        // A query straddling the fence boundary needs both segments.
        let straddle_start = (index.fence_pointers[1].key.timestamp_us) - 500;
        let straddle_end = (index.fence_pointers[1].key.timestamp_us) + 500;
        assert_eq!(
            index.find_fence_range_for_timestamps(straddle_start, straddle_end),
            (0, len)
        );
    }

    #[test]
    fn test_fence_range_empty_index() {
        let index = TwoLevelIndex::build(&[], 0);
        assert_eq!(index.fence_count(), 0);
        assert_eq!(index.total_blocks, 0);
        assert_eq!(index.find_fence_range(&TemporalKey::new(1, 1)), (0, 0));
        assert_eq!(index.find_fence_range_for_timestamps(0, u64::MAX), (0, 0));
    }

    #[test]
    fn test_two_level_index_serialization() {
        let blocks = create_test_blocks(20_000);
        let index = TwoLevelIndex::build(&blocks, 1024);
        let bytes = index.fence_pointers_to_bytes();
        assert_eq!(bytes.len(), 8 + index.fence_count() * FENCE_POINTER_SIZE);

        let restored =
            TwoLevelIndex::fence_pointers_from_bytes(&bytes, 1024, index.block_index_length)
                .unwrap();
        assert_eq!(restored.total_blocks, index.total_blocks);
        assert_eq!(restored.block_index_offset, 1024);
        assert_eq!(restored.block_index_length, index.block_index_length);
        assert_eq!(restored.fence_count(), index.fence_count());
        for (got, want) in restored.fence_pointers.iter().zip(&index.fence_pointers) {
            assert_eq!(got.key, want.key);
            assert_eq!(got.block_index_offset, want.block_index_offset);
        }
    }

    #[test]
    fn test_fence_pointers_from_bytes_rejects_truncated_input() {
        assert!(TwoLevelIndex::fence_pointers_from_bytes(&[0u8; 4], 0, 0).is_err());
        // Header claims one fence pointer, but no payload follows.
        let header = [1u8, 0, 0, 0, 0, 0, 0, 0];
        assert!(TwoLevelIndex::fence_pointers_from_bytes(&header, 0, 0).is_err());
    }

    #[test]
    fn test_block_index_reader() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        assert_eq!(reader.entry_count(), 10);
        assert_eq!(reader.read_entry(0).unwrap().min_key, blocks[0].min_key);
        assert!(reader.read_entry(data.len()).is_err());

        assert_eq!(reader.read_range(0, data.len()).unwrap().len(), 10);
        let middle = reader
            .read_range(BLOCK_INDEX_ENTRY_SIZE, 3 * BLOCK_INDEX_ENTRY_SIZE)
            .unwrap();
        assert_eq!(middle.len(), 2);
        assert_eq!(middle[0].min_key, blocks[1].min_key);
        assert_eq!(middle[1].min_key, blocks[2].min_key);

        // Out-of-range ends are clamped; inverted ranges are empty.
        assert_eq!(reader.read_range(0, usize::MAX).unwrap().len(), 10);
        assert!(reader
            .read_range(3 * BLOCK_INDEX_ENTRY_SIZE, BLOCK_INDEX_ENTRY_SIZE)
            .unwrap()
            .is_empty());
    }

    #[test]
    fn test_block_index_find_block_for_key() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        let key = TemporalKey::new(5500, 5);
        let block = reader
            .find_block_for_key(&key, 0, data.len())
            .unwrap()
            .expect("key lies inside block 5");
        assert_eq!(block.min_key, blocks[5].min_key);
        assert!(block.contains_key(&key));

        // Between block 4's max (4999, 4) and block 5's min (5000, 5).
        let gap = TemporalKey::new(4999, 10);
        assert!(reader.find_block_for_key(&gap, 0, data.len()).unwrap().is_none());
        // Past the last block.
        let past = TemporalKey::new(20_000, 0);
        assert!(reader.find_block_for_key(&past, 0, data.len()).unwrap().is_none());
        // Empty search window.
        assert!(reader.find_block_for_key(&key, 0, 0).unwrap().is_none());
    }

    #[test]
    fn test_two_level_lookup_end_to_end() {
        let blocks = create_test_blocks(20_000);
        let index = TwoLevelIndex::build(&blocks, 0);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);
        let boundary = (FENCE_INTERVAL_BYTES + BLOCK_INDEX_ENTRY_SIZE - 1) / BLOCK_INDEX_ENTRY_SIZE;

        for &i in &[0usize, 1, boundary - 1, boundary, 19_999] {
            let key = TemporalKey::new(i as u64 * 1000 + 500, i as u128);
            let (start, end) = index.find_fence_range(&key);
            let found = reader
                .find_block_for_key(&key, start as usize, end as usize)
                .unwrap();
            assert_eq!(found.map(|b| b.min_key), Some(blocks[i].min_key));
        }
    }

    #[test]
    fn test_block_index_find_blocks_for_range() {
        let blocks = create_test_blocks(10);
        let data = serialize_blocks(&blocks);
        let reader = BlockIndexReader::new(&data);

        // [2500, 4500] intersects blocks 2 ([2000, 2999]), 3 and 4 ([4000, 4999]).
        let found = reader
            .find_blocks_for_range(2500, 4500, 0, data.len())
            .unwrap();
        let mins: Vec<_> = found.iter().map(|b| b.min_key).collect();
        assert_eq!(
            mins,
            vec![blocks[2].min_key, blocks[3].min_key, blocks[4].min_key]
        );
    }

    #[test]
    fn test_memory_usage() {
        let blocks = create_test_blocks(1000);
        let index = TwoLevelIndex::build(&blocks, 0);
        let memory = index.memory_usage();
        let full_index_size = blocks.len() * BLOCK_INDEX_ENTRY_SIZE;
        // The fence level should be far smaller than the full block index.
        assert!(memory < full_index_size / 10);
    }

    #[test]
    fn test_block_contains_key() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };
        assert!(block.contains_key(&TemporalKey::new(1500, 50)));
        assert!(block.contains_key(&TemporalKey::new(1000, 0))); // inclusive lower bound
        assert!(block.contains_key(&TemporalKey::new(2000, 100))); // inclusive upper bound
        assert!(!block.contains_key(&TemporalKey::new(999, 0)));
        assert!(!block.contains_key(&TemporalKey::new(2001, 0)));
        assert!(!block.contains_key(&TemporalKey::new(2000, 101)));
    }

    #[test]
    fn test_block_overlaps_range() {
        let block = BlockIndexEntry {
            min_key: TemporalKey::new(1000, 0),
            max_key: TemporalKey::new(2000, 100),
            offset: 0,
            length: 64000,
        };
        assert!(block.overlaps_range(500, 1500)); // overlaps the start
        assert!(block.overlaps_range(1500, 2500)); // overlaps the end
        assert!(block.overlaps_range(1200, 1800)); // fully inside
        assert!(block.overlaps_range(500, 2500)); // fully covers
        assert!(block.overlaps_range(2000, 3000)); // touches the inclusive end
        assert!(!block.overlaps_range(100, 500)); // entirely before
        assert!(!block.overlaps_range(2500, 3000)); // entirely after
    }
}