use anyhow::{Context, Result};
use memmap2::MmapMut;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use crate::telemetry::{LoopGuard, OpTracer};
use super::data_structures::{EdgeRec, MetadataRec, NodeRec};
/// Single-file graph store accessed through a writable memory map.
///
/// The on-disk layout read and written by this type is, in order: an 8-byte
/// little-endian node count, the node records, an 8-byte edge count, the edge
/// records, an 8-byte metadata count, and the metadata records.
pub struct StorageManager {
    /// Handle to the backing database file; used to resize and sync it.
    file: File,
    /// Writable mapping of `file`, replaced whenever the file has to grow.
    mmap: Option<MmapMut>,
    /// Number of node records currently stored.
    node_count: usize,
    /// Number of edge records currently stored.
    edge_count: usize,
    /// Number of metadata slots currently stored.
    metadata_count: usize,
    /// Byte offset of the edge-count header.
    edge_offset: usize,
    /// Byte offset of the metadata-count header.
    metadata_offset: usize,
    /// Path the store was created at or opened from; stored but never read in this file.
    #[allow(dead_code)]
    path: std::path::PathBuf,
    /// In-memory map from a node's logical id to the storage ids of its versions.
    version_index: std::collections::HashMap<u64, Vec<u64>>,
}
impl StorageManager {
/// Creates (or truncates) the database file at `path` and returns an empty store.
///
/// The new file holds three zeroed 8-byte count headers (nodes, edges, metadata),
/// so the metadata header starts at byte 16 and never overlaps the edge header at
/// byte 8.
///
/// # Errors
///
/// Returns an error if the file cannot be created, written, synced or memory-mapped.
pub fn create(path: &Path) -> Result<Self> {
    // Each section starts with one little-endian u64 record count.
    const COUNT_HEADER_SIZE: usize = 8;

    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)
        .context("Failed to create database file")?;

    // Write all three (zero) counts up front so the file is well-formed even
    // before the first insert.
    file.write_all(&[0u8; 3 * COUNT_HEADER_SIZE])
        .context("Failed to write header")?;
    file.sync_all().context("Failed to sync file")?;

    // SAFETY: mapping a file is only sound while nothing else truncates or
    // rewrites it underneath the mapping; this type assumes it is the sole
    // user of the database file for as long as the mapping lives.
    let mmap = unsafe { MmapMut::map_mut(&file) }.context("Failed to memory-map file")?;

