use anyhow::{Context, Result};
use memmap2::MmapMut;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use super::dual_octree::OctreePageStore;
use super::spatial_page::{build_spatial_pages, BoundingBox, DEFAULT_MAX_NODES_PER_PAGE};
use crate::storage::data_structures::{EdgeRec, MetadataRec, NodeRec};
#[cfg(feature = "telemetry")]
use crate::telemetry::LoopGuard;
/// Single-file, memory-mapped store for node, edge and metadata records.
///
/// The methods of this type read and write the file as three consecutive
/// sections, each introduced by an 8-byte little-endian record count:
/// nodes (count at byte 0), then edges, then metadata.
pub struct StorageManager {
    /// Backing database file; resized with `set_len` whenever a section grows.
    file: File,
    /// Writable mapping of `file`, replaced after the file has been grown.
    mmap: Option<MmapMut>,
    /// Number of node records in the node section.
    node_count: usize,
    /// Number of edge records in the edge section.
    edge_count: usize,
    /// Number of metadata record slots in the metadata section.
    metadata_count: usize,
    /// Byte offset of the edge-count header.
    edge_offset: usize,
    /// Byte offset of the metadata-count header.
    metadata_offset: usize,
    /// Path of the database file; the spatial store path is derived from it.
    #[allow(dead_code)]
    path: std::path::PathBuf,
    /// In-memory map from a node's `id` field to the storage ids of its versions.
    version_index: std::collections::HashMap<u64, Vec<u64>>,
    /// Spatial page store, populated by a rebuild or by loading it from disk.
    spatial_page_store: Option<OctreePageStore>,
}
impl StorageManager {
/// Creates (or truncates) the database file at `path` and returns a manager
/// for the resulting empty store.
///
/// Only the 8-byte node-count header is written up front; the edge and
/// metadata headers are materialised by the first insert that grows the file.
///
/// # Errors
/// Returns an error if the file cannot be created, written, synced or mapped.
pub fn create(path: &Path) -> Result<Self> {
    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)
        .context("Failed to create database file")?;
    let initial_count: u64 = 0;
    file.write_all(&initial_count.to_le_bytes())
        .context("Failed to write header")?;
    file.sync_all().context("Failed to sync file")?;
    // SAFETY: `file` was just opened read/write by this function. This assumes
    // no other process truncates or rewrites it while mapped - TODO confirm.
    let mmap = unsafe { MmapMut::map_mut(&file).context("Failed to memory-map file")? };
    // With zero nodes the edge-count header sits right after the node-count
    // header, and the metadata-count header follows the 8-byte edge header.
    // Pointing both offsets at byte 8 made `insert_metadata_at` on a fresh
    // store overwrite the edge-count slot with the metadata count.
    let edge_offset = 8;
    let metadata_offset = edge_offset + 8;
    Ok(Self {
        file,
        mmap: Some(mmap),
        node_count: 0,
        edge_count: 0,
        metadata_count: 0,
        edge_offset,
        metadata_offset,
        path: path.to_path_buf(),
        version_index: std::collections::HashMap::new(),
        spatial_page_store: None,
    })
}
pub fn open(path: &Path) -> Result<Self> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(path)
.context("Failed to open database file")?;
let mut header = [0u8; 8];
file.read_exact(&mut header)
.context("Failed to read header")?;
let node_count = u64::from_le_bytes(header) as usize;
let edge_offset = 8 + node_count * std::mem::size_of::<NodeRec>();
file.seek(SeekFrom::Start(edge_offset as u64))
.context("Failed to seek to edge section")?;
let mut edge_header = [0u8; 8];
let edge_count = if file.read_exact(&mut edge_header).is_ok() {
u64::from_le_bytes(edge_header) as usize
} else {
0
};
let metadata_offset = edge_offset + 8 + edge_count * std::mem::size_of::<EdgeRec>();
file.seek(SeekFrom::Start(metadata_offset as u64))
.context("Failed to seek to metadata section")?;
let mut metadata_header = [0u8; 8];
let metadata_count = if file.read_exact(&mut metadata_header).is_ok() {
u64::from_le_bytes(metadata_header) as usize
} else {
0
};
let mmap = unsafe { MmapMut::map_mut(&file).context("Failed to memory-map file")? };
Ok(Self {
file,
mmap: Some(mmap),
node_count,
edge_count,
metadata_count,
edge_offset,
metadata_offset,
path: path.to_path_buf(),
version_index: std::collections::HashMap::new(),
spatial_page_store: None,
})
}
/// Returns the number of node records currently tracked by this manager.
pub fn node_count(&self) -> usize {
self.node_count
}
/// Returns the number of edge records currently tracked by this manager.
pub fn edge_count(&self) -> usize {
self.edge_count
}
/// Returns the number of metadata record slots currently tracked by this manager.
pub fn metadata_count(&self) -> usize {
self.metadata_count
}
pub fn insert_metadata_at(&mut self, id: u64, metadata: MetadataRec) -> Result<()> {
let required_id = id as usize + 1;
if required_id > self.metadata_count {
self.metadata_count = required_id;
}
let node_section_size = self.node_count * std::mem::size_of::<NodeRec>();
let edge_header_size = 8;
let edge_section_size = self.edge_count * std::mem::size_of::<EdgeRec>();
let metadata_header_size = 8;
let metadata_section_size = self.metadata_count * std::mem::size_of::<MetadataRec>();
let required_size = 8
+ node_section_size
+ edge_header_size
+ edge_section_size
+ metadata_header_size
+ metadata_section_size;
self.file
.set_len(required_size as u64)
.context("Failed to grow file for metadata")?;
if let Some(ref mut mmap) = self.mmap {
if mmap.len() < required_size {
mmap.flush().ok();
*mmap = unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
}
}
if let Some(ref mut mmap) = self.mmap {
let metadata_data_offset = self.metadata_offset + 8;
let offset = metadata_data_offset + id as usize * std::mem::size_of::<MetadataRec>();
let metadata_bytes = bytemuck::bytes_of(&metadata);
mmap[offset..offset + std::mem::size_of::<MetadataRec>()]
.copy_from_slice(metadata_bytes);
let metadata_header_offset = self.metadata_offset;
mmap[metadata_header_offset..metadata_header_offset + 8]
.copy_from_slice(&(self.metadata_count as u64).to_le_bytes());
mmap.flush().context("Failed to flush mmap")?;
}
Ok(())
}
/// Returns a reference to the metadata record in slot `id`, or `None` when
/// `id` is outside the metadata section, no mapping is present, or the bytes
/// cannot be viewed as a `MetadataRec`.
pub fn get_metadata(&self, id: u64) -> Option<&MetadataRec> {
    let slot = id as usize;
    if slot >= self.metadata_count {
        return None;
    }
    let mmap = self.mmap.as_ref()?;
    let record_size = std::mem::size_of::<MetadataRec>();
    // Records start right after the 8-byte metadata-count header.
    let start = self.metadata_offset + 8 + slot * record_size;
    bytemuck::try_from_bytes::<MetadataRec>(&mmap[start..start + record_size]).ok()
}
/// Appends `node` to the node section and returns its storage id.
///
/// The file is grown by one node record. Any existing edge and metadata
/// sections are shifted back to make room, all three count headers are
/// rewritten at their new positions, and the mapping is flushed. The storage
/// id is also recorded in the version index under `node.id`.
///
/// # Errors
/// Returns an error if the file cannot be grown, re-mapped or flushed.
pub fn insert_node(&mut self, node: NodeRec) -> Result<u64> {
    let node_size = std::mem::size_of::<NodeRec>();
    let edge_size = std::mem::size_of::<EdgeRec>();
    let metadata_size = std::mem::size_of::<MetadataRec>();

    let storage_id = self.node_count as u64;
    let grown_node_section = (self.node_count + 1) * node_size;
    // node header + nodes + edge header + edges + metadata header + metadata
    let required_size = 8
        + grown_node_section
        + 8
        + self.edge_count * edge_size
        + 8
        + self.metadata_count * metadata_size;

    self.file
        .set_len(required_size as u64)
        .context("Failed to grow file")?;
    if let Some(mmap) = self.mmap.as_mut() {
        if mmap.len() < required_size {
            mmap.flush().ok();
            *mmap = unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
        }
    }

    // Trailing sections only need to move when they hold any records.
    let has_trailing_records = self.edge_count > 0 || self.metadata_count > 0;
    if has_trailing_records {
        self.move_data_sections(grown_node_section)?;
    }

    // The new record lands where the edge header used to start.
    let slot_offset = 8 + self.node_count * node_size;
    self.node_count += 1;
    self.edge_offset = 8 + self.node_count * node_size;
    self.metadata_offset = self.edge_offset + 8 + self.edge_count * edge_size;

    if let Some(mmap) = self.mmap.as_mut() {
        mmap[slot_offset..slot_offset + node_size].copy_from_slice(bytemuck::bytes_of(&node));
        mmap[0..8].copy_from_slice(&(self.node_count as u64).to_le_bytes());
        let edge_header = self.edge_offset;
        mmap[edge_header..edge_header + 8]
            .copy_from_slice(&(self.edge_count as u64).to_le_bytes());
        let metadata_header = self.metadata_offset;
        mmap[metadata_header..metadata_header + 8]
            .copy_from_slice(&(self.metadata_count as u64).to_le_bytes());
        mmap.flush().context("Failed to flush mmap")?;
    }

    self.version_index
        .entry(node.id)
        .or_default()
        .push(storage_id);
    Ok(storage_id)
}
/// Appends `edge` to the edge section and returns its storage id.
///
/// The file is grown by one edge record. An existing metadata section is
/// shifted back first, then the record and the updated edge-count header are
/// written and the mapping is flushed.
///
/// # Errors
/// Returns an error if the file cannot be grown, re-mapped or flushed.
pub fn insert_edge(&mut self, edge: EdgeRec) -> Result<u64> {
    let edge_size = std::mem::size_of::<EdgeRec>();
    let storage_id = self.edge_count as u64;

    // node header + nodes + edge header + edges (one more) + metadata header + metadata
    let required_size = 8
        + self.node_count * std::mem::size_of::<NodeRec>()
        + 8
        + (self.edge_count + 1) * edge_size
        + 8
        + self.metadata_count * std::mem::size_of::<MetadataRec>();

    self.file
        .set_len(required_size as u64)
        .context("Failed to grow file")?;
    if let Some(mmap) = self.mmap.as_mut() {
        if mmap.len() < required_size {
            mmap.flush().ok();
            *mmap = unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
        }
    }

    if self.metadata_count > 0 {
        self.move_metadata_section()?;
    }

    if let Some(mmap) = self.mmap.as_mut() {
        let header_at = self.edge_offset;
        let slot_at = header_at + 8 + self.edge_count * edge_size;
        mmap[slot_at..slot_at + edge_size].copy_from_slice(bytemuck::bytes_of(&edge));

        self.edge_count += 1;
        mmap[header_at..header_at + 8].copy_from_slice(&(self.edge_count as u64).to_le_bytes());

        // With no metadata nothing was moved above, so the offset of the
        // (empty) metadata section is advanced here instead.
        if self.metadata_count == 0 {
            self.metadata_offset = self.edge_offset + 8 + self.edge_count * edge_size;
        }
        mmap.flush().context("Failed to flush mmap")?;
    }
    Ok(storage_id)
}
/// Shifts the metadata header and records back by one edge record so that a
/// new edge can be appended, and updates `metadata_offset` accordingly.
///
/// Must be called while `edge_count` still holds the pre-insert value and
/// after the mapping has been grown to the new size.
fn move_metadata_section(&mut self) -> Result<()> {
    let edge_size = std::mem::size_of::<EdgeRec>();
    let source = self.metadata_offset;
    let destination = self.edge_offset + 8 + (self.edge_count + 1) * edge_size;
    // Header plus every metadata record.
    let span = 8 + self.metadata_count * std::mem::size_of::<MetadataRec>();

    if let Some(mmap) = self.mmap.as_mut() {
        if span > 0 && source != destination {
            // `copy_within` handles the overlapping source/destination ranges.
            mmap.copy_within(source..source + span, destination);
        }
    }
    self.metadata_offset = destination;
    Ok(())
}
/// Returns a reference to the node record with storage id `id`, or `None`
/// when `id` is outside the node section, no mapping is present, or the bytes
/// cannot be viewed as a `NodeRec`.
pub fn get_node(&self, id: u64) -> Option<&NodeRec> {
    let slot = id as usize;
    if slot >= self.node_count {
        return None;
    }
    let mmap = self.mmap.as_ref()?;
    let record_size = std::mem::size_of::<NodeRec>();
    // Node records start right after the 8-byte node-count header.
    let start = 8 + slot * record_size;
    bytemuck::try_from_bytes::<NodeRec>(&mmap[start..start + record_size]).ok()
}
/// Returns a reference to the edge record with storage id `id`, or `None`
/// when `id` is outside the edge section, no mapping is present, or the bytes
/// cannot be viewed as an `EdgeRec`.
pub fn get_edge(&self, id: u64) -> Option<&EdgeRec> {
    let slot = id as usize;
    if slot >= self.edge_count {
        return None;
    }
    let mmap = self.mmap.as_ref()?;
    let record_size = std::mem::size_of::<EdgeRec>();
    // Edge records start right after the 8-byte edge-count header.
    let start = self.edge_offset + 8 + slot * record_size;
    bytemuck::try_from_bytes::<EdgeRec>(&mmap[start..start + record_size]).ok()
}
/// Returns the edges associated with `_node_id`.
///
/// NOTE(review): the argument is currently ignored and the result is always
/// an empty vector - this looks like a placeholder; confirm the intended
/// lookup semantics before relying on it.
pub fn get_edges_for_node(&self, _node_id: u64) -> Vec<&EdgeRec> {
Vec::new()
}
/// Returns the storage ids recorded in the in-memory version index for
/// `logical_id`, or `None` when the index has no entry for it.
pub fn get_version_history(&self, logical_id: u64) -> Option<&Vec<u64>> {
self.version_index.get(&logical_id)
}
/// Returns the first indexed version of `logical_id` that is visible at
/// `timestamp`.
///
/// A version matches when its `visibility` is 1, its `begin_ts` is not after
/// `timestamp`, and its `end_ts` is either 0 or strictly after `timestamp`.
/// Returns `None` when the logical id is unknown or no version matches. With
/// the `telemetry` feature enabled, a failing loop-guard check also yields
/// `None`.
pub fn get_node_at_timestamp(&self, logical_id: u64, timestamp: u64) -> Option<&NodeRec> {
    #[cfg(feature = "telemetry")]
    let loop_guard = LoopGuard::new("get_node_at_timestamp", 1000);

    let versions = self.version_index.get(&logical_id)?;
    for &version_id in versions {
        #[cfg(feature = "telemetry")]
        loop_guard.check().ok()?;

        let candidate = match self.get_node(version_id) {
            Some(node) => node,
            None => continue,
        };
        let is_visible = candidate.visibility == 1;
        let has_begun = candidate.begin_ts <= timestamp;
        let still_open = candidate.end_ts == 0 || candidate.end_ts > timestamp;
        if is_visible && has_begun && still_open {
            return Some(candidate);
        }
    }
    None
}
pub fn update_node(
&mut self,
logical_id: u64,
new_node: NodeRec,
new_begin_ts: u64,
) -> Result<u64> {
let versions = self
.version_index
.get(&logical_id)
.ok_or_else(|| anyhow::anyhow!("Logical node {} not found", logical_id))?;
if versions.is_empty() {
return Err(anyhow::anyhow!(
"No versions found for logical node {}",
logical_id
));
}
let latest_version_id = *versions.last().unwrap();
if let Some(ref mut mmap) = self.mmap {
let offset = 8 + latest_version_id as usize * std::mem::size_of::<NodeRec>();
let bytes = &mut mmap[offset..offset + std::mem::size_of::<NodeRec>()];
if let Ok(node) = bytemuck::try_from_bytes_mut::<NodeRec>(bytes) {
node.end_ts = new_begin_ts; }
}
let new_version_id = self.insert_node(new_node)?;
self.version_index
.entry(logical_id)
.or_default()
.push(new_version_id);
Ok(new_version_id)
}
/// Shifts the edge and metadata sections (both headers and all records) so
/// that the edge header starts right after a node section of
/// `new_node_section_size` bytes, then updates both section offsets.
///
/// Must be called after the mapping has been grown to the new size.
fn move_data_sections(&mut self, new_node_section_size: usize) -> Result<()> {
    let source = self.edge_offset;
    let destination = 8 + new_node_section_size;
    let edge_bytes = self.edge_count * std::mem::size_of::<EdgeRec>();
    let metadata_bytes = self.metadata_count * std::mem::size_of::<MetadataRec>();
    // edge header + edges + metadata header + metadata
    let span = 8 + edge_bytes + 8 + metadata_bytes;

    if let Some(mmap) = self.mmap.as_mut() {
        if span > 0 {
            // `copy_within` handles the overlapping source/destination ranges.
            mmap.copy_within(source..source + span, destination);
        }
    }

    self.edge_offset = destination;
    self.metadata_offset = destination + 8 + edge_bytes;
    Ok(())
}
pub fn flush(&mut self) -> Result<()> {
if let Some(ref mut mmap) = self.mmap {
mmap.flush().context("Failed to flush mmap")?;
}
self.file.sync_all().context("Failed to sync file")?;
self.maybe_save_spatial_store()?;
Ok(())
}
/// Rebuilds the spatial page store from every stored node and edge,
/// replacing any previously held store.
///
/// Edges are passed to the page builder as `(src, dst, w, flags)` tuples.
pub fn rebuild_spatial_index(&mut self) -> Result<()> {
    let nodes = self.all_nodes();
    let edge_refs: Vec<(u64, u64, f32, u32)> = self
        .all_edges()
        .into_iter()
        .map(|edge| (edge.src, edge.dst, edge.w, edge.flags))
        .collect();
    let properties = self.all_node_properties();

    let pages = build_spatial_pages(nodes, &properties, &edge_refs, DEFAULT_MAX_NODES_PER_PAGE);
    let store = OctreePageStore::new(pages, 4, 1);
    self.spatial_page_store = Some(store);
    Ok(())
}
/// Returns the ids of the nodes on every page that the spatial store reports
/// for `query`, each id at most once and in first-seen order.
///
/// The spatial store is built on first use. If that build fails, the error
/// is printed to stderr and an empty vector is returned.
pub fn spatial_range_query(&mut self, query: &BoundingBox) -> Vec<u64> {
    if self.spatial_page_store.is_none() {
        if let Err(e) = self.rebuild_spatial_index() {
            eprintln!(
                "spatial_range_query: failed to rebuild spatial index: {}",
                e
            );
            return Vec::new();
        }
    }
    let store = self.spatial_page_store.as_ref().unwrap();
    let page_indices = store.range_query(query);

    let mut already_listed = std::collections::HashSet::new();
    let mut node_ids = Vec::new();
    for &idx in &page_indices {
        let page = match store.get_page(idx) {
            Some(page) => page,
            None => continue,
        };
        // `insert` returns false for ids that were already collected.
        node_ids.extend(
            page.nodes
                .iter()
                .map(|node| node.id)
                .filter(|id| already_listed.insert(*id)),
        );
    }
    node_ids
}
/// Returns the ids of the nodes on every page that the spatial store reports
/// for the point `(x, y, z)`, each id at most once and in first-seen order.
///
/// The spatial store is built on first use. If that build fails, the error
/// is printed to stderr and an empty vector is returned.
pub fn spatial_point_query(&mut self, x: f32, y: f32, z: f32) -> Vec<u64> {
    if self.spatial_page_store.is_none() {
        if let Err(e) = self.rebuild_spatial_index() {
            eprintln!(
                "spatial_point_query: failed to rebuild spatial index: {}",
                e
            );
            return Vec::new();
        }
    }
    let store = self.spatial_page_store.as_ref().unwrap();
    let page_indices = store.point_query(x, y, z);

    let mut already_listed = std::collections::HashSet::new();
    let mut node_ids = Vec::new();
    for &idx in &page_indices {
        let page = match store.get_page(idx) {
            Some(page) => page,
            None => continue,
        };
        // `insert` returns false for ids that were already collected.
        node_ids.extend(
            page.nodes
                .iter()
                .map(|node| node.id)
                .filter(|id| already_listed.insert(*id)),
        );
    }
    node_ids
}
/// Copies every readable node record out of the mapping, in storage order.
fn all_nodes(&self) -> Vec<NodeRec> {
    let mut nodes = Vec::with_capacity(self.node_count);
    nodes.extend(
        (0..self.node_count)
            .filter_map(|index| self.get_node(index as u64))
            .copied(),
    );
    nodes
}
/// Copies every readable edge record out of the mapping, in storage order.
fn all_edges(&self) -> Vec<EdgeRec> {
    let mut edges = Vec::with_capacity(self.edge_count);
    edges.extend(
        (0..self.edge_count)
            .filter_map(|index| self.get_edge(index as u64))
            .copied(),
    );
    edges
}
/// Returns the per-node property maps handed to the spatial page builder.
///
/// NOTE(review): this currently always returns an empty map - presumably a
/// placeholder until node properties are stored; confirm.
fn all_node_properties(
&self,
) -> std::collections::HashMap<u64, std::collections::HashMap<String, String>> {
std::collections::HashMap::new()
}
fn maybe_save_spatial_store(&self) -> Result<()> {
if let Some(ref store) = self.spatial_page_store {
let spatial_path = self.path.with_extension("spatial");
store.save(&spatial_path)?;
}
Ok(())
}
/// Loads the spatial page store from the sibling `spatial` file when that
/// file exists; otherwise leaves the current state untouched.
///
/// # Errors
/// Returns an error if the file exists but cannot be opened as a page store.
pub fn maybe_load_spatial_store(&mut self) -> Result<()> {
    let spatial_path = self.path.with_extension("spatial");
    if !spatial_path.exists() {
        return Ok(());
    }
    self.spatial_page_store = Some(OctreePageStore::open(&spatial_path, 4, 1)?);
    Ok(())
}
}
impl Drop for StorageManager {
fn drop(&mut self) {
if let Some(ref mut mmap) = self.mmap {
let _ = mmap.flush();
}
let _ = self.file.sync_all();
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a visible node at `position` with every other field zeroed.
    /// Tests override individual fields with struct-update syntax.
    fn visible_node(id: u64, position: (f32, f32, f32)) -> NodeRec {
        let (x, y, z) = position;
        NodeRec {
            id,
            morton_code: 0,
            x,
            y,
            z,
            edge_off: 0,
            edge_len: 0,
            flags: 0,
            begin_ts: 0,
            end_ts: 0,
            tx_id: 0,
            visibility: 1,
            _padding: [0; 7],
        }
    }

    #[test]
    fn test_storage_manager_create() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let result = StorageManager::create(&db_path);
        assert!(result.is_ok());
    }

