Skip to main content

geographdb_core/storage/
manager.rs

1//! Storage manager - coordinates node and edge stores with memory-mapped files
2//!
3//! # File Format
4//!
5//! ```text
6//! +----------------------+
7//! | Header (8 bytes)     | <- Node count
8//! +----------------------+
9//! | Node 0 (72 bytes)    |
10//! +----------------------+
11//! | ...                  |
12//! +----------------------+
13//! | Node N (72 bytes)    |
14//! +----------------------+
15//! | Edge Count (8B)      |
16//! +----------------------+
17//! | Edge 0 (48 bytes)    |
18//! +----------------------+
19//! | ...                  |
20//! +----------------------+
21//! | Edge M (48 bytes)    |
22//! +----------------------+
23//! | Metadata Count (8B)  |
24//! +----------------------+
25//! | Metadata 0 (256B)    |
26//! +----------------------+
27//! | ...                  |
28//! +----------------------+
29//! | Metadata K (256B)    |
30//! +----------------------+
31//! ```
32
33use anyhow::{Context, Result};
34use memmap2::MmapMut;
35use std::fs::{File, OpenOptions};
36use std::io::{Read, Seek, SeekFrom, Write};
37use std::path::Path;
38
39use super::dual_octree::OctreePageStore;
40use super::spatial_page::{build_spatial_pages, BoundingBox, DEFAULT_MAX_NODES_PER_PAGE};
41use crate::storage::data_structures::{EdgeRec, MetadataRec, NodeRec};
42
43#[cfg(feature = "telemetry")]
44use crate::telemetry::LoopGuard;
45
46/// Storage manager for geometric graph database
47/// Uses memory-mapped files for persistent storage
48pub struct StorageManager {
49    file: File,
50    mmap: Option<MmapMut>,
51    node_count: usize,
52    edge_count: usize,
53    metadata_count: usize,
54    edge_offset: usize,     // Byte offset where edge section starts
55    metadata_offset: usize, // Byte offset where metadata section starts
56    #[allow(dead_code)]
57    path: std::path::PathBuf,
58    /// LSTS: Maps logical node ID -> Vec of physical storage IDs (versions)
59    /// This enables time-travel queries by tracking all versions of a node
60    version_index: std::collections::HashMap<u64, Vec<u64>>,
61    /// Dual-octree spatial page store (lazy-rebuilt on first spatial query)
62    spatial_page_store: Option<OctreePageStore>,
63}
64
65impl StorageManager {
66    /// Create a new storage manager with a new database file
67    pub fn create(path: &Path) -> Result<Self> {
68        let mut file = OpenOptions::new()
69            .read(true)
70            .write(true)
71            .create(true)
72            .truncate(true)
73            .open(path)
74            .context("Failed to create database file")?;
75
76        let initial_count: u64 = 0;
77        file.write_all(&initial_count.to_le_bytes())
78            .context("Failed to write header")?;
79        file.sync_all().context("Failed to sync file")?;
80
81        let mmap = unsafe { MmapMut::map_mut(&file).context("Failed to memory-map file")? };
82
83        Ok(Self {
84            file,
85            mmap: Some(mmap),
86            node_count: 0,
87            edge_count: 0,
88            metadata_count: 0,
89            edge_offset: 8,
90            metadata_offset: 8,
91            path: path.to_path_buf(),
92            version_index: std::collections::HashMap::new(),
93            spatial_page_store: None,
94        })
95    }
96
97    /// Open an existing database file
98    pub fn open(path: &Path) -> Result<Self> {
99        let mut file = OpenOptions::new()
100            .read(true)
101            .write(true)
102            .open(path)
103            .context("Failed to open database file")?;
104
105        let mut header = [0u8; 8];
106        file.read_exact(&mut header)
107            .context("Failed to read header")?;
108        let node_count = u64::from_le_bytes(header) as usize;
109
110        let edge_offset = 8 + node_count * std::mem::size_of::<NodeRec>();
111        file.seek(SeekFrom::Start(edge_offset as u64))
112            .context("Failed to seek to edge section")?;
113
114        let mut edge_header = [0u8; 8];
115        let edge_count = if file.read_exact(&mut edge_header).is_ok() {
116            u64::from_le_bytes(edge_header) as usize
117        } else {
118            0
119        };
120
121        // Calculate metadata offset and read metadata count
122        let metadata_offset = edge_offset + 8 + edge_count * std::mem::size_of::<EdgeRec>();
123        file.seek(SeekFrom::Start(metadata_offset as u64))
124            .context("Failed to seek to metadata section")?;
125
126        let mut metadata_header = [0u8; 8];
127        let metadata_count = if file.read_exact(&mut metadata_header).is_ok() {
128            u64::from_le_bytes(metadata_header) as usize
129        } else {
130            0
131        };
132
133        let mmap = unsafe { MmapMut::map_mut(&file).context("Failed to memory-map file")? };
134
135        Ok(Self {
136            file,
137            mmap: Some(mmap),
138            node_count,
139            edge_count,
140            metadata_count,
141            edge_offset,
142            metadata_offset,
143            path: path.to_path_buf(),
144            version_index: std::collections::HashMap::new(),
145            spatial_page_store: None,
146        })
147    }
148
149    pub fn node_count(&self) -> usize {
150        self.node_count
151    }
152
153    pub fn edge_count(&self) -> usize {
154        self.edge_count
155    }
156
157    pub fn metadata_count(&self) -> usize {
158        self.metadata_count
159    }
160
161    /// Insert a metadata record at a specific ID (matches node ID)
162    /// This ensures metadata[N] always corresponds to node[N]
163    pub fn insert_metadata_at(&mut self, id: u64, metadata: MetadataRec) -> Result<()> {
164        // Ensure we have enough space for this metadata ID
165        let required_id = id as usize + 1;
166        if required_id > self.metadata_count {
167            self.metadata_count = required_id;
168        }
169
170        let node_section_size = self.node_count * std::mem::size_of::<NodeRec>();
171        let edge_header_size = 8;
172        let edge_section_size = self.edge_count * std::mem::size_of::<EdgeRec>();
173        let metadata_header_size = 8;
174        let metadata_section_size = self.metadata_count * std::mem::size_of::<MetadataRec>();
175        let required_size = 8
176            + node_section_size
177            + edge_header_size
178            + edge_section_size
179            + metadata_header_size
180            + metadata_section_size;
181
182        self.file
183            .set_len(required_size as u64)
184            .context("Failed to grow file for metadata")?;
185
186        if let Some(ref mut mmap) = self.mmap {
187            if mmap.len() < required_size {
188                mmap.flush().ok();
189                *mmap = unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
190            }
191        }
192
193        if let Some(ref mut mmap) = self.mmap {
194            let metadata_data_offset = self.metadata_offset + 8;
195            let offset = metadata_data_offset + id as usize * std::mem::size_of::<MetadataRec>();
196            let metadata_bytes = bytemuck::bytes_of(&metadata);
197            mmap[offset..offset + std::mem::size_of::<MetadataRec>()]
198                .copy_from_slice(metadata_bytes);
199
200            // Update metadata count header
201            let metadata_header_offset = self.metadata_offset;
202            mmap[metadata_header_offset..metadata_header_offset + 8]
203                .copy_from_slice(&(self.metadata_count as u64).to_le_bytes());
204            mmap.flush().context("Failed to flush mmap")?;
205        }
206
207        Ok(())
208    }
209
210    /// Get a metadata record by ID
211    pub fn get_metadata(&self, id: u64) -> Option<&MetadataRec> {
212        if id as usize >= self.metadata_count {
213            return None;
214        }
215
216        self.mmap.as_ref().and_then(|mmap| {
217            let metadata_data_offset = self.metadata_offset + 8;
218            let offset = metadata_data_offset + id as usize * std::mem::size_of::<MetadataRec>();
219            let bytes = &mmap[offset..offset + std::mem::size_of::<MetadataRec>()];
220            bytemuck::try_from_bytes::<MetadataRec>(bytes).ok()
221        })
222    }
223
224    pub fn insert_node(&mut self, node: NodeRec) -> Result<u64> {
225        let node_id = self.node_count as u64;
226
227        let node_section_size = (self.node_count + 1) * std::mem::size_of::<NodeRec>();
228        let edge_header_size = 8;
229        let edge_section_size = self.edge_count * std::mem::size_of::<EdgeRec>();
230        let metadata_header_size = 8;
231        let metadata_section_size = self.metadata_count * std::mem::size_of::<MetadataRec>();
232        let required_size = 8
233            + node_section_size
234            + edge_header_size
235            + edge_section_size
236            + metadata_header_size
237            + metadata_section_size;
238
239        self.file
240            .set_len(required_size as u64)
241            .context("Failed to grow file")?;
242
243        if let Some(ref mut mmap) = self.mmap {
244            if mmap.len() < required_size {
245                mmap.flush().ok();
246                *mmap = unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
247            }
248        }
249
250        if self.edge_count > 0 || self.metadata_count > 0 {
251            self.move_data_sections(node_section_size)?;
252        }
253
254        if let Some(ref mut mmap) = self.mmap {
255            let offset = 8 + self.node_count * std::mem::size_of::<NodeRec>();
256            let node_bytes = bytemuck::bytes_of(&node);
257            mmap[offset..offset + std::mem::size_of::<NodeRec>()].copy_from_slice(node_bytes);
258        }
259
260        self.node_count += 1;
261        self.edge_offset = 8 + self.node_count * std::mem::size_of::<NodeRec>();
262        // Update metadata_offset after node section size change
263        self.metadata_offset =
264            self.edge_offset + 8 + self.edge_count * std::mem::size_of::<EdgeRec>();
265
266        if let Some(ref mut mmap) = self.mmap {
267            mmap[0..8].copy_from_slice(&(self.node_count as u64).to_le_bytes());
268            let edge_header_offset = self.edge_offset;
269            mmap[edge_header_offset..edge_header_offset + 8]
270                .copy_from_slice(&(self.edge_count as u64).to_le_bytes());
271            let metadata_header_offset = self.metadata_offset;
272            mmap[metadata_header_offset..metadata_header_offset + 8]
273                .copy_from_slice(&(self.metadata_count as u64).to_le_bytes());
274            mmap.flush().context("Failed to flush mmap")?;
275        }
276
277        // LSTS: Track this as the first version of this logical node
278        self.version_index.entry(node.id).or_default().push(node_id);
279
280        Ok(node_id)
281    }
282
283    pub fn insert_edge(&mut self, edge: EdgeRec) -> Result<u64> {
284        let edge_id = self.edge_count as u64;
285
286        let node_section_size = self.node_count * std::mem::size_of::<NodeRec>();
287        let edge_header_size = 8;
288        let edge_section_size = (self.edge_count + 1) * std::mem::size_of::<EdgeRec>();
289        let metadata_header_size = 8;
290        let metadata_section_size = self.metadata_count * std::mem::size_of::<MetadataRec>();
291        let required_size = 8
292            + node_section_size
293            + edge_header_size
294            + edge_section_size
295            + metadata_header_size
296            + metadata_section_size;
297
298        // Grow file first to make room for the new edge
299        self.file
300            .set_len(required_size as u64)
301            .context("Failed to grow file")?;
302
303        if let Some(ref mut mmap) = self.mmap {
304            if mmap.len() < required_size {
305                mmap.flush().ok();
306                *mmap = unsafe { MmapMut::map_mut(&self.file).context("Failed to re-map file")? };
307            }
308        }
309
310        // If metadata exists, we need to move it to make room for the new edge
311        if self.metadata_count > 0 {
312            self.move_metadata_section()?;
313        }
314
315        if let Some(ref mut mmap) = self.mmap {
316            let edge_header_offset = self.edge_offset;
317            let edge_data_offset = edge_header_offset + 8;
318            let offset = edge_data_offset + self.edge_count * std::mem::size_of::<EdgeRec>();
319            let edge_bytes = bytemuck::bytes_of(&edge);
320            mmap[offset..offset + std::mem::size_of::<EdgeRec>()].copy_from_slice(edge_bytes);
321
322            self.edge_count += 1;
323            mmap[edge_header_offset..edge_header_offset + 8]
324                .copy_from_slice(&(self.edge_count as u64).to_le_bytes());
325            // Update metadata_offset after edge section growth (already updated by move_metadata_section if needed)
326            // If no metadata exists, update it here
327            if self.metadata_count == 0 {
328                self.metadata_offset =
329                    self.edge_offset + 8 + self.edge_count * std::mem::size_of::<EdgeRec>();
330            }
331            mmap.flush().context("Failed to flush mmap")?;
332        }
333
334        Ok(edge_id)
335    }
336
337    /// Move metadata section when edge section grows
338    fn move_metadata_section(&mut self) -> Result<()> {
339        let old_metadata_offset = self.metadata_offset;
340        let old_edge_section_size = self.edge_count * std::mem::size_of::<EdgeRec>();
341        let new_edge_section_size = old_edge_section_size + std::mem::size_of::<EdgeRec>();
342        let new_metadata_offset = self.edge_offset + 8 + new_edge_section_size;
343
344        let metadata_rec_size = std::mem::size_of::<MetadataRec>();
345        let metadata_header_size = 8;
346        let metadata_section_size = self.metadata_count * metadata_rec_size;
347        let total_metadata_size = metadata_header_size + metadata_section_size;
348
349        if let Some(ref mut mmap) = self.mmap {
350            if total_metadata_size > 0 && old_metadata_offset != new_metadata_offset {
351                // Move metadata section to make room for new edge
352                let mut data = vec![0u8; total_metadata_size];
353                data.copy_from_slice(
354                    &mmap[old_metadata_offset..old_metadata_offset + total_metadata_size],
355                );
356                mmap[new_metadata_offset..new_metadata_offset + total_metadata_size]
357                    .copy_from_slice(&data);
358            }
359        }
360
361        self.metadata_offset = new_metadata_offset;
362        Ok(())
363    }
364
365    pub fn get_node(&self, id: u64) -> Option<&NodeRec> {
366        if id as usize >= self.node_count {
367            return None;
368        }
369
370        self.mmap.as_ref().and_then(|mmap| {
371            let offset = 8 + id as usize * std::mem::size_of::<NodeRec>();
372            let bytes = &mmap[offset..offset + std::mem::size_of::<NodeRec>()];
373            bytemuck::try_from_bytes::<NodeRec>(bytes).ok()
374        })
375    }
376
377    pub fn get_edge(&self, id: u64) -> Option<&EdgeRec> {
378        if id as usize >= self.edge_count {
379            return None;
380        }
381
382        self.mmap.as_ref().and_then(|mmap| {
383            let edge_data_offset = self.edge_offset + 8;
384            let offset = edge_data_offset + id as usize * std::mem::size_of::<EdgeRec>();
385            let bytes = &mmap[offset..offset + std::mem::size_of::<EdgeRec>()];
386            bytemuck::try_from_bytes::<EdgeRec>(bytes).ok()
387        })
388    }
389
390    pub fn get_edges_for_node(&self, _node_id: u64) -> Vec<&EdgeRec> {
391        Vec::new()
392    }
393
394    /// LSTS (Linearly Versioned Timestamp) query
395    ///
396    /// Returns the node version that was visible at the given timestamp.
397    /// A version is visible if:
398    /// - begin_ts <= query_timestamp
399    /// - end_ts == 0 OR end_ts > query_timestamp
400    /// - visibility == VERSION_COMMITTED (1)
401    ///
402    /// Get the version history for a logical node
403    pub fn get_version_history(&self, logical_id: u64) -> Option<&Vec<u64>> {
404        self.version_index.get(&logical_id)
405    }
406
407    pub fn get_node_at_timestamp(&self, logical_id: u64, timestamp: u64) -> Option<&NodeRec> {
408        // Telemetry: Loop guard to prevent infinite scanning
409        #[cfg(feature = "telemetry")]
410        let loop_guard = LoopGuard::new("get_node_at_timestamp", 1000);
411
412        // Get all versions of this logical node
413        let versions = self.version_index.get(&logical_id)?;
414
415        // Scan all versions to find the one visible at the given timestamp
416        for &version_id in versions {
417            #[cfg(feature = "telemetry")]
418            loop_guard.check().ok()?;
419
420            if let Some(node) = self.get_node(version_id) {
421                if node.visibility != 1 {
422                    // VERSION_COMMITTED
423                    continue;
424                }
425
426                // Check if this version was visible at the given timestamp
427                if node.begin_ts <= timestamp && (node.end_ts == 0 || node.end_ts > timestamp) {
428                    return Some(node);
429                }
430            }
431        }
432
433        None
434    }
435
436    /// Update a node with LSTS versioning
437    ///
438    /// This implements the core LSTS update semantics:
439    /// 1. Find the current version of the node (where end_ts == 0)
440    /// 2. Set end_ts = new_begin_ts on the current version (mark as superseded)
441    /// 3. Insert a new version with begin_ts = new_begin_ts, end_ts = 0
442    ///
443    /// Returns the ID of the new version
444    pub fn update_node(
445        &mut self,
446        logical_id: u64,
447        new_node: NodeRec,
448        new_begin_ts: u64,
449    ) -> Result<u64> {
450        // Get the version history for this logical node
451        let versions = self
452            .version_index
453            .get(&logical_id)
454            .ok_or_else(|| anyhow::anyhow!("Logical node {} not found", logical_id))?;
455
456        if versions.is_empty() {
457            return Err(anyhow::anyhow!(
458                "No versions found for logical node {}",
459                logical_id
460            ));
461        }
462
463        // Get the latest version (last in the list)
464        let latest_version_id = *versions.last().unwrap();
465
466        // Step 1: "Close" the current version by setting its end_ts
467        if let Some(ref mut mmap) = self.mmap {
468            let offset = 8 + latest_version_id as usize * std::mem::size_of::<NodeRec>();
469            let bytes = &mut mmap[offset..offset + std::mem::size_of::<NodeRec>()];
470            if let Ok(node) = bytemuck::try_from_bytes_mut::<NodeRec>(bytes) {
471                node.end_ts = new_begin_ts; // Mark as superseded at this timestamp
472            }
473        }
474
475        // Step 2: Insert the new version
476        let new_version_id = self.insert_node(new_node)?;
477
478        // Update the version index to include the new version
479        self.version_index
480            .entry(logical_id)
481            .or_default()
482            .push(new_version_id);
483
484        Ok(new_version_id)
485    }
486
487    /// Move edge and metadata sections when node section grows
488    fn move_data_sections(&mut self, new_node_section_size: usize) -> Result<()> {
489        let old_edge_offset = self.edge_offset;
490        let new_edge_offset = 8 + new_node_section_size;
491
492        if let Some(ref mut mmap) = self.mmap {
493            // Calculate total data size to move
494            // This includes: edge count header + edge section + metadata count header + metadata section
495            let edge_count_header_size = 8;
496            let edge_section_size = self.edge_count * std::mem::size_of::<EdgeRec>();
497            let metadata_count_header_size = 8;
498            let metadata_section_size = self.metadata_count * std::mem::size_of::<MetadataRec>();
499            let total_data_size = edge_count_header_size
500                + edge_section_size
501                + metadata_count_header_size
502                + metadata_section_size;
503
504            if total_data_size > 0 {
505                let mut data = vec![0u8; total_data_size];
506                data.copy_from_slice(&mmap[old_edge_offset..old_edge_offset + total_data_size]);
507                mmap[new_edge_offset..new_edge_offset + total_data_size].copy_from_slice(&data);
508            }
509        }
510
511        self.edge_offset = new_edge_offset;
512        // Update metadata_offset after moving edge section
513        self.metadata_offset =
514            self.edge_offset + 8 + self.edge_count * std::mem::size_of::<EdgeRec>();
515        Ok(())
516    }
517
518    pub fn flush(&mut self) -> Result<()> {
519        if let Some(ref mut mmap) = self.mmap {
520            mmap.flush().context("Failed to flush mmap")?;
521        }
522        self.file.sync_all().context("Failed to sync file")?;
523        self.maybe_save_spatial_store()?;
524        Ok(())
525    }
526
527    /// Rebuild the spatial page store from all current nodes + edges.
528    /// Call this explicitly after bulk insertions.
529    pub fn rebuild_spatial_index(&mut self) -> Result<()> {
530        let nodes = self.all_nodes();
531        let edges = self.all_edges();
532        let edge_refs: Vec<(u64, u64, f32, u32)> =
533            edges.iter().map(|e| (e.src, e.dst, e.w, e.flags)).collect();
534        let properties = self.all_node_properties();
535
536        let pages = build_spatial_pages(nodes, &properties, &edge_refs, DEFAULT_MAX_NODES_PER_PAGE);
537        self.spatial_page_store = Some(OctreePageStore::new(pages, 4, 1));
538        Ok(())
539    }
540
541    /// Range query via dual-octree page store. Returns node IDs in matching pages.
542    /// Rebuilds the index lazily on first call if not explicitly built.
543    pub fn spatial_range_query(&mut self, query: &BoundingBox) -> Vec<u64> {
544        if self.spatial_page_store.is_none() {
545            if let Err(e) = self.rebuild_spatial_index() {
546                eprintln!(
547                    "spatial_range_query: failed to rebuild spatial index: {}",
548                    e
549                );
550                return vec![];
551            }
552        }
553        let store = self.spatial_page_store.as_ref().unwrap();
554        let page_indices = store.range_query(query);
555        let mut node_ids = Vec::new();
556        let mut seen = std::collections::HashSet::new();
557        for &idx in &page_indices {
558            if let Some(page) = store.get_page(idx) {
559                for node in &page.nodes {
560                    if seen.insert(node.id) {
561                        node_ids.push(node.id);
562                    }
563                }
564            }
565        }
566        node_ids
567    }
568
569    /// Point query via dual-octree page store.
570    pub fn spatial_point_query(&mut self, x: f32, y: f32, z: f32) -> Vec<u64> {
571        if self.spatial_page_store.is_none() {
572            if let Err(e) = self.rebuild_spatial_index() {
573                eprintln!(
574                    "spatial_point_query: failed to rebuild spatial index: {}",
575                    e
576                );
577                return vec![];
578            }
579        }
580        let store = self.spatial_page_store.as_ref().unwrap();
581        let page_indices = store.point_query(x, y, z);
582        let mut node_ids = Vec::new();
583        let mut seen = std::collections::HashSet::new();
584        for &idx in &page_indices {
585            if let Some(page) = store.get_page(idx) {
586                for node in &page.nodes {
587                    if seen.insert(node.id) {
588                        node_ids.push(node.id);
589                    }
590                }
591            }
592        }
593        node_ids
594    }
595
596    fn all_nodes(&self) -> Vec<NodeRec> {
597        let mut nodes = Vec::with_capacity(self.node_count);
598        for i in 0..self.node_count {
599            if let Some(node) = self.get_node(i as u64) {
600                nodes.push(*node);
601            }
602        }
603        nodes
604    }
605
606    fn all_edges(&self) -> Vec<EdgeRec> {
607        let mut edges = Vec::with_capacity(self.edge_count);
608        for i in 0..self.edge_count {
609            if let Some(edge) = self.get_edge(i as u64) {
610                edges.push(*edge);
611            }
612        }
613        edges
614    }
615
616    fn all_node_properties(
617        &self,
618    ) -> std::collections::HashMap<u64, std::collections::HashMap<String, String>> {
619        // Node properties not persisted in the mmap format yet.
620        std::collections::HashMap::new()
621    }
622
623    /// Save the spatial page store to disk alongside the main db file.
624    fn maybe_save_spatial_store(&self) -> Result<()> {
625        if let Some(ref store) = self.spatial_page_store {
626            let spatial_path = self.path.with_extension("spatial");
627            store.save(&spatial_path)?;
628        }
629        Ok(())
630    }
631
632    /// Try to load a persisted spatial page store from the sidecar file.
633    pub fn maybe_load_spatial_store(&mut self) -> Result<()> {
634        let spatial_path = self.path.with_extension("spatial");
635        if spatial_path.exists() {
636            let store = OctreePageStore::open(&spatial_path, 4, 1)?;
637            self.spatial_page_store = Some(store);
638        }
639        Ok(())
640    }
641}
642
643impl Drop for StorageManager {
644    fn drop(&mut self) {
645        if let Some(ref mut mmap) = self.mmap {
646            let _ = mmap.flush();
647        }
648        let _ = self.file.sync_all();
649    }
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655    use tempfile::tempdir;
656
657    #[test]
658    fn test_storage_manager_create() {
659        let temp_dir = tempdir().unwrap();
660        let db_path = temp_dir.path().join("test.db");
661        let result = StorageManager::create(&db_path);
662        assert!(result.is_ok());
663    }
664
665    /// Layer 1 Test: 4D Temporal - Time-travel query support
666    ///
667    /// This test demonstrates the need for LSTS (Linearly Versioned Timestamp) support.
668    /// Currently begin_ts/end_ts fields exist but are NOT used for version tracking.
669    ///
670    /// EXPECTED TO FAIL until LSTS is implemented.
671    #[test]
672    fn test_temporal_versioning_lsts_basic() {
673        let temp_dir = tempdir().unwrap();
674        let db_path = temp_dir.path().join("test_temporal.db");
675        let mut manager = StorageManager::create(&db_path).unwrap();
676
677        // Layer 1: Insert a node at timestamp 100
678        let node_v1 = NodeRec {
679            id: 1,
680            morton_code: 0,
681            x: 10.0,
682            y: 20.0,
683            z: 30.0,
684            edge_off: 0,
685            edge_len: 0,
686            flags: 0,
687            begin_ts: 100, // Version starts at timestamp 100
688            end_ts: 0,     // 0 means "current" version
689            tx_id: 1,
690            visibility: 1, // VERSION_COMMITTED
691            _padding: [0; 7],
692        };
693
694        let storage_id_v1 = manager.insert_node(node_v1).unwrap();
695        assert_eq!(
696            storage_id_v1, 0,
697            "Layer 1: First node should have storage ID 0"
698        );
699
700        // Use the logical node ID (node.id) for LSTS operations
701        let logical_id = 1u64;
702
703        // Layer 2: Update the node at timestamp 200 (create new version)
704        // In LSTS, this should:
705        // 1. Set end_ts=200 on the old version
706        // 2. Insert a new version with begin_ts=200, end_ts=0
707        let node_v2 = NodeRec {
708            id: 1,
709            morton_code: 0,
710            x: 15.0,
711            y: 25.0,
712            z: 35.0, // Changed coordinates
713            edge_off: 0,
714            edge_len: 0,
715            flags: 0,
716            begin_ts: 200, // New version starts at timestamp 200
717            end_ts: 0,     // Current version
718            tx_id: 2,
719            visibility: 1,
720            _padding: [0; 7],
721        };
722
723        let storage_id_v2 = manager.update_node(logical_id, node_v2, 200).unwrap();
724        assert_eq!(
725            storage_id_v2, 1,
726            "Layer 2: Second version should have storage ID 1"
727        );
728
729        // Layer 3: Time-travel query - get node state at timestamp 150
730        // At timestamp 150, we should see version 1 (begin_ts=100, end_ts=200)
731        let node_at_150 = manager.get_node_at_timestamp(logical_id, 150);
732
733        // This assertion will FAIL because get_node_at_timestamp doesn't exist yet
734        assert!(
735            node_at_150.is_some(),
736            "Layer 3: Time-travel query should return version 1 at timestamp 150"
737        );
738
739        let node = node_at_150.unwrap();
740        assert_eq!(
741            node.x, 10.0,
742            "Layer 3: At timestamp 150, should see v1 coordinates"
743        );
744        assert_eq!(node.begin_ts, 100, "Layer 3: Should be version 1");
745
746        // Layer 4: Query current version (timestamp 250)
747        let node_current = manager.get_node_at_timestamp(logical_id, 250);
748        assert!(
749            node_current.is_some(),
750            "Layer 4: Should return current version at timestamp 250"
751        );
752
753        let current = node_current.unwrap();
754        assert_eq!(
755            current.x, 15.0,
756            "Layer 4: Current version should have v2 coordinates"
757        );
758        assert_eq!(
759            current.begin_ts, 200,
760            "Layer 4: Current version should be v2"
761        );
762    }
763
764    /// Layer 4 Test: Telemetry - Loop detection in LSTS operations
765    ///
766    /// Verifies that loop guards prevent infinite iteration when scanning versions.
767    /// This test runs with telemetry enabled to validate loop detection.
768    #[test]
769    #[cfg(feature = "telemetry")]
770    fn test_lsts_telemetry_loop_detection() {
771        use crate::telemetry::LoopGuard;
772
773        let temp_dir = tempdir().unwrap();
774        let db_path = temp_dir.path().join("test_telemetry.db");
775        let mut manager = StorageManager::create(&db_path).unwrap();
776
777        // Insert a node
778        let node = NodeRec {
779            id: 1,
780            morton_code: 0,
781            x: 10.0,
782            y: 20.0,
783            z: 30.0,
784            edge_off: 0,
785            edge_len: 0,
786            flags: 0,
787            begin_ts: 100,
788            end_ts: 0,
789            tx_id: 1,
790            visibility: 1,
791            _padding: [0; 7],
792        };
793
794        manager.insert_node(node).unwrap();
795
796        // Test that loop guard works
797        let guard = LoopGuard::new("test_loop", 5);
798        // Should succeed for first 5 iterations (0–4)
799        for i in 0..5 {
800            assert!(guard.check().is_ok(), "Should allow iteration {}", i);
801        }
802        // Should fail on 6th iteration (count becomes 5, 5 >= 5)
803        assert!(guard.check().is_err(), "Should fail at iteration 5");
804    }
805
806    /// Layer 4 Test: Telemetry - Operation tracing
807    ///
808    /// Verifies that operation tracing tracks method calls.
809    #[test]
810    #[cfg(feature = "telemetry")]
811    fn test_lsts_telemetry_op_tracing() {
812        use crate::telemetry::OpTracer;
813
814        let tracer = OpTracer::new();
815
816        // Trace some operations
817        tracer.trace("insert_node", file!(), line!());
818        tracer.trace("get_node", file!(), line!());
819        tracer.trace("update_node", file!(), line!());
820
821        // Tracer should not panic and should track calls — this is a smoke-only test,
822        // so there is no deterministic assertion against call_counts.
823    }
824
825    #[test]
826    fn test_storage_manager_insert_edge() {
827        let temp_dir = tempdir().unwrap();
828        let db_path = temp_dir.path().join("test.db");
829        let mut storage = StorageManager::create(&db_path).unwrap();
830
831        let edge = EdgeRec {
832            src: 0,
833            dst: 1,
834            w: 1.0,
835            flags: 0,
836            begin_ts: 0,
837            end_ts: 0,
838            tx_id: 0,
839            visibility: 1,
840            _padding: [0; 7],
841        };
842        let edge_id = storage.insert_edge(edge).unwrap();
843        assert_eq!(edge_id, 0);
844        assert_eq!(storage.edge_count(), 1);
845
846        let retrieved = storage.get_edge(0).unwrap();
847        assert_eq!(retrieved.dst, 1);
848    }
849
850    // ───────────────────────────────────────────────────────────────────────────
851    // Dual-octree spatial integration tests
852    // ───────────────────────────────────────────────────────────────────────────
853
854    #[test]
855    fn test_spatial_range_query_lazily_builds() {
856        let temp_dir = tempdir().unwrap();
857        let db_path = temp_dir.path().join("spatial.db");
858        let mut storage = StorageManager::create(&db_path).unwrap();
859
860        for i in 0..10 {
861            let node = NodeRec {
862                id: i as u64,
863                morton_code: i as u64, // simple morton for sorting
864                x: i as f32 * 1.0,
865                y: 0.0,
866                z: 0.0,
867                edge_off: 0,
868                edge_len: 0,
869                flags: 0,
870                begin_ts: 0,
871                end_ts: 0,
872                tx_id: 0,
873                visibility: 1,
874                _padding: [0; 7],
875            };
876            storage.insert_node(node).unwrap();
877        }
878
879        // Query box covering nodes 3.0..7.0 on X axis
880        let query = BoundingBox::new(2.5, 7.5, -1.0, 1.0, -1.0, 1.0);
881        let ids = storage.spatial_range_query(&query);
882
883        // Should get some subset of nodes (page-based query returns
884        // whole pages that intersect the query; precise IDs depend
885        // on how the builder split the 10 nodes into pages).
886        assert!(!ids.is_empty());
887        assert!(ids.contains(&3));
888        assert!(ids.contains(&6));
889    }
890
891    #[test]
892    fn test_spatial_save_and_load() {
893        let temp_dir = tempdir().unwrap();
894        let db_path = temp_dir.path().join("spatial_persist.db");
895        let mut storage = StorageManager::create(&db_path).unwrap();
896
897        for i in 0..5 {
898            let node = NodeRec {
899                id: i as u64,
900                morton_code: i as u64,
901                x: i as f32 * 1.0,
902                y: 0.0,
903                z: 0.0,
904                edge_off: 0,
905                edge_len: 0,
906                flags: 0,
907                begin_ts: 0,
908                end_ts: 0,
909                tx_id: 0,
910                visibility: 1,
911                _padding: [0; 7],
912            };
913            storage.insert_node(node).unwrap();
914        }
915
916        // Trigger first spatial query to build the index
917        let query = BoundingBox::new(0.0, 3.0, -1.0, 1.0, -1.0, 1.0);
918        let before_ids = storage.spatial_range_query(&query);
919        assert!(!before_ids.is_empty());
920
921        // Flush (saves spatial sidecar)
922        storage.flush().unwrap();
923
924        // Re-open via StorageManager::open
925        let mut storage2 = StorageManager::open(&db_path).unwrap();
926        storage2.maybe_load_spatial_store().unwrap();
927
928        // Query again
929        let after_ids = storage2.spatial_range_query(&query);
930        assert_eq!(before_ids, after_ids);
931    }
932
933    #[test]
934    fn test_spatial_point_query() {
935        let temp_dir = tempdir().unwrap();
936        let db_path = temp_dir.path().join("spatial_point.db");
937        let mut storage = StorageManager::create(&db_path).unwrap();
938
939        let node = NodeRec {
940            id: 42,
941            morton_code: 0,
942            x: 1.0,
943            y: 2.0,
944            z: 3.0,
945            edge_off: 0,
946            edge_len: 0,
947            flags: 0,
948            begin_ts: 0,
949            end_ts: 0,
950            tx_id: 0,
951            visibility: 1,
952            _padding: [0; 7],
953        };
954        storage.insert_node(node).unwrap();
955
956        let ids = storage.spatial_point_query(1.0, 2.0, 3.0);
957        assert!(ids.contains(&42));
958
959        let ids_outside = storage.spatial_point_query(100.0, 100.0, 100.0);
960        assert!(!ids_outside.contains(&42));
961    }
962}