    Ok(Self {
        file,
        mmap: Some(mmap),
        node_count: 0,
        edge_count: 0,
        metadata_count: 0,
        edge_offset: COUNT_HEADER_SIZE,
        // Directly after the edge-count header. Sharing the edge offset here
        // would make the first metadata insert overwrite the edge count.
        metadata_offset: 2 * COUNT_HEADER_SIZE,
        path: path.to_path_buf(),
        version_index: std::collections::HashMap::new(),
    })
}
pub fn open(path: &Path) -> Result<Self> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(path)
.context("Failed to open database file")?;
let mut header = [0u8; 8];
file.read_exact(&mut header)
.context("Failed to read header")?;
let node_count = u64::from_le_bytes(header) as usize;
let edge_offset = 8 + node_count * std::mem::size_of::<NodeRec>();
file.seek(SeekFrom::Start(edge_offset as u64))
.context("Failed to seek to edge section")?;
let mut edge_header = [0u8; 8];
let edge_count = if file.read_exact(&mut edge_header).is_ok() {
u64::from_le_bytes(edge_header) as usize
} else {
0
};
let metadata_offset = edge_offset + 8 + edge_count * std::mem::size_of::<EdgeRec>();
file.seek(SeekFrom::Start(metadata_offset as u64))
.context("Failed to seek to metadata section")?;
let mut metadata_header = [0u8; 8];
let metadata_count = if file.read_exact(&mut metadata_header).is_ok() {
u64::from_le_bytes(metadata_header) as usize
} else {
0
};
let mmap = unsafe { MmapMut::map_mut(&file).context("Failed to memory-map file")? };
Ok(Self {
file,
mmap: Some(mmap),
node_count,
edge_count,
metadata_count,
edge_offset,
metadata_offset,
path: path.to_path_buf(),
version_index: std::collections::HashMap::new(),
})
}
/// Returns the number of node records currently tracked by this store.
pub fn node_count(&self) -> usize {
self.node_count
}
/// Returns the number of edge records currently tracked by this store.
pub fn edge_count(&self) -> usize {
self.edge_count
}
/// Returns the number of metadata slots currently tracked by this store
/// (one more than the highest metadata id written so far).
pub fn metadata_count(&self) -> usize {
self.metadata_count
}
pub fn insert_metadata_at(&mut self, id: u64, metadata: MetadataRec) -> Result<()> {
let required_id = id as usize + 1;
if required_id > self.metadata_count {
self.metadata_count = required_id;
}
let node_section_size = self.node_count * std::mem::size_of::<NodeRec>();
let edge_header_size = 8;
let edge_section_size = self.edge_count * std::mem::size_of::<EdgeRec>();
let metadata_header_size = 8;
let metadata_section_size = self.metadata_count * std::mem::size_of::<MetadataRec>();
let required_size = 8
+ node_section_size
+ edge_header_size
+ edge_section_size
+ metadata_header_size
+ metadata_section_size;
self.file
.set_len(required_size as u64)
.context("Failed to grow file for metadata")?;
if let Some(ref mut mmap) = self.mmap {
if mmap.len() < required_size {
mmap.flush().ok();
*mmap = unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
}
}
if let Some(ref mut mmap) = self.mmap {
let metadata_data_offset = self.metadata_offset + 8;
let offset = metadata_data_offset + id as usize * std::mem::size_of::<MetadataRec>();
let metadata_bytes = bytemuck::bytes_of(&metadata);
mmap[offset..offset + std::mem::size_of::<MetadataRec>()]
.copy_from_slice(metadata_bytes);
let metadata_header_offset = self.metadata_offset;
mmap[metadata_header_offset..metadata_header_offset + 8]
.copy_from_slice(&(self.metadata_count as u64).to_le_bytes());
mmap.flush().context("Failed to flush mmap")?;
}
Ok(())
}
/// Looks up the metadata record stored in slot `id`.
///
/// Returns `None` when `id` is not below the current metadata count, when no
/// mapping is present, or when the mapped bytes cannot be viewed as a
/// `MetadataRec`.
pub fn get_metadata(&self, id: u64) -> Option<&MetadataRec> {
    let slot = id as usize;
    if slot >= self.metadata_count {
        return None;
    }
    let mapping = self.mmap.as_ref()?;
    let rec_size = std::mem::size_of::<MetadataRec>();
    // Records start right after the 8-byte metadata-count header.
    let start = self.metadata_offset + 8 + slot * rec_size;
    bytemuck::try_from_bytes::<MetadataRec>(&mapping[start..start + rec_size]).ok()
}
/// Appends `node` to the node section and returns its storage id.
///
/// The file is grown by one node record, the edge and metadata sections are
/// shifted back to make room, the record is written, all three count headers
/// are rewritten at their new positions and the mapping is flushed. Finally the
/// storage id is recorded in the version index under `node.id`.
///
/// # Errors
///
/// Returns an error if growing, re-mapping or flushing the file fails.
pub fn insert_node(&mut self, node: NodeRec) -> Result<u64> {
    let node_rec = std::mem::size_of::<NodeRec>();
    let edge_rec = std::mem::size_of::<EdgeRec>();
    let metadata_rec = std::mem::size_of::<MetadataRec>();

    let storage_id = self.node_count as u64;
    let grown_node_bytes = (self.node_count + 1) * node_rec;
    // node header + nodes + edge header + edges + metadata header + metadata
    let total_bytes = 8
        + grown_node_bytes
        + 8
        + self.edge_count * edge_rec
        + 8
        + self.metadata_count * metadata_rec;

    self.file
        .set_len(total_bytes as u64)
        .context("Failed to grow file")?;
    if let Some(mapping) = self.mmap.as_mut() {
        if mapping.len() < total_bytes {
            // Best-effort flush of the old mapping before it is replaced.
            mapping.flush().ok();
            *mapping =
                unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
        }
    }

    // Everything behind the node section has to move before the new record
    // can be written into the space it currently occupies.
    let has_trailing_data = self.edge_count > 0 || self.metadata_count > 0;
    if has_trailing_data {
        self.move_data_sections(grown_node_bytes)?;
    }

    let slot_start = 8 + self.node_count * node_rec;
    if let Some(mapping) = self.mmap.as_mut() {
        mapping[slot_start..slot_start + node_rec].copy_from_slice(bytemuck::bytes_of(&node));
    }

    self.node_count += 1;
    self.edge_offset = 8 + self.node_count * node_rec;
    self.metadata_offset = self.edge_offset + 8 + self.edge_count * edge_rec;

    if let Some(mapping) = self.mmap.as_mut() {
        // (header offset, count) for the node, edge and metadata sections.
        let headers = [
            (0, self.node_count),
            (self.edge_offset, self.edge_count),
            (self.metadata_offset, self.metadata_count),
        ];
        for &(at, count) in headers.iter() {
            mapping[at..at + 8].copy_from_slice(&(count as u64).to_le_bytes());
        }
        mapping.flush().context("Failed to flush mmap")?;
    }

    self.version_index
        .entry(node.id)
        .or_default()
        .push(storage_id);
    Ok(storage_id)
}
/// Appends `edge` to the edge section and returns its storage id.
///
/// The file is grown by one edge record; if any metadata exists it is shifted
/// back first so the new record does not overwrite it. The edge-count header is
/// rewritten and the mapping flushed.
///
/// # Errors
///
/// Returns an error if growing, re-mapping or flushing the file fails.
pub fn insert_edge(&mut self, edge: EdgeRec) -> Result<u64> {
    let edge_rec = std::mem::size_of::<EdgeRec>();
    let storage_id = self.edge_count as u64;

    // node header + nodes + edge header + edges + metadata header + metadata
    let total_bytes = 8
        + self.node_count * std::mem::size_of::<NodeRec>()
        + 8
        + (self.edge_count + 1) * edge_rec
        + 8
        + self.metadata_count * std::mem::size_of::<MetadataRec>();

    self.file
        .set_len(total_bytes as u64)
        .context("Failed to grow file")?;
    if let Some(mapping) = self.mmap.as_mut() {
        if mapping.len() < total_bytes {
            // Best-effort flush of the old mapping before it is replaced.
            mapping.flush().ok();
            *mapping =
                unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
        }
    }

    if self.metadata_count > 0 {
        self.move_metadata_section()?;
    }

    if let Some(mapping) = self.mmap.as_mut() {
        let header_at = self.edge_offset;
        let slot_start = header_at + 8 + self.edge_count * edge_rec;
        mapping[slot_start..slot_start + edge_rec].copy_from_slice(bytemuck::bytes_of(&edge));

        self.edge_count += 1;
        mapping[header_at..header_at + 8]
            .copy_from_slice(&(self.edge_count as u64).to_le_bytes());

        // With metadata present the offset was already advanced while moving
        // the section; otherwise it has to follow the grown edge section here.
        if self.metadata_count == 0 {
            self.metadata_offset = self.edge_offset + 8 + self.edge_count * edge_rec;
        }
        mapping.flush().context("Failed to flush mmap")?;
    }
    Ok(storage_id)
}
/// Shifts the metadata header and records back by one edge record and points
/// `metadata_offset` at the new location.
///
/// The destination is computed for an edge section holding one more record
/// than `edge_count` currently reports. The caller is expected to have grown
/// the file and mapping already.
fn move_metadata_section(&mut self) -> Result<()> {
    let source = self.metadata_offset;
    let destination =
        self.edge_offset + 8 + (self.edge_count + 1) * std::mem::size_of::<EdgeRec>();
    // 8-byte count header plus every metadata record.
    let span = 8 + self.metadata_count * std::mem::size_of::<MetadataRec>();

    if let Some(mapping) = self.mmap.as_mut() {
        if source != destination {
            // `copy_within` copes with the overlapping source and destination.
            mapping.copy_within(source..source + span, destination);
        }
    }
    self.metadata_offset = destination;
    Ok(())
}
/// Looks up the node record with storage id `id`.
///
/// Returns `None` when `id` is not below the current node count, when no
/// mapping is present, or when the mapped bytes cannot be viewed as a `NodeRec`.
pub fn get_node(&self, id: u64) -> Option<&NodeRec> {
    let slot = id as usize;
    if slot >= self.node_count {
        return None;
    }
    let mapping = self.mmap.as_ref()?;
    let rec_size = std::mem::size_of::<NodeRec>();
    // Node records start right after the 8-byte node-count header.
    let start = 8 + slot * rec_size;
    bytemuck::try_from_bytes::<NodeRec>(&mapping[start..start + rec_size]).ok()
}
/// Looks up the edge record with storage id `id`.
///
/// Returns `None` when `id` is not below the current edge count, when no
/// mapping is present, or when the mapped bytes cannot be viewed as an `EdgeRec`.
pub fn get_edge(&self, id: u64) -> Option<&EdgeRec> {
    let slot = id as usize;
    if slot >= self.edge_count {
        return None;
    }
    let mapping = self.mmap.as_ref()?;
    let rec_size = std::mem::size_of::<EdgeRec>();
    // Edge records start right after the 8-byte edge-count header.
    let start = self.edge_offset + 8 + slot * rec_size;
    bytemuck::try_from_bytes::<EdgeRec>(&mapping[start..start + rec_size]).ok()
}
/// Placeholder: currently ignores `_node_id` and always returns an empty vector.
// NOTE(review): no adjacency lookup is implemented here; confirm whether callers
// expect outgoing edges, incoming edges or both before filling this in.
pub fn get_edges_for_node(&self, _node_id: u64) -> Vec<&EdgeRec> {
Vec::new()
}
/// Returns the storage ids recorded in the in-memory version index for
/// `logical_id`, or `None` if the index has no entry for it.
pub fn get_version_history(&self, logical_id: u64) -> Option<&Vec<u64>> {
self.version_index.get(&logical_id)
}
/// Returns the first recorded version of `logical_id` that is valid at `timestamp`.
///
/// Versions are examined in the order they appear in the version index. A
/// version qualifies when its `visibility` is 1, its `begin_ts` is not after
/// `timestamp`, and its `end_ts` is either 0 or after `timestamp`.
/// Returns `None` when the logical id is unknown or no version qualifies. With
/// the `telemetry` feature enabled, a failing loop-guard check also ends the
/// search with `None`.
pub fn get_node_at_timestamp(&self, logical_id: u64, timestamp: u64) -> Option<&NodeRec> {
    #[cfg(feature = "telemetry")]
    let loop_guard = LoopGuard::new("get_node_at_timestamp", 1000);

    let history = self.version_index.get(&logical_id)?;
    for &storage_id in history {
        #[cfg(feature = "telemetry")]
        loop_guard.check().ok()?;

        let candidate = match self.get_node(storage_id) {
            Some(record) => record,
            None => continue,
        };
        let is_visible = candidate.visibility == 1;
        let has_begun = candidate.begin_ts <= timestamp;
        // An `end_ts` of 0 is treated as "no end".
        let still_open = candidate.end_ts == 0 || candidate.end_ts > timestamp;
        if is_visible && has_begun && still_open {
            return Some(candidate);
        }
    }
    None
}
pub fn update_node(
&mut self,
logical_id: u64,
new_node: NodeRec,
new_begin_ts: u64,
) -> Result<u64> {
let versions = self
.version_index
.get(&logical_id)
.ok_or_else(|| anyhow::anyhow!("Logical node {} not found", logical_id))?;
if versions.is_empty() {
return Err(anyhow::anyhow!(
"No versions found for logical node {}",
logical_id
));
}
let latest_version_id = *versions.last().unwrap();
if let Some(ref mut mmap) = self.mmap {
let offset = 8 + latest_version_id as usize * std::mem::size_of::<NodeRec>();
let bytes = &mut mmap[offset..offset + std::mem::size_of::<NodeRec>()];
if let Ok(node) = bytemuck::try_from_bytes_mut::<NodeRec>(bytes) {
node.end_ts = new_begin_ts; }
}
let new_version_id = self.insert_node(new_node)?;
self.version_index
.entry(logical_id)
.or_insert_with(Vec::new)
.push(new_version_id);
Ok(new_version_id)
}
/// Shifts the edge and metadata sections (both headers and all records) so the
/// edge header starts right after a node section of `new_node_section_size`
/// bytes, then updates `edge_offset` and `metadata_offset` to match.
///
/// The caller is expected to have grown the file and mapping already.
fn move_data_sections(&mut self, new_node_section_size: usize) -> Result<()> {
    let source = self.edge_offset;
    let destination = 8 + new_node_section_size;

    if let Some(mapping) = self.mmap.as_mut() {
        // edge header + edges + metadata header + metadata
        let span = 8
            + self.edge_count * std::mem::size_of::<EdgeRec>()
            + 8
            + self.metadata_count * std::mem::size_of::<MetadataRec>();
        // `copy_within` copes with the overlapping source and destination.
        mapping.copy_within(source..source + span, destination);
    }

    self.edge_offset = destination;
    self.metadata_offset =
        self.edge_offset + 8 + self.edge_count * std::mem::size_of::<EdgeRec>();
    Ok(())
}
pub fn flush(&mut self) -> Result<()> {
if let Some(ref mut mmap) = self.mmap {
mmap.flush().context("Failed to flush mmap")?;
}
self.file.sync_all().context("Failed to sync file")?;
Ok(())
}
}
impl Drop for StorageManager {
fn drop(&mut self) {
if let Some(ref mut mmap) = self.mmap {
let _ = mmap.flush();
}
let _ = self.file.sync_all();
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::{tempdir, TempDir};
/// Creating a store in a fresh temporary directory succeeds.
#[test]
fn test_storage_manager_create() {
    let scratch = tempdir().unwrap();
    let created = StorageManager::create(&scratch.path().join("test.db"));
    assert!(created.is_ok());
}
/// Inserts a node, updates it once, and checks that time-travel lookups return
/// the first version before the update timestamp and the second one after it.
#[test]
fn test_temporal_versioning_lsts_basic() {
    let scratch = tempdir().unwrap();
    let mut manager =
        StorageManager::create(&scratch.path().join("test_temporal.db")).unwrap();
    let logical_id = 1u64;

    // Version 1: valid from timestamp 100, still open-ended.
    let node_v1 = NodeRec {
        id: 1,
        morton_code: 0,
        x: 10.0,
        y: 20.0,
        z: 30.0,
        edge_off: 0,
        edge_len: 0,
        flags: 0,
        begin_ts: 100,
        end_ts: 0,
        tx_id: 1,
        visibility: 1,
        _padding: [0; 7],
    };
    let storage_id_v1 = manager.insert_node(node_v1).unwrap();
    assert_eq!(
        storage_id_v1, 0,
        "Layer 1: First node should have storage ID 0"
    );

    // Version 2: same logical node, moved, valid from timestamp 200.
    let node_v2 = NodeRec {
        x: 15.0,
        y: 25.0,
        z: 35.0,
        begin_ts: 200,
        tx_id: 2,
        ..node_v1
    };
    let storage_id_v2 = manager.update_node(logical_id, node_v2, 200).unwrap();
    assert_eq!(
        storage_id_v2, 1,
        "Layer 2: Second version should have storage ID 1"
    );

    let node = manager
        .get_node_at_timestamp(logical_id, 150)
        .expect("Layer 3: Time-travel query should return version 1 at timestamp 150");
    assert_eq!(
        node.x, 10.0,
        "Layer 3: At timestamp 150, should see v1 coordinates"
    );
    assert_eq!(node.begin_ts, 100, "Layer 3: Should be version 1");

    let current = manager
        .get_node_at_timestamp(logical_id, 250)
        .expect("Layer 4: Should return current version at timestamp 250");
    assert_eq!(
        current.x, 15.0,
        "Layer 4: Current version should have v2 coordinates"
    );
    assert_eq!(
        current.begin_ts, 200,
        "Layer 4: Current version should be v2"
    );
}
/// With the `telemetry` feature: a `LoopGuard` created with limit 5 accepts six
/// checks and rejects the seventh.
#[test]
#[cfg(feature = "telemetry")]
fn test_lsts_telemetry_loop_detection() {
    use crate::telemetry::LoopGuard;

    let scratch = tempdir().unwrap();
    let mut manager =
        StorageManager::create(&scratch.path().join("test_telemetry.db")).unwrap();
    manager
        .insert_node(NodeRec {
            id: 1,
            morton_code: 0,
            x: 10.0,
            y: 20.0,
            z: 30.0,
            edge_off: 0,
            edge_len: 0,
            flags: 0,
            begin_ts: 100,
            end_ts: 0,
            tx_id: 1,
            visibility: 1,
            _padding: [0; 7],
        })
        .unwrap();

    let guard = LoopGuard::new("test_loop", 5);
    (0..6).for_each(|i| {
        assert!(guard.check().is_ok(), "Should allow iteration {}", i);
    });
    assert!(guard.check().is_err(), "Should fail at iteration 6");
}
// Smoke test (telemetry feature only): records three operations on a fresh
// `OpTracer`; the test passes as long as none of the calls panics.
#[test]
#[cfg(feature = "telemetry")]
fn test_lsts_telemetry_op_tracing() {
use crate::telemetry::OpTracer;
let tracer = OpTracer::new();
// Each call passes the current file and line as the call-site location.
tracer.trace("insert_node", file!(), line!());
tracer.trace("get_node", file!(), line!());
tracer.trace("update_node", file!(), line!());
// Constant assertion: it can never fail, reaching this line is the check.
assert!(true, "OpTracer should complete without errors");
}
/// Inserting one edge into a fresh store yields storage id 0, bumps the edge
/// count to 1 and makes the edge readable again.
#[test]
fn test_storage_manager_insert_edge() {
    let scratch = tempdir().unwrap();
    let mut storage = StorageManager::create(&scratch.path().join("test.db")).unwrap();

    let edge_id = storage
        .insert_edge(EdgeRec {
            src: 0,
            dst: 1,
            w: 1.0,
            flags: 0,
            begin_ts: 0,
            end_ts: 0,
            tx_id: 0,
            visibility: 1,
            _padding: [0; 7],
        })
        .unwrap();

    assert_eq!(edge_id, 0);
    assert_eq!(storage.edge_count(), 1);
    let stored = storage.get_edge(0).unwrap();
    assert_eq!(stored.dst, 1);
}
}