    #[test]
    fn test_temporal_versioning_lsts_basic() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_temporal.db");
        let mut manager = StorageManager::create(&db_path).unwrap();

        // Version 1 becomes valid at timestamp 100.
        let node_v1 = NodeRec {
            begin_ts: 100,
            tx_id: 1,
            ..visible_node(1, (10.0, 20.0, 30.0))
        };
        let storage_id_v1 = manager.insert_node(node_v1).unwrap();
        assert_eq!(
            storage_id_v1, 0,
            "Layer 1: First node should have storage ID 0"
        );

        // Version 2 supersedes it at timestamp 200.
        let logical_id = 1u64;
        let node_v2 = NodeRec {
            begin_ts: 200,
            tx_id: 2,
            ..visible_node(1, (15.0, 25.0, 35.0))
        };
        let storage_id_v2 = manager.update_node(logical_id, node_v2, 200).unwrap();
        assert_eq!(
            storage_id_v2, 1,
            "Layer 2: Second version should have storage ID 1"
        );

        let node_at_150 = manager.get_node_at_timestamp(logical_id, 150);
        assert!(
            node_at_150.is_some(),
            "Layer 3: Time-travel query should return version 1 at timestamp 150"
        );
        let node = node_at_150.unwrap();
        assert_eq!(
            node.x, 10.0,
            "Layer 3: At timestamp 150, should see v1 coordinates"
        );
        assert_eq!(node.begin_ts, 100, "Layer 3: Should be version 1");

        let node_current = manager.get_node_at_timestamp(logical_id, 250);
        assert!(
            node_current.is_some(),
            "Layer 4: Should return current version at timestamp 250"
        );
        let current = node_current.unwrap();
        assert_eq!(
            current.x, 15.0,
            "Layer 4: Current version should have v2 coordinates"
        );
        assert_eq!(
            current.begin_ts, 200,
            "Layer 4: Current version should be v2"
        );
    }

    #[test]
    #[cfg(feature = "telemetry")]
    fn test_lsts_telemetry_loop_detection() {
        use crate::telemetry::LoopGuard;
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_telemetry.db");
        let mut manager = StorageManager::create(&db_path).unwrap();
        let node = NodeRec {
            begin_ts: 100,
            tx_id: 1,
            ..visible_node(1, (10.0, 20.0, 30.0))
        };
        manager.insert_node(node).unwrap();

        let guard = LoopGuard::new("test_loop", 5);
        for i in 0..5 {
            assert!(guard.check().is_ok(), "Should allow iteration {}", i);
        }
        assert!(guard.check().is_err(), "Should fail at iteration 5");
    }

    #[test]
    #[cfg(feature = "telemetry")]
    fn test_lsts_telemetry_op_tracing() {
        use crate::telemetry::OpTracer;
        let tracer = OpTracer::new();
        tracer.trace("insert_node", file!(), line!());
        tracer.trace("get_node", file!(), line!());
        tracer.trace("update_node", file!(), line!());
    }

    #[test]
    fn test_storage_manager_insert_edge() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut storage = StorageManager::create(&db_path).unwrap();
        let edge = EdgeRec {
            src: 0,
            dst: 1,
            w: 1.0,
            flags: 0,
            begin_ts: 0,
            end_ts: 0,
            tx_id: 0,
            visibility: 1,
            _padding: [0; 7],
        };
        let edge_id = storage.insert_edge(edge).unwrap();
        assert_eq!(edge_id, 0);
        assert_eq!(storage.edge_count(), 1);
        let retrieved = storage.get_edge(0).unwrap();
        assert_eq!(retrieved.dst, 1);
    }

    #[test]
    fn test_spatial_range_query_lazily_builds() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("spatial.db");
        let mut storage = StorageManager::create(&db_path).unwrap();
        // Ten nodes spaced one unit apart along the x axis.
        for i in 0..10 {
            let node = NodeRec {
                morton_code: i as u64,
                ..visible_node(i as u64, (i as f32 * 1.0, 0.0, 0.0))
            };
            storage.insert_node(node).unwrap();
        }
        let query = BoundingBox::new(2.5, 7.5, -1.0, 1.0, -1.0, 1.0);
        let ids = storage.spatial_range_query(&query);
        assert!(!ids.is_empty());
        assert!(ids.contains(&3));
        assert!(ids.contains(&6));
    }

    #[test]
    fn test_spatial_save_and_load() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("spatial_persist.db");
        let mut storage = StorageManager::create(&db_path).unwrap();
        for i in 0..5 {
            let node = NodeRec {
                morton_code: i as u64,
                ..visible_node(i as u64, (i as f32 * 1.0, 0.0, 0.0))
            };
            storage.insert_node(node).unwrap();
        }
        let query = BoundingBox::new(0.0, 3.0, -1.0, 1.0, -1.0, 1.0);
        let before_ids = storage.spatial_range_query(&query);
        assert!(!before_ids.is_empty());
        storage.flush().unwrap();

        // A second manager over the same file must answer identically once
        // the persisted spatial store has been loaded.
        let mut storage2 = StorageManager::open(&db_path).unwrap();
        storage2.maybe_load_spatial_store().unwrap();
        let after_ids = storage2.spatial_range_query(&query);
        assert_eq!(before_ids, after_ids);
    }

    #[test]
    fn test_spatial_point_query() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("spatial_point.db");
        let mut storage = StorageManager::create(&db_path).unwrap();
        storage
            .insert_node(visible_node(42, (1.0, 2.0, 3.0)))
            .unwrap();

        let ids = storage.spatial_point_query(1.0, 2.0, 3.0);
        assert!(ids.contains(&42));
        let ids_outside = storage.spatial_point_query(100.0, 100.0, 100.0);
        assert!(!ids_outside.contains(&42));
    }
}