// cypherlite_storage/lib.rs
1#![warn(missing_docs)]
2//! Storage engine for CypherLite: page management, B-tree indexes, WAL, and transactions.
3
4/// B-tree index structures for nodes and edges.
5pub mod btree;
6/// Catalog for label, property key, and relationship type name resolution.
7pub mod catalog;
8/// Hyperedge entity storage and reverse index.
9#[cfg(feature = "hypergraph")]
10pub mod hyperedge;
11/// Property index infrastructure for fast node lookups.
12pub mod index;
13/// Page layout, buffer pool, and page manager.
14pub mod page;
15/// Subgraph entity storage and membership index.
16#[cfg(feature = "subgraph")]
17pub mod subgraph;
18/// MVCC transaction management.
19pub mod transaction;
20/// Version storage for pre-update entity snapshots.
21pub mod version;
22/// Write-ahead log (WAL) for crash recovery.
23pub mod wal;
24
25use std::collections::HashMap;
26
27use cypherlite_core::{
28    CypherLiteError, DatabaseConfig, EdgeId, LabelRegistry, NodeId, NodeRecord, PageId,
29    PropertyValue, RelationshipRecord, Result,
30};
31#[cfg(feature = "subgraph")]
32use cypherlite_core::{SubgraphId, SubgraphRecord};
33use fs2::FileExt;
34
35use btree::edge_store::EdgeStore;
36use btree::node_store::NodeStore;
37#[cfg(feature = "hypergraph")]
38use cypherlite_core::{HyperEdgeId, HyperEdgeRecord};
39#[cfg(feature = "hypergraph")]
40use hyperedge::reverse_index::HyperEdgeReverseIndex;
41#[cfg(feature = "hypergraph")]
42use hyperedge::HyperEdgeStore;
43use index::edge_index::EdgeIndexManager;
44use index::IndexManager;
45use page::buffer_pool::BufferPool;
46use page::page_manager::PageManager;
47use page::PAGE_SIZE;
48#[cfg(feature = "subgraph")]
49use subgraph::membership::MembershipIndex;
50#[cfg(feature = "subgraph")]
51use subgraph::SubgraphStore;
52use transaction::mvcc::TransactionManager;
53use version::VersionStore;
54use wal::checkpoint::Checkpoint;
55use wal::reader::WalReader;
56use wal::recovery::Recovery;
57use wal::writer::WalWriter;
58
/// The main storage engine for CypherLite.
///
/// Provides high-level access to node/edge CRUD, transactions,
/// WAL, and checkpoint operations.
#[allow(dead_code)]
pub struct StorageEngine {
    /// Exclusive file lock on the .cyl file, held for the engine's lifetime.
    /// Dropped automatically when StorageEngine is dropped, releasing the lock.
    lock_file: std::fs::File,
    /// Page-level I/O and database-header access for the main `.cyl` file.
    page_manager: PageManager,
    /// In-memory page cache, sized from `DatabaseConfig::cache_capacity`.
    buffer_pool: BufferPool,
    /// Appends frames to the write-ahead log.
    wal_writer: WalWriter,
    /// Reads WAL frames back (also produced by crash recovery on open).
    wal_reader: WalReader,
    /// MVCC transaction bookkeeping; hands out read/write transaction handles.
    tx_manager: TransactionManager,
    /// In-memory node records plus the node-ID allocator.
    node_store: NodeStore,
    /// In-memory relationship records plus the edge-ID allocator.
    edge_store: EdgeStore,
    /// Name <-> ID resolution for labels, property keys, and relationship types.
    catalog: catalog::Catalog,
    /// Property indexes keyed by (label, property key).
    index_manager: IndexManager,
    /// Property indexes keyed by (relationship type, property key).
    edge_index_manager: EdgeIndexManager,
    /// Pre-update entity snapshots (W-002 version storage).
    version_store: VersionStore,
    /// Subgraph entity records.
    #[cfg(feature = "subgraph")]
    subgraph_store: SubgraphStore,
    /// Bidirectional subgraph <-> node membership index.
    #[cfg(feature = "subgraph")]
    membership_index: MembershipIndex,
    /// Hyperedge entity records.
    #[cfg(feature = "hypergraph")]
    hyperedge_store: HyperEdgeStore,
    /// Reverse lookup: raw participant entity ID -> hyperedge IDs.
    #[cfg(feature = "hypergraph")]
    hyperedge_reverse_index: HyperEdgeReverseIndex,
    /// Configuration this engine was opened with.
    config: DatabaseConfig,
    /// Maps node_id -> data page ID where its record is stored.
    node_page_map: HashMap<u64, u32>,
    /// Maps edge_id -> data page ID where its record is stored.
    edge_page_map: HashMap<u64, u32>,
    /// Current node data page with free space: (page_id, page_buffer).
    current_node_data_page: Option<(u32, [u8; PAGE_SIZE])>,
    /// Current edge data page with free space: (page_id, page_buffer).
    current_edge_data_page: Option<(u32, [u8; PAGE_SIZE])>,
    /// Current subgraph data page with free space.
    #[cfg(feature = "subgraph")]
    current_subgraph_data_page: Option<(u32, [u8; PAGE_SIZE])>,
    /// Current hyperedge data page with free space.
    #[cfg(feature = "hypergraph")]
    current_hyperedge_data_page: Option<(u32, [u8; PAGE_SIZE])>,
    /// Current version data page with free space.
    current_version_data_page: Option<(u32, [u8; PAGE_SIZE])>,
}
105
106impl StorageEngine {
    /// Open or create a CypherLite database.
    ///
    /// Acquires an exclusive file lock (flock) on the `.cyl` file. If the lock
    /// cannot be acquired (e.g. another process holds it), returns
    /// [`CypherLiteError::DatabaseLocked`].
    ///
    /// # Errors
    ///
    /// Returns an error if the lock file cannot be opened, the lock cannot be
    /// taken, the database/WAL files cannot be opened or created, WAL recovery
    /// fails, or any of the post-construction page loads fail.
    pub fn open(config: DatabaseConfig) -> Result<Self> {
        let wal_path = config.wal_path();
        // Remember whether the main file already exists (decides open vs create).
        let db_exists = config.path.exists();

        // R-PERSIST-035: Acquire exclusive file lock on .cyl file.
        // Use a .lock sidecar file so we don't interfere with PageManager's
        // own file I/O (creating an empty .cyl before PageManager would break
        // its exists() check).
        let lock_path = config.path.with_extension("cyl-lock");
        let lock_file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&lock_path)
            .map_err(CypherLiteError::IoError)?;

        // Non-blocking lock attempt; any failure (including plain I/O errors)
        // is surfaced uniformly as DatabaseLocked with the database path.
        lock_file
            .try_lock_exclusive()
            .map_err(|_| CypherLiteError::DatabaseLocked(config.path.display().to_string()))?;

        // Try to open existing database, or create a new one
        let mut page_manager = if db_exists {
            PageManager::open_database(&config)?
        } else {
            PageManager::create_database(&config)?
        };

        // Run recovery if WAL file exists; otherwise start with zero recovered
        // frames and a fresh reader.
        let (_recovered, wal_reader) = if wal_path.exists() {
            Recovery::recover(&mut page_manager, &wal_path)?
        } else {
            (0, WalReader::new())
        };

        // Create or open WAL.
        // NOTE(review): the WAL salt below is a hard-coded constant;
        // presumably an arbitrary seed — confirm against the WAL format spec.
        let wal_writer = if wal_path.exists() {
            WalWriter::open(&wal_path, config.wal_sync_mode.clone())?
        } else {
            WalWriter::create(&wal_path, 12345, config.wal_sync_mode.clone())?
        };

        let buffer_pool = BufferPool::new(config.cache_capacity);
        let tx_manager = TransactionManager::new();

        // Initialize ID counters from header
        let next_node_id = page_manager.header().next_node_id;
        let next_edge_id = page_manager.header().next_edge_id;
        let node_store = NodeStore::new(next_node_id);
        let edge_store = EdgeStore::new(next_edge_id);

        // Update tx manager with WAL frame count
        tx_manager.update_current_frame(wal_writer.frame_count());

        // GG-005: Initialize subgraph store from header.
        // A zero counter means no subgraph was ever allocated; IDs start at 1.
        #[cfg(feature = "subgraph")]
        let next_subgraph_id = page_manager.header().next_subgraph_id;
        #[cfg(feature = "subgraph")]
        let subgraph_store = if next_subgraph_id > 0 {
            SubgraphStore::new(next_subgraph_id)
        } else {
            SubgraphStore::new(1)
        };

        // HH-005: Initialize hyperedge store from header (same 0 -> 1 rule).
        #[cfg(feature = "hypergraph")]
        let next_hyperedge_id = page_manager.header().next_hyperedge_id;
        #[cfg(feature = "hypergraph")]
        let hyperedge_store = if next_hyperedge_id > 0 {
            HyperEdgeStore::new(next_hyperedge_id)
        } else {
            HyperEdgeStore::new(1)
        };

        let mut engine = Self {
            lock_file,
            page_manager,
            buffer_pool,
            wal_writer,
            wal_reader,
            tx_manager,
            node_store,
            edge_store,
            catalog: catalog::Catalog::default(),
            index_manager: IndexManager::new(),
            edge_index_manager: EdgeIndexManager::new(),
            version_store: VersionStore::new(),
            #[cfg(feature = "subgraph")]
            subgraph_store,
            #[cfg(feature = "subgraph")]
            membership_index: MembershipIndex::new(),
            #[cfg(feature = "hypergraph")]
            hyperedge_store,
            #[cfg(feature = "hypergraph")]
            hyperedge_reverse_index: HyperEdgeReverseIndex::new(),
            config,
            node_page_map: HashMap::new(),
            edge_page_map: HashMap::new(),
            current_node_data_page: None,
            current_edge_data_page: None,
            #[cfg(feature = "subgraph")]
            current_subgraph_data_page: None,
            #[cfg(feature = "hypergraph")]
            current_hyperedge_data_page: None,
            current_version_data_page: None,
        };

        // R-PERSIST-012: Load catalog from persisted pages before node/edge loading
        // so that label/property/rel-type IDs are available.
        engine.load_catalog()?;

        // R-PERSIST-031/032: Load persisted data from disk pages into memory.
        engine.load_nodes_from_pages()?;
        engine.load_edges_from_pages()?;
        // R-PERSIST-050/051/052: Load feature-gated store data from disk.
        #[cfg(feature = "subgraph")]
        engine.load_subgraphs_from_pages()?;
        #[cfg(feature = "hypergraph")]
        engine.load_hyperedges_from_pages()?;
        engine.load_versions_from_pages()?;

        Ok(engine)
    }
235
236    // -- Node CRUD --
237
238    /// Create a new node.
239    pub fn create_node(
240        &mut self,
241        labels: Vec<u32>,
242        properties: Vec<(u32, PropertyValue)>,
243    ) -> NodeId {
244        let id = self
245            .node_store
246            .create_node(labels.clone(), properties.clone());
247        // Update header with new next_node_id
248        self.page_manager.header_mut().next_node_id = self.node_store.next_id();
249        // Auto-update indexes: for each label and property, check if an index applies
250        for &label_id in &labels {
251            for (prop_key_id, value) in &properties {
252                if let Some(idx) = self.index_manager.find_index_mut(label_id, *prop_key_id) {
253                    idx.insert(value, id);
254                }
255            }
256        }
257        // R-PERSIST-001: Persist node record to data page via WAL
258        if let Some(record) = self.node_store.get_node(id).cloned() {
259            let _ = self.persist_node(id, &record, false);
260        }
261        id
262    }
263
    /// Get a node by ID.
    ///
    /// Returns `None` when the in-memory node store has no record for this ID.
    pub fn get_node(&self, node_id: NodeId) -> Option<&NodeRecord> {
        self.node_store.get_node(node_id)
    }
268
    /// Update a node's properties.
    ///
    /// Order matters here: the pre-update snapshot must be captured before the
    /// store is mutated, and stale index entries are removed before the new
    /// values are inserted.
    ///
    /// # Errors
    ///
    /// Propagates the error from `NodeStore::update_node` (e.g. when the node
    /// does not exist).
    pub fn update_node(
        &mut self,
        node_id: NodeId,
        properties: Vec<(u32, PropertyValue)>,
    ) -> Result<()> {
        // Capture old properties for index removal
        let old_node = self.node_store.get_node(node_id).cloned();

        // W-002: Pre-update snapshot into VersionStore
        if self.config.version_storage_enabled {
            if let Some(ref old) = old_node {
                let seq = self.version_store.snapshot_node(node_id.0, old.clone());
                // R-PERSIST-052: Persist version record to data page via WAL
                // (best-effort; error intentionally ignored).
                let vr = version::VersionRecord::Node(old.clone());
                let _ = self.persist_version(node_id.0, seq, &vr);
            }
        }

        self.node_store.update_node(node_id, properties.clone())?;
        // Update indexes: remove old values, insert new values.
        // NOTE(review): insertion iterates the node's pre-update labels;
        // assumes update_node never changes labels — confirm.
        if let Some(old) = old_node {
            for &label_id in &old.labels {
                // Remove old property values from indexes
                for (prop_key_id, old_value) in &old.properties {
                    if let Some(idx) = self.index_manager.find_index_mut(label_id, *prop_key_id) {
                        idx.remove(old_value, node_id);
                    }
                }
                // Insert new property values into indexes
                for (prop_key_id, new_value) in &properties {
                    if let Some(idx) = self.index_manager.find_index_mut(label_id, *prop_key_id) {
                        idx.insert(new_value, node_id);
                    }
                }
            }
        }
        // R-PERSIST-003: Persist updated node record to data page via WAL
        // (best-effort; error intentionally ignored).
        if let Some(updated) = self.node_store.get_node(node_id).cloned() {
            let _ = self.rewrite_node_on_page(node_id, &updated, false);
        }
        Ok(())
    }
312
313    /// Delete a node and all its connected edges.
314    /// REQ-STORE-004: Delete all connected edges first.
315    pub fn delete_node(&mut self, node_id: NodeId) -> Result<NodeRecord> {
316        // Capture node data for index removal and tombstone before deletion
317        let node_data = self.node_store.get_node(node_id).cloned();
318        // Delete connected edges first
319        self.edge_store
320            .delete_edges_for_node(node_id, &mut self.node_store)?;
321        let deleted = self.node_store.delete_node(node_id)?;
322        // Remove from all applicable indexes
323        if let Some(ref node) = node_data {
324            for &label_id in &node.labels {
325                for (prop_key_id, value) in &node.properties {
326                    if let Some(idx) = self.index_manager.find_index_mut(label_id, *prop_key_id) {
327                        idx.remove(value, node_id);
328                    }
329                }
330            }
331        }
332        // R-PERSIST-004: Write tombstone record to data page via WAL
333        if let Some(ref node) = node_data {
334            let _ = self.rewrite_node_on_page(node_id, node, true);
335        }
336        Ok(deleted)
337    }
338
339    // -- Edge CRUD --
340
341    /// Create a new edge.
342    pub fn create_edge(
343        &mut self,
344        start_node: NodeId,
345        end_node: NodeId,
346        rel_type_id: u32,
347        properties: Vec<(u32, PropertyValue)>,
348    ) -> Result<EdgeId> {
349        let id = self.edge_store.create_edge(
350            start_node,
351            end_node,
352            rel_type_id,
353            properties.clone(),
354            &mut self.node_store,
355        )?;
356        self.page_manager.header_mut().next_edge_id = self.edge_store.next_id();
357        // CC-T5: Auto-update edge indexes on CREATE
358        for (prop_key_id, value) in &properties {
359            if let Some(idx) = self
360                .edge_index_manager
361                .find_index_mut(rel_type_id, *prop_key_id)
362            {
363                idx.insert(value, id);
364            }
365        }
366        // R-PERSIST-002: Persist edge record to data page via WAL
367        if let Some(record) = self.edge_store.get_edge(id).cloned() {
368            let _ = self.persist_edge(id, &record, false);
369        }
370        Ok(id)
371    }
372
    /// Get an edge by ID.
    ///
    /// Returns `None` when the in-memory edge store has no record for this ID.
    pub fn get_edge(&self, edge_id: EdgeId) -> Option<&RelationshipRecord> {
        self.edge_store.get_edge(edge_id)
    }
377
378    /// Update an edge's properties.
379    pub fn update_edge(
380        &mut self,
381        edge_id: EdgeId,
382        properties: Vec<(u32, PropertyValue)>,
383    ) -> Result<()> {
384        // CC-T5: Update edge indexes on SET
385        let old_edge = self.edge_store.get_edge(edge_id).cloned();
386        self.edge_store.update_edge(edge_id, properties.clone())?;
387        if let Some(old) = old_edge {
388            let rel_type_id = old.rel_type_id;
389            // Remove old values from indexes
390            for (prop_key_id, old_value) in &old.properties {
391                if let Some(idx) = self
392                    .edge_index_manager
393                    .find_index_mut(rel_type_id, *prop_key_id)
394                {
395                    idx.remove(old_value, edge_id);
396                }
397            }
398            // Insert new values into indexes
399            for (prop_key_id, new_value) in &properties {
400                if let Some(idx) = self
401                    .edge_index_manager
402                    .find_index_mut(rel_type_id, *prop_key_id)
403                {
404                    idx.insert(new_value, edge_id);
405                }
406            }
407        }
408        // R-PERSIST-003: Persist updated edge record to data page via WAL
409        if let Some(updated) = self.edge_store.get_edge(edge_id).cloned() {
410            let _ = self.rewrite_edge_on_page(edge_id, &updated, false);
411        }
412        Ok(())
413    }
414
    /// Get all edges connected to a node.
    ///
    /// Delegates to the edge store, which resolves adjacency with help from
    /// the node store.
    pub fn get_edges_for_node(&self, node_id: NodeId) -> Vec<&RelationshipRecord> {
        self.edge_store
            .get_edges_for_node(node_id, &self.node_store)
    }
420
421    /// Delete an edge.
422    pub fn delete_edge(&mut self, edge_id: EdgeId) -> Result<RelationshipRecord> {
423        // CC-T5: Capture data for index removal and tombstone
424        let edge_data = self.edge_store.get_edge(edge_id).cloned();
425        let deleted = self.edge_store.delete_edge(edge_id, &mut self.node_store)?;
426        // Remove from edge indexes
427        if let Some(ref edge) = edge_data {
428            for (prop_key_id, value) in &edge.properties {
429                if let Some(idx) = self
430                    .edge_index_manager
431                    .find_index_mut(edge.rel_type_id, *prop_key_id)
432                {
433                    idx.remove(value, edge_id);
434                }
435            }
436        }
437        // R-PERSIST-004: Write tombstone record to data page via WAL
438        if let Some(ref edge) = edge_data {
439            let _ = self.rewrite_edge_on_page(edge_id, edge, true);
440        }
441        Ok(deleted)
442    }
443
444    // -- Scan operations --
445
    /// Scan all nodes in the database.
    ///
    /// Full linear scan, delegated to the in-memory node store.
    pub fn scan_nodes(&self) -> Vec<&NodeRecord> {
        self.node_store.scan_all()
    }

    /// Scan nodes that have the given label.
    ///
    /// Linear scan filtered by label, delegated to the node store.
    pub fn scan_nodes_by_label(&self, label_id: u32) -> Vec<&NodeRecord> {
        self.node_store.scan_by_label(label_id)
    }

    /// Scan edges of the given relationship type.
    ///
    /// Linear scan filtered by type, delegated to the edge store.
    pub fn scan_edges_by_type(&self, type_id: u32) -> Vec<&RelationshipRecord> {
        self.edge_store.scan_by_type(type_id)
    }
460
461    // -- Transaction operations --
462
    /// Begin a read transaction.
    ///
    /// Infallible at the type level: the MVCC transaction manager hands out
    /// read handles directly.
    pub fn begin_read(&self) -> transaction::ReadTransaction {
        self.tx_manager.begin_read()
    }

    /// Begin a write transaction.
    ///
    /// # Errors
    ///
    /// Propagates the transaction manager's error when a write transaction
    /// cannot be started. NOTE(review): presumably when another writer is
    /// already active — confirm in `TransactionManager::begin_write`.
    pub fn begin_write(&self) -> Result<transaction::WriteTransaction> {
        self.tx_manager.begin_write()
    }
472
473    // -- WAL operations --
474
    /// Write a page to the WAL (used internally).
    ///
    /// The frame carries the current database size in pages, read from the
    /// header's `page_count`. Returns the frame number from the WAL writer.
    ///
    /// # Errors
    ///
    /// Propagates any error from the WAL writer.
    pub fn wal_write_page(&mut self, page_id: PageId, data: &[u8; PAGE_SIZE]) -> Result<u64> {
        let db_size = self.page_manager.header().page_count;
        self.wal_writer.write_frame(page_id, db_size, data)
    }

    /// Commit the current WAL transaction.
    ///
    /// The committed frame number is also published to the transaction
    /// manager as the current frame, and returned to the caller.
    ///
    /// # Errors
    ///
    /// Propagates any error from the WAL writer's commit.
    pub fn wal_commit(&mut self) -> Result<u64> {
        let frame = self.wal_writer.commit()?;
        self.tx_manager.update_current_frame(frame);
        Ok(frame)
    }

    /// Discard uncommitted WAL frames.
    pub fn wal_discard(&mut self) {
        self.wal_writer.discard();
    }
492
493    // -- Checkpoint --
494
    /// Run a checkpoint: copy WAL frames to main file.
    ///
    /// Delegates to [`Checkpoint::run`] with this engine's page manager and
    /// WAL writer/reader.
    ///
    /// # Errors
    ///
    /// Propagates any error from the checkpoint procedure.
    pub fn checkpoint(&mut self) -> Result<u64> {
        Checkpoint::run(
            &mut self.page_manager,
            &mut self.wal_writer,
            &mut self.wal_reader,
        )
    }
503
504    // -- Misc --
505
    /// Flush the database header to disk.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from the page manager.
    pub fn flush_header(&mut self) -> Result<()> {
        self.page_manager.flush_header()
    }

    /// Returns the number of nodes currently held in the in-memory store.
    pub fn node_count(&self) -> usize {
        self.node_store.len()
    }

    /// Returns the number of edges currently held in the in-memory store.
    pub fn edge_count(&self) -> usize {
        self.edge_store.len()
    }
520
521    /// Find the first node matching all given labels and properties.
522    ///
523    /// Scans nodes by the first label (if any) for efficiency, then filters
524    /// by remaining labels and all property key-value pairs (exact equality).
525    /// Returns `None` if no match is found.
526    pub fn find_node(
527        &self,
528        label_ids: &[u32],
529        properties: &[(u32, PropertyValue)],
530    ) -> Option<NodeId> {
531        let candidates: Vec<&NodeRecord> = if let Some(&first_label) = label_ids.first() {
532            self.scan_nodes_by_label(first_label)
533        } else {
534            self.scan_nodes()
535        };
536
537        for node in candidates {
538            // Check all required labels
539            let has_all_labels = label_ids.iter().all(|lid| node.labels.contains(lid));
540            if !has_all_labels {
541                continue;
542            }
543            // Check all required properties (exact equality)
544            let has_all_props = properties
545                .iter()
546                .all(|(key, val)| node.properties.iter().any(|(k, v)| k == key && v == val));
547            if has_all_props {
548                return Some(node.node_id);
549            }
550        }
551        None
552    }
553
554    /// Find the first edge from `start` to `end` with the given relationship type.
555    ///
556    /// Checks edges connected to the start node and returns the first one
557    /// matching both the end node and relationship type.
558    pub fn find_edge(&self, start: NodeId, end: NodeId, type_id: u32) -> Option<EdgeId> {
559        let edges = self.get_edges_for_node(start);
560        for edge in edges {
561            if edge.start_node == start && edge.end_node == end && edge.rel_type_id == type_id {
562                return Some(edge.edge_id);
563            }
564        }
565        None
566    }
567
    /// Returns a reference to the config this engine was opened with.
    pub fn config(&self) -> &DatabaseConfig {
        &self.config
    }
572
573    /// Scan nodes by (label, property_key, value) using index if available.
574    ///
575    /// If an index exists for (label_id, prop_key_id), uses the fast index lookup.
576    /// Otherwise falls back to linear scan.
577    pub fn scan_nodes_by_property(
578        &self,
579        label_id: u32,
580        prop_key_id: u32,
581        value: &PropertyValue,
582    ) -> Vec<NodeId> {
583        if let Some(idx) = self.index_manager.find_index(label_id, prop_key_id) {
584            // Fast path: index lookup
585            idx.lookup(value)
586        } else {
587            // Slow path: linear scan
588            self.node_store
589                .scan_by_label(label_id)
590                .iter()
591                .filter(|n| {
592                    n.properties
593                        .iter()
594                        .any(|(k, v)| *k == prop_key_id && v == value)
595                })
596                .map(|n| n.node_id)
597                .collect()
598        }
599    }
600
601    /// Range scan by (label, property_key, min, max) using index if available.
602    ///
603    /// Returns node IDs where the property value is in [min, max] (inclusive).
604    /// Uses index range query if available, otherwise falls back to linear scan.
605    pub fn scan_nodes_by_range(
606        &self,
607        label_id: u32,
608        prop_key_id: u32,
609        min: &PropertyValue,
610        max: &PropertyValue,
611    ) -> Vec<NodeId> {
612        if let Some(idx) = self.index_manager.find_index(label_id, prop_key_id) {
613            // Fast path: index range query
614            idx.range(min, max)
615        } else {
616            // Slow path: linear scan with comparison
617            let min_key = index::PropertyValueKey(min.clone());
618            let max_key = index::PropertyValueKey(max.clone());
619            self.node_store
620                .scan_by_label(label_id)
621                .iter()
622                .filter(|n| {
623                    n.properties.iter().any(|(k, v)| {
624                        if *k != prop_key_id {
625                            return false;
626                        }
627                        let vk = index::PropertyValueKey(v.clone());
628                        vk >= min_key && vk <= max_key
629                    })
630                })
631                .map(|n| n.node_id)
632                .collect()
633        }
634    }
635
    /// Returns a reference to the index manager (node property indexes).
    pub fn index_manager(&self) -> &IndexManager {
        &self.index_manager
    }

    /// Returns a mutable reference to the index manager.
    pub fn index_manager_mut(&mut self) -> &mut IndexManager {
        &mut self.index_manager
    }

    /// Returns a reference to the edge index manager (edge property indexes).
    pub fn edge_index_manager(&self) -> &EdgeIndexManager {
        &self.edge_index_manager
    }

    /// Returns a mutable reference to the edge index manager.
    pub fn edge_index_manager_mut(&mut self) -> &mut EdgeIndexManager {
        &mut self.edge_index_manager
    }
655
656    /// Scan edges by (rel_type_id, prop_key_id, value) using index if available.
657    ///
658    /// If an index exists for (rel_type_id, prop_key_id), uses the fast index lookup.
659    /// Otherwise falls back to linear scan.
660    pub fn scan_edges_by_property(
661        &self,
662        rel_type_id: u32,
663        prop_key_id: u32,
664        value: &PropertyValue,
665    ) -> Vec<EdgeId> {
666        if let Some(idx) = self.edge_index_manager.find_index(rel_type_id, prop_key_id) {
667            idx.lookup(value)
668        } else {
669            // Slow path: linear scan
670            self.edge_store
671                .scan_by_type(rel_type_id)
672                .iter()
673                .filter(|e| {
674                    e.properties
675                        .iter()
676                        .any(|(k, v)| *k == prop_key_id && v == value)
677                })
678                .map(|e| e.edge_id)
679                .collect()
680        }
681    }
682
    /// Returns a reference to the catalog (label/property/rel-type names).
    pub fn catalog(&self) -> &catalog::Catalog {
        &self.catalog
    }

    /// Returns a mutable reference to the catalog.
    pub fn catalog_mut(&mut self) -> &mut catalog::Catalog {
        &mut self.catalog
    }

    /// Returns a reference to the version store (pre-update snapshots).
    pub fn version_store(&self) -> &VersionStore {
        &self.version_store
    }

    /// Returns a mutable reference to the version store.
    pub fn version_store_mut(&mut self) -> &mut VersionStore {
        &mut self.version_store
    }
702
703    // -- Subgraph operations (cfg-gated) --
704
705    /// Create a new subgraph with the given properties and optional temporal anchor.
706    #[cfg(feature = "subgraph")]
707    pub fn create_subgraph(
708        &mut self,
709        properties: Vec<(u32, PropertyValue)>,
710        temporal_anchor: Option<i64>,
711    ) -> SubgraphId {
712        let id = self.subgraph_store.create(properties, temporal_anchor);
713        self.page_manager.header_mut().next_subgraph_id = self.subgraph_store.next_id();
714        // R-PERSIST-050: Persist subgraph record to data page via WAL
715        if let Some(record) = self.subgraph_store.get(id).cloned() {
716            let members = self.membership_index.members(id);
717            let _ = self.persist_subgraph(id, &record, &members, false);
718        }
719        id
720    }
721
    /// Get a subgraph record by ID.
    ///
    /// Returns `None` when no record exists for this ID.
    #[cfg(feature = "subgraph")]
    pub fn get_subgraph(&self, id: SubgraphId) -> Option<&SubgraphRecord> {
        self.subgraph_store.get(id)
    }
727
728    /// Delete a subgraph by ID. Also removes all memberships.
729    #[cfg(feature = "subgraph")]
730    pub fn delete_subgraph(&mut self, id: SubgraphId) -> cypherlite_core::Result<SubgraphRecord> {
731        // Remove all memberships first
732        self.membership_index.remove_all(id);
733        self.subgraph_store
734            .delete(id)
735            .ok_or(cypherlite_core::CypherLiteError::SubgraphNotFound(id.0))
736    }
737
738    /// Add a node as a member of a subgraph.
739    #[cfg(feature = "subgraph")]
740    pub fn add_member(
741        &mut self,
742        subgraph_id: SubgraphId,
743        node_id: NodeId,
744    ) -> cypherlite_core::Result<()> {
745        if self.subgraph_store.get(subgraph_id).is_none() {
746            return Err(cypherlite_core::CypherLiteError::SubgraphNotFound(
747                subgraph_id.0,
748            ));
749        }
750        if self.node_store.get_node(node_id).is_none() {
751            return Err(cypherlite_core::CypherLiteError::NodeNotFound(node_id.0));
752        }
753        self.membership_index.add(subgraph_id, node_id);
754        // R-PERSIST-050: Re-persist subgraph record with updated membership
755        if let Some(record) = self.subgraph_store.get(subgraph_id).cloned() {
756            let members = self.membership_index.members(subgraph_id);
757            let _ = self.persist_subgraph(subgraph_id, &record, &members, false);
758        }
759        Ok(())
760    }
761
762    /// Remove a node from a subgraph.
763    #[cfg(feature = "subgraph")]
764    pub fn remove_member(
765        &mut self,
766        subgraph_id: SubgraphId,
767        node_id: NodeId,
768    ) -> cypherlite_core::Result<()> {
769        if self.subgraph_store.get(subgraph_id).is_none() {
770            return Err(cypherlite_core::CypherLiteError::SubgraphNotFound(
771                subgraph_id.0,
772            ));
773        }
774        self.membership_index.remove(subgraph_id, node_id);
775        Ok(())
776    }
777
    /// List all node members of a subgraph.
    ///
    /// Delegates to the membership index; an unknown subgraph ID yields the
    /// index's result for an ID with no entries.
    #[cfg(feature = "subgraph")]
    pub fn list_members(&self, subgraph_id: SubgraphId) -> Vec<NodeId> {
        self.membership_index.members(subgraph_id)
    }

    /// Get all subgraphs that a node belongs to.
    ///
    /// Reverse lookup through the membership index.
    #[cfg(feature = "subgraph")]
    pub fn get_subgraph_memberships(&self, node_id: NodeId) -> Vec<SubgraphId> {
        self.membership_index.memberships(node_id)
    }

    /// Scan all subgraph records.
    #[cfg(feature = "subgraph")]
    pub fn scan_subgraphs(&self) -> Vec<&SubgraphRecord> {
        self.subgraph_store.all().collect()
    }
795
796    // -- Hyperedge operations (cfg-gated) --
797
798    /// Create a new hyperedge with the given type, sources, targets, and properties.
799    #[cfg(feature = "hypergraph")]
800    pub fn create_hyperedge(
801        &mut self,
802        rel_type_id: u32,
803        sources: Vec<cypherlite_core::GraphEntity>,
804        targets: Vec<cypherlite_core::GraphEntity>,
805        properties: Vec<(u32, PropertyValue)>,
806    ) -> HyperEdgeId {
807        let id =
808            self.hyperedge_store
809                .create(rel_type_id, sources.clone(), targets.clone(), properties);
810        // Sync next_hyperedge_id with header
811        self.page_manager.header_mut().next_hyperedge_id = self.hyperedge_store.next_id();
812        // Update reverse index for all source and target participants
813        for entity in sources.iter().chain(targets.iter()) {
814            let raw_id = match entity {
815                cypherlite_core::GraphEntity::Node(nid) => nid.0,
816                cypherlite_core::GraphEntity::Subgraph(sid) => sid.0,
817                #[cfg(feature = "hypergraph")]
818                cypherlite_core::GraphEntity::HyperEdge(hid) => hid.0,
819                #[cfg(feature = "hypergraph")]
820                cypherlite_core::GraphEntity::TemporalRef(nid, _) => nid.0,
821            };
822            self.hyperedge_reverse_index.add(id.0, raw_id);
823        }
824        // R-PERSIST-051: Persist hyperedge record to data page via WAL
825        if let Some(record) = self.hyperedge_store.get(id).cloned() {
826            let _ = self.persist_hyperedge(id, &record, false);
827        }
828        id
829    }
830
    /// Get a hyperedge record by ID.
    ///
    /// Returns a borrowed record from the in-memory store, or `None` when
    /// no hyperedge with `id` is present.
    #[cfg(feature = "hypergraph")]
    pub fn get_hyperedge(&self, id: HyperEdgeId) -> Option<&HyperEdgeRecord> {
        self.hyperedge_store.get(id)
    }
836
837    /// Delete a hyperedge by ID. Also removes all reverse index entries.
838    #[cfg(feature = "hypergraph")]
839    pub fn delete_hyperedge(
840        &mut self,
841        id: HyperEdgeId,
842    ) -> cypherlite_core::Result<HyperEdgeRecord> {
843        // Remove all reverse index entries first
844        self.hyperedge_reverse_index.remove_all(id.0);
845        self.hyperedge_store
846            .delete(id)
847            .ok_or(cypherlite_core::CypherLiteError::HyperEdgeNotFound(id.0))
848    }
849
    /// Scan all hyperedge records.
    ///
    /// Collects references to every record in the in-memory hyperedge
    /// store; iteration order is whatever `HyperEdgeStore::all` yields.
    #[cfg(feature = "hypergraph")]
    pub fn scan_hyperedges(&self) -> Vec<&HyperEdgeRecord> {
        self.hyperedge_store.all().collect()
    }
855
    /// Find all hyperedge IDs that an entity participates in (by raw entity ID).
    ///
    /// `raw_entity_id` is the untyped inner ID of a node, subgraph, or
    /// hyperedge participant, as registered in the reverse index at
    /// creation/load time.
    #[cfg(feature = "hypergraph")]
    pub fn hyperedges_for_entity(&self, raw_entity_id: u64) -> Vec<u64> {
        self.hyperedge_reverse_index.hyperedges_for(raw_entity_id)
    }
861
862    // -- Data page persistence (Phase 2: R-PERSIST-001..004) --
863
    /// Returns the node data root page ID from the database header.
    ///
    /// 0 means no node data pages have ever been allocated (see
    /// `load_nodes_from_pages`, which treats 0 as "nothing persisted").
    pub fn node_data_root_page(&self) -> u32 {
        self.page_manager.header().node_data_root_page
    }
868
    /// Returns the edge data root page ID from the database header.
    ///
    /// 0 means no edge data pages have ever been allocated (see
    /// `load_edges_from_pages`, which treats 0 as "nothing persisted").
    pub fn edge_data_root_page(&self) -> u32 {
        self.page_manager.header().edge_data_root_page
    }
873
    /// Returns the number of version snapshots for a given entity.
    ///
    /// Thin delegation to the version store; `entity_id` is the raw
    /// (untyped) entity ID.
    pub fn version_count(&self, entity_id: u64) -> u64 {
        self.version_store.version_count(entity_id)
    }
878
    /// Returns the version chain for a given entity (oldest to newest).
    ///
    /// Each element pairs a version identifier with a borrowed
    /// [`version::VersionRecord`]; ordering is as documented by
    /// `VersionStore::get_version_chain`.
    pub fn version_chain(&self, entity_id: u64) -> Vec<(u64, &version::VersionRecord)> {
        self.version_store.get_version_chain(entity_id)
    }
883
884    /// Read a data page by page ID.
885    ///
886    /// Returns the in-memory cached copy if the page is the current active
887    /// data page (which may contain WAL-only writes not yet checkpointed).
888    /// Otherwise reads from the main database file.
889    pub fn read_data_page(&mut self, page_id: u32) -> Result<[u8; PAGE_SIZE]> {
890        // Check cached node data page first
891        if let Some((cached_pid, ref cached_buf)) = self.current_node_data_page {
892            if cached_pid == page_id {
893                return Ok(*cached_buf);
894            }
895        }
896        // Check cached edge data page
897        if let Some((cached_pid, ref cached_buf)) = self.current_edge_data_page {
898            if cached_pid == page_id {
899                return Ok(*cached_buf);
900            }
901        }
902        // Check cached subgraph data page
903        #[cfg(feature = "subgraph")]
904        if let Some((cached_pid, ref cached_buf)) = self.current_subgraph_data_page {
905            if cached_pid == page_id {
906                return Ok(*cached_buf);
907            }
908        }
909        // Check cached hyperedge data page
910        #[cfg(feature = "hypergraph")]
911        if let Some((cached_pid, ref cached_buf)) = self.current_hyperedge_data_page {
912            if cached_pid == page_id {
913                return Ok(*cached_buf);
914            }
915        }
916        // Check cached version data page
917        if let Some((cached_pid, ref cached_buf)) = self.current_version_data_page {
918            if cached_pid == page_id {
919                return Ok(*cached_buf);
920            }
921        }
922        // Fall back to main file (reads checkpointed data)
923        self.page_manager.read_page(PageId(page_id))
924    }
925
    /// Returns the number of committed WAL frames.
    ///
    /// Thin delegation to the WAL writer's frame counter.
    pub fn wal_frame_count(&self) -> u64 {
        self.wal_writer.frame_count()
    }
930
931    /// Returns the number of node data pages currently allocated.
932    pub fn node_data_page_count(&self) -> usize {
933        let root = self.page_manager.header().node_data_root_page;
934        if root == 0 {
935            return 0;
936        }
937        // Count unique pages referenced in node_page_map + current page
938        let mut pages: std::collections::HashSet<u32> =
939            self.node_page_map.values().copied().collect();
940        if let Some((pid, _)) = &self.current_node_data_page {
941            pages.insert(*pid);
942        }
943        pages.len()
944    }
945
    /// Load all persisted node records from data pages into the in-memory NodeStore.
    ///
    /// Walks the node data page chain starting from `node_data_root_page` in the
    /// database header. For each page, deserializes all records and inserts
    /// non-tombstone records into the NodeStore. Also rebuilds `node_page_map`
    /// and sets `current_node_data_page` to the last page in the chain.
    ///
    /// R-PERSIST-031: After WAL recovery, all node data pages MUST be read and
    /// deserialized into NodeStore.
    fn load_nodes_from_pages(&mut self) -> Result<()> {
        use page::record_serialization::{
            deserialize_node_record, read_records_from_page, DataPageHeader,
        };

        let root_page = self.page_manager.header().node_data_root_page;
        if root_page == 0 {
            return Ok(()); // No node data persisted
        }

        let mut current_page_id = root_page;
        // Walk the singly-linked page chain (terminated by next_page == 0).
        // NOTE(review): a corrupted chain containing a cycle would loop
        // forever — confirm chains are acyclic by construction.
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            // Read and deserialize all records from this page
            let entries = read_records_from_page(&page_buf);
            for (off, len) in &entries {
                // Records that fail to deserialize are silently skipped, as
                // are tombstones (updates/deletes rewrite the record in
                // place on its original page — see rewrite_node_on_page).
                if let Some((record, deleted, _)) =
                    deserialize_node_record(&page_buf[*off..*off + *len])
                {
                    if !deleted {
                        self.node_page_map.insert(record.node_id.0, current_page_id);
                        self.node_store.insert_loaded_record(record);
                    }
                }
            }

            // Follow the page chain or stop
            if header.next_page == 0 {
                // Last page in chain -- cache it for future appends
                self.current_node_data_page = Some((current_page_id, page_buf));
                break;
            }
            current_page_id = header.next_page;
        }

        Ok(())
    }
994
995    /// Load all persisted edge records from data pages into the in-memory EdgeStore.
996    ///
997    /// Walks the edge data page chain starting from `edge_data_root_page` in the
998    /// database header. For each page, deserializes all records and inserts
999    /// non-tombstone records into the EdgeStore. Also rebuilds `edge_page_map`
1000    /// and sets `current_edge_data_page` to the last page in the chain.
1001    ///
1002    /// R-PERSIST-032: After WAL recovery, all edge data pages MUST be read and
1003    /// deserialized into EdgeStore.
1004    fn load_edges_from_pages(&mut self) -> Result<()> {
1005        use page::record_serialization::{
1006            deserialize_edge_record, read_records_from_page, DataPageHeader,
1007        };
1008
1009        let root_page = self.page_manager.header().edge_data_root_page;
1010        if root_page == 0 {
1011            return Ok(()); // No edge data persisted
1012        }
1013
1014        let mut current_page_id = root_page;
1015        loop {
1016            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
1017            let header = DataPageHeader::read_from(&page_buf);
1018
1019            let entries = read_records_from_page(&page_buf);
1020            for (off, len) in &entries {
1021                if let Some((record, deleted, _)) =
1022                    deserialize_edge_record(&page_buf[*off..*off + *len])
1023                {
1024                    if !deleted {
1025                        self.edge_page_map.insert(record.edge_id.0, current_page_id);
1026                        self.edge_store.insert_loaded_record(record);
1027                    }
1028                }
1029            }
1030
1031            if header.next_page == 0 {
1032                self.current_edge_data_page = Some((current_page_id, page_buf));
1033                break;
1034            }
1035            current_page_id = header.next_page;
1036        }
1037
1038        Ok(())
1039    }
1040
1041    /// R-PERSIST-010: Save the catalog to one or more CatalogData pages.
1042    ///
1043    /// Serializes the in-memory `Catalog` via `catalog.save()` (bincode) and
1044    /// writes the resulting bytes across chained CatalogData pages.  The first
1045    /// page ID is stored in `DatabaseHeader.catalog_page_id`.
1046    fn save_catalog(&mut self) -> Result<()> {
1047        use page::record_serialization::DataPageHeader;
1048        use page::PageType;
1049
1050        let catalog_bytes = self.catalog.save();
1051        if catalog_bytes.is_empty() {
1052            return Ok(());
1053        }
1054
1055        let usable_per_page = PAGE_SIZE - DataPageHeader::SIZE;
1056        let mut first_page_id: Option<u32> = None;
1057        let mut prev_page: Option<(u32, [u8; PAGE_SIZE])> = None;
1058
1059        for chunk in catalog_bytes.chunks(usable_per_page) {
1060            let new_page_id = self.page_manager.allocate_page()?;
1061            let mut page_buf = [0u8; PAGE_SIZE];
1062            let mut header = DataPageHeader::new(PageType::CatalogData as u8);
1063            header.free_offset = (DataPageHeader::SIZE + chunk.len()) as u16;
1064            header.record_count = 1; // treat as single blob fragment
1065            header.write_to(&mut page_buf);
1066
1067            // Write chunk data after header
1068            page_buf[DataPageHeader::SIZE..DataPageHeader::SIZE + chunk.len()]
1069                .copy_from_slice(chunk);
1070
1071            if first_page_id.is_none() {
1072                first_page_id = Some(new_page_id.0);
1073            }
1074
1075            // Chain previous page to this one
1076            if let Some((prev_id, ref mut prev_buf)) = prev_page {
1077                let mut prev_header = DataPageHeader::read_from(prev_buf);
1078                prev_header.next_page = new_page_id.0;
1079                prev_header.write_to(prev_buf);
1080                // Write previous page through WAL
1081                let db_size = self.page_manager.header().page_count;
1082                self.wal_writer
1083                    .write_frame(PageId(prev_id), db_size, prev_buf)?;
1084            }
1085
1086            prev_page = Some((new_page_id.0, page_buf));
1087        }
1088
1089        // Write the last page
1090        if let Some((last_id, ref last_buf)) = prev_page {
1091            let db_size = self.page_manager.header().page_count;
1092            self.wal_writer
1093                .write_frame(PageId(last_id), db_size, last_buf)?;
1094            self.wal_writer.commit()?;
1095        }
1096
1097        // Update header with catalog root page
1098        if let Some(root_id) = first_page_id {
1099            self.page_manager.header_mut().catalog_page_id = root_id;
1100        }
1101
1102        Ok(())
1103    }
1104
    /// R-PERSIST-012: Load catalog from CatalogData pages on database open.
    ///
    /// Reads the chained CatalogData pages starting from
    /// `DatabaseHeader.catalog_page_id`, concatenates the data payloads, and
    /// deserializes via `Catalog::load()`.
    fn load_catalog(&mut self) -> Result<()> {
        use page::record_serialization::DataPageHeader;

        let root_page = self.page_manager.header().catalog_page_id;
        if root_page == 0 {
            return Ok(()); // No catalog data persisted — use default
        }

        let mut catalog_bytes = Vec::new();
        let mut current_page_id = root_page;

        // Walk the singly-linked page chain (terminated by next_page == 0).
        // NOTE(review): a corrupted chain containing a cycle would loop
        // forever — confirm chains are acyclic by construction.
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            // Extract payload between header and free_offset
            let data_start = DataPageHeader::SIZE;
            let data_end = header.free_offset as usize;
            // Bounds guard: a page with an out-of-range or empty free_offset
            // contributes no bytes (silently skipped rather than erroring).
            if data_end > data_start && data_end <= PAGE_SIZE {
                catalog_bytes.extend_from_slice(&page_buf[data_start..data_end]);
            }

            if header.next_page == 0 {
                break;
            }
            current_page_id = header.next_page;
        }

        // Deserialization errors (corrupt payload) propagate to the caller.
        self.catalog = catalog::Catalog::load(&catalog_bytes)?;
        Ok(())
    }
1141
    /// Persist a node record to a data page and write through WAL.
    ///
    /// Serializes `record` (marked as a tombstone when `deleted` is true)
    /// and appends it to the cached current node data page when it fits.
    /// When the page is full or absent, allocates a new page, links the old
    /// page's `next_page` to it, and writes both through the WAL. Updates
    /// `node_page_map` so `node_id` maps to the page holding the record, and
    /// caches the new page as `current_node_data_page`.
    ///
    /// NOTE(review): if a single serialized record exceeds a fresh page's
    /// capacity, `pack_record_into_page` fails and only a `debug_assert!`
    /// fires — in release builds the record would be silently dropped.
    /// Confirm record sizes are bounded well below PAGE_SIZE.
    fn persist_node(&mut self, node_id: NodeId, record: &NodeRecord, deleted: bool) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_node_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_node_record(record, deleted);

        // Fast path: try to pack into the current (cached) node data page.
        if let Some((page_id, ref mut page_buf)) = self.current_node_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                // Write the page through WAL and commit immediately.
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                self.node_page_map.insert(node_id.0, page_id);
                return Ok(());
            }
        }

        // Current page is full or doesn't exist -- allocate a new one
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::NodeData as u8);
        header.write_to(&mut new_page);

        // Chain the new page to the previous one
        if let Some((old_page_id, ref mut old_buf)) = self.current_node_data_page {
            // Update old page's next_page pointer
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            // Write old page with updated chain pointer
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        // Pack record into new page (see oversize-record NOTE above).
        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        // Write new page through WAL
        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // Update header if this is the first node data page; the header is
        // flushed so the root pointer survives a crash before checkpoint.
        if self.page_manager.header().node_data_root_page == 0 {
            self.page_manager.header_mut().node_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.node_page_map.insert(node_id.0, new_page_id.0);
        self.current_node_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1204
    /// Persist an edge record to a data page and write through WAL.
    ///
    /// Mirror of `persist_node` for relationship records: appends the
    /// serialized record (tombstoned when `deleted` is true) to the current
    /// edge data page, allocating and chaining a new page when full, and
    /// maintains `edge_page_map` / `current_edge_data_page`.
    ///
    /// NOTE(review): a record larger than a fresh page is only caught by a
    /// `debug_assert!` and would be silently dropped in release builds —
    /// confirm record sizes are bounded well below PAGE_SIZE.
    fn persist_edge(
        &mut self,
        edge_id: EdgeId,
        record: &RelationshipRecord,
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_edge_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_edge_record(record, deleted);

        // Fast path: try to pack into the current (cached) edge data page.
        if let Some((page_id, ref mut page_buf)) = self.current_edge_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                self.edge_page_map.insert(edge_id.0, page_id);
                return Ok(());
            }
        }

        // Allocate new edge data page
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::EdgeData as u8);
        header.write_to(&mut new_page);

        // Chain to previous page
        if let Some((old_page_id, ref mut old_buf)) = self.current_edge_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        // Pack record into the fresh page (see oversize-record NOTE above).
        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First edge data page ever: record and flush the root pointer.
        if self.page_manager.header().edge_data_root_page == 0 {
            self.page_manager.header_mut().edge_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.edge_page_map.insert(edge_id.0, new_page_id.0);
        self.current_edge_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1266
    /// Rewrite a node record on its existing data page (for update/delete).
    /// This rewrites the entire page with the updated record replacing the old one.
    ///
    /// Falls back to `persist_node` when the node was never persisted.
    /// Otherwise reads the page holding the record, rebuilds it from scratch
    /// copying every other record verbatim, substitutes the updated record
    /// (tombstoned when `deleted` is true), writes the page through the WAL,
    /// and refreshes the cached active page if it was the one rewritten.
    ///
    /// NOTE(review): the `pack_record_into_page` return values are ignored
    /// here — if the updated record is larger than the original and the page
    /// overflows, the record (or a trailing sibling) would be silently
    /// dropped. Confirm updates cannot grow a page past capacity.
    fn rewrite_node_on_page(
        &mut self,
        node_id: NodeId,
        record: &NodeRecord,
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            deserialize_node_record, pack_record_into_page, read_records_from_page,
            serialize_node_record, DataPageHeader,
        };

        let page_id = match self.node_page_map.get(&node_id.0) {
            Some(&pid) => pid,
            None => {
                // Node was never persisted; write as new record
                return self.persist_node(node_id, record, deleted);
            }
        };

        // Read the current page (from cache or main file)
        let old_page = self.read_data_page(page_id)?;
        let old_header = DataPageHeader::read_from(&old_page);

        // Rebuild the page: copy all records except the one being updated
        let mut new_page = [0u8; PAGE_SIZE];
        let mut new_header = DataPageHeader::new(old_header.page_type);
        new_header.next_page = old_header.next_page;
        new_header.write_to(&mut new_page);

        let entries = read_records_from_page(&old_page);
        for (off, len) in &entries {
            let slice = &old_page[*off..*off + *len];
            // Undecodable records are silently dropped by the rebuild.
            if let Some((rec, _del, _)) = deserialize_node_record(slice) {
                if rec.node_id == node_id {
                    // Replace with updated record
                    let updated_bytes = serialize_node_record(record, deleted);
                    pack_record_into_page(&mut new_page, &updated_bytes);
                } else {
                    // Copy existing record as-is
                    pack_record_into_page(&mut new_page, slice);
                }
            }
        }

        // Write updated page through WAL
        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(PageId(page_id), db_size, &new_page)?;
        self.wal_writer.commit()?;

        // Update cached page if it matches, so later appends see the rewrite
        if let Some((cached_pid, ref mut cached_buf)) = self.current_node_data_page {
            if cached_pid == page_id {
                *cached_buf = new_page;
            }
        }

        Ok(())
    }
1328
    /// Rewrite an edge record on its existing data page (for update/delete).
    ///
    /// Mirror of `rewrite_node_on_page` for relationship records: falls back
    /// to `persist_edge` for never-persisted edges; otherwise rebuilds the
    /// page with the updated (possibly tombstoned) record substituted,
    /// writes it through the WAL, and refreshes the cached active page.
    ///
    /// NOTE(review): `pack_record_into_page` return values are ignored — a
    /// grown record that overflows the page would be silently dropped.
    fn rewrite_edge_on_page(
        &mut self,
        edge_id: EdgeId,
        record: &RelationshipRecord,
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            deserialize_edge_record, pack_record_into_page, read_records_from_page,
            serialize_edge_record, DataPageHeader,
        };

        let page_id = match self.edge_page_map.get(&edge_id.0) {
            Some(&pid) => pid,
            None => {
                // Edge was never persisted; write as a new record instead.
                return self.persist_edge(edge_id, record, deleted);
            }
        };

        // Read the current page (from cache or main file)
        let old_page = self.read_data_page(page_id)?;
        let old_header = DataPageHeader::read_from(&old_page);

        // Rebuild the page, preserving its type and chain pointer.
        let mut new_page = [0u8; PAGE_SIZE];
        let mut new_header = DataPageHeader::new(old_header.page_type);
        new_header.next_page = old_header.next_page;
        new_header.write_to(&mut new_page);

        let entries = read_records_from_page(&old_page);
        for (off, len) in &entries {
            let slice = &old_page[*off..*off + *len];
            // Undecodable records are silently dropped by the rebuild.
            if let Some((rec, _del, _)) = deserialize_edge_record(slice) {
                if rec.edge_id == edge_id {
                    let updated_bytes = serialize_edge_record(record, deleted);
                    pack_record_into_page(&mut new_page, &updated_bytes);
                } else {
                    pack_record_into_page(&mut new_page, slice);
                }
            }
        }

        // Write the rebuilt page through the WAL and commit.
        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(PageId(page_id), db_size, &new_page)?;
        self.wal_writer.commit()?;

        // Keep the cached active page in sync with the rewrite.
        if let Some((cached_pid, ref mut cached_buf)) = self.current_edge_data_page {
            if cached_pid == page_id {
                *cached_buf = new_page;
            }
        }

        Ok(())
    }
1383
1384    // ================================================================
1385    // PERSIST-001 Phase 5: SubgraphStore persistence
1386    // ================================================================
1387
    /// Load all persisted subgraph records from data pages into the in-memory
    /// SubgraphStore and MembershipIndex.
    ///
    /// Walks the subgraph data page chain from
    /// `DatabaseHeader.subgraph_data_root_page`, loads each non-tombstone
    /// record, rebuilds the membership index from the persisted member
    /// lists, and caches the last page as `current_subgraph_data_page`.
    #[cfg(feature = "subgraph")]
    fn load_subgraphs_from_pages(&mut self) -> Result<()> {
        use page::record_serialization::{
            deserialize_subgraph_record, read_records_from_page, DataPageHeader,
        };

        let root_page = self.page_manager.header().subgraph_data_root_page;
        if root_page == 0 {
            return Ok(());
        }

        let mut current_page_id = root_page;
        // Walk the singly-linked page chain (terminated by next_page == 0).
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            let entries = read_records_from_page(&page_buf);
            for (off, len) in &entries {
                // Undecodable records and tombstones are silently skipped.
                if let Some((record, members, deleted, _)) =
                    deserialize_subgraph_record(&page_buf[*off..*off + *len])
                {
                    if !deleted {
                        let sg_id = record.subgraph_id;
                        self.subgraph_store.insert_loaded_record(record);
                        // Rebuild membership index from persisted member lists
                        for node_id in members {
                            self.membership_index.add(sg_id, node_id);
                        }
                    }
                }
            }

            if header.next_page == 0 {
                // Tail of the chain: cache it for future appends.
                self.current_subgraph_data_page = Some((current_page_id, page_buf));
                break;
            }
            current_page_id = header.next_page;
        }

        Ok(())
    }
1431
    /// Persist a subgraph record (with membership data) to a data page via WAL.
    ///
    /// Same append/chain strategy as `persist_node`, but note there is no
    /// per-record page map for subgraphs (`_id` is unused) — records are
    /// only recovered via a full chain scan in `load_subgraphs_from_pages`.
    ///
    /// NOTE(review): a record larger than a fresh page is only caught by a
    /// `debug_assert!` and would be silently dropped in release builds;
    /// subgraph records embed the full member list, so confirm member counts
    /// keep records bounded below PAGE_SIZE.
    #[cfg(feature = "subgraph")]
    fn persist_subgraph(
        &mut self,
        _id: SubgraphId,
        record: &SubgraphRecord,
        members: &[NodeId],
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_subgraph_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_subgraph_record(record, members, deleted);

        // Fast path: try to pack into the current (cached) subgraph page.
        if let Some((page_id, ref mut page_buf)) = self.current_subgraph_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                return Ok(());
            }
        }

        // Allocate new subgraph data page
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::SubgraphData as u8);
        header.write_to(&mut new_page);

        // Chain to previous page
        if let Some((old_page_id, ref mut old_buf)) = self.current_subgraph_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        // Pack record into the fresh page (see oversize-record NOTE above).
        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First subgraph data page ever: record and flush the root pointer.
        if self.page_manager.header().subgraph_data_root_page == 0 {
            self.page_manager.header_mut().subgraph_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.current_subgraph_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1493
1494    // ================================================================
1495    // PERSIST-001 Phase 5: HyperEdgeStore persistence
1496    // ================================================================
1497
    /// Load all persisted hyperedge records from data pages into the in-memory
    /// HyperEdgeStore and HyperEdgeReverseIndex.
    ///
    /// Walks the hyperedge data page chain from
    /// `DatabaseHeader.hyperedge_data_root_page`, loads each non-tombstone
    /// record, rebuilds the reverse index from the record's source/target
    /// participants, and caches the last page as
    /// `current_hyperedge_data_page`.
    #[cfg(feature = "hypergraph")]
    fn load_hyperedges_from_pages(&mut self) -> Result<()> {
        use page::record_serialization::{
            deserialize_hyperedge_record, read_records_from_page, DataPageHeader,
        };

        let root_page = self.page_manager.header().hyperedge_data_root_page;
        if root_page == 0 {
            return Ok(());
        }

        let mut current_page_id = root_page;
        // Walk the singly-linked page chain (terminated by next_page == 0).
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            let entries = read_records_from_page(&page_buf);
            for (off, len) in &entries {
                // Undecodable records and tombstones are silently skipped.
                if let Some((record, deleted, _)) =
                    deserialize_hyperedge_record(&page_buf[*off..*off + *len])
                {
                    if !deleted {
                        let he_id = record.id;
                        // Rebuild reverse index from every participant's raw ID
                        for entity in record.sources.iter().chain(record.targets.iter()) {
                            let raw_id = match entity {
                                cypherlite_core::GraphEntity::Node(nid) => nid.0,
                                cypherlite_core::GraphEntity::Subgraph(sid) => sid.0,
                                cypherlite_core::GraphEntity::HyperEdge(hid) => hid.0,
                                cypherlite_core::GraphEntity::TemporalRef(nid, _) => nid.0,
                            };
                            self.hyperedge_reverse_index.add(he_id.0, raw_id);
                        }
                        self.hyperedge_store.insert_loaded_record(record);
                    }
                }
            }

            if header.next_page == 0 {
                // Tail of the chain: cache it for future appends.
                self.current_hyperedge_data_page = Some((current_page_id, page_buf));
                break;
            }
            current_page_id = header.next_page;
        }

        Ok(())
    }
1547
    /// Persist a hyperedge record to a data page via WAL.
    ///
    /// Same append/chain strategy as `persist_node`; note there is no
    /// per-record page map for hyperedges (`_id` is unused) — records are
    /// only recovered via a full chain scan in `load_hyperedges_from_pages`.
    ///
    /// NOTE(review): a record larger than a fresh page is only caught by a
    /// `debug_assert!` and would be silently dropped in release builds —
    /// confirm participant lists keep records bounded below PAGE_SIZE.
    #[cfg(feature = "hypergraph")]
    fn persist_hyperedge(
        &mut self,
        _id: HyperEdgeId,
        record: &HyperEdgeRecord,
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_hyperedge_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_hyperedge_record(record, deleted);

        // Fast path: try to pack into the current (cached) hyperedge page.
        if let Some((page_id, ref mut page_buf)) = self.current_hyperedge_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                return Ok(());
            }
        }

        // Allocate new hyperedge data page
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::HyperEdgeData as u8);
        header.write_to(&mut new_page);

        // Chain to previous page
        if let Some((old_page_id, ref mut old_buf)) = self.current_hyperedge_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        // Pack record into the fresh page (see oversize-record NOTE above).
        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First hyperedge data page ever: record and flush the root pointer.
        if self.page_manager.header().hyperedge_data_root_page == 0 {
            self.page_manager.header_mut().hyperedge_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.current_hyperedge_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1608
1609    // ================================================================
1610    // PERSIST-001 Phase 5: VersionStore persistence
1611    // ================================================================
1612
1613    /// Load all persisted version records from data pages into the in-memory
1614    /// VersionStore.
1615    fn load_versions_from_pages(&mut self) -> Result<()> {
1616        use page::record_serialization::{
1617            deserialize_version_record, read_records_from_page, DataPageHeader,
1618        };
1619
1620        let root_page = self.page_manager.header().version_data_root_page;
1621        if root_page == 0 {
1622            return Ok(());
1623        }
1624
1625        let mut current_page_id = root_page;
1626        loop {
1627            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
1628            let header = DataPageHeader::read_from(&page_buf);
1629
1630            let entries = read_records_from_page(&page_buf);
1631            for (off, len) in &entries {
1632                if let Some((entity_id, version_seq, record, _)) =
1633                    deserialize_version_record(&page_buf[*off..*off + *len])
1634                {
1635                    self.version_store
1636                        .insert_loaded_record(entity_id, version_seq, record);
1637                }
1638            }
1639
1640            if header.next_page == 0 {
1641                self.current_version_data_page = Some((current_page_id, page_buf));
1642                break;
1643            }
1644            current_page_id = header.next_page;
1645        }
1646
1647        Ok(())
1648    }
1649
    /// Persist a version record to a data page via WAL.
    ///
    /// Mirrors `persist_hyperedge`: records are appended to a singly-linked
    /// chain of `VersionData` pages whose head is stored in the file header
    /// (`version_data_root_page`). Every modified page image is written as
    /// a full WAL frame and committed before returning.
    ///
    /// # Errors
    /// Propagates page-allocation, WAL-write, and header-flush failures.
    fn persist_version(
        &mut self,
        entity_id: u64,
        version_seq: u64,
        record: &version::VersionRecord,
    ) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_version_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_version_record(entity_id, version_seq, record);

        // Fast path: append into the cached tail page if the record fits.
        if let Some((page_id, ref mut page_buf)) = self.current_version_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                return Ok(());
            }
        }

        // Tail page missing or full: allocate a fresh VersionData page.
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::VersionData as u8);
        header.write_to(&mut new_page);

        // Link the old tail's next_page to the new page and persist the
        // updated old page through the WAL so the chain stays intact.
        if let Some((old_page_id, ref mut old_buf)) = self.current_version_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        // A freshly initialized page must have room for at least one record.
        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First version page ever: record it as the chain root in the header.
        if self.page_manager.header().version_data_root_page == 0 {
            self.page_manager.header_mut().version_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        // The new page becomes the cached tail for subsequent appends.
        self.current_version_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1709}
1710
// Pure delegation: every `LabelRegistry` method forwards directly to the
// engine's embedded catalog.
impl LabelRegistry for StorageEngine {
    fn get_or_create_label(&mut self, name: &str) -> u32 {
        self.catalog.get_or_create_label(name)
    }

    fn label_id(&self, name: &str) -> Option<u32> {
        self.catalog.label_id(name)
    }

    fn label_name(&self, id: u32) -> Option<&str> {
        self.catalog.label_name(id)
    }

    fn get_or_create_rel_type(&mut self, name: &str) -> u32 {
        self.catalog.get_or_create_rel_type(name)
    }

    fn rel_type_id(&self, name: &str) -> Option<u32> {
        self.catalog.rel_type_id(name)
    }

    fn rel_type_name(&self, id: u32) -> Option<&str> {
        self.catalog.rel_type_name(id)
    }

    fn get_or_create_prop_key(&mut self, name: &str) -> u32 {
        self.catalog.get_or_create_prop_key(name)
    }

    fn prop_key_id(&self, name: &str) -> Option<u32> {
        self.catalog.prop_key_id(name)
    }

    fn prop_key_name(&self, id: u32) -> Option<&str> {
        self.catalog.prop_key_name(id)
    }
}
1748
// Orderly shutdown. All failures here are deliberately swallowed with
// `let _ =` because `Drop::drop` cannot propagate errors; the one
// failure-sensitive step (checkpoint) guards WAL deletion so a failed
// checkpoint leaves the WAL in place for recovery on the next open.
impl Drop for StorageEngine {
    fn drop(&mut self) {
        // R-PERSIST-010: Persist catalog before closing.
        let _ = self.save_catalog();
        // R-PERSIST-037: On Drop, checkpoint executes, WAL deleted, then file lock released.
        // R-PERSIST-005: Flush header to persist next_node_id/next_edge_id before checkpoint.
        let _ = self.page_manager.flush_header();
        // Flush WAL to main database file, then delete WAL only if successful.
        // If checkpoint fails, WAL is preserved for crash recovery on next open.
        if self.checkpoint().is_ok() {
            let _ = std::fs::remove_file(self.config.wal_path());
        }
        // Lock file is released automatically when self.lock_file is dropped.
        // Clean up the .cyl-lock sidecar file.
        let _ = std::fs::remove_file(self.config.path.with_extension("cyl-lock"));
    }
}
1766
1767#[cfg(test)]
1768mod tests {
1769    use super::*;
1770    use cypherlite_core::{CypherLiteError, SyncMode};
1771    use tempfile::tempdir;
1772
1773    fn test_engine(dir: &std::path::Path) -> StorageEngine {
1774        let config = DatabaseConfig {
1775            path: dir.join("test.cyl"),
1776            wal_sync_mode: SyncMode::Normal,
1777            ..Default::default()
1778        };
1779        StorageEngine::open(config).expect("open")
1780    }
1781
    // A freshly created database contains no nodes or edges.
    #[test]
    fn test_open_creates_database() {
        let dir = tempdir().expect("tempdir");
        let engine = test_engine(dir.path());
        assert_eq!(engine.node_count(), 0);
        assert_eq!(engine.edge_count(), 0);
    }

    // Node round-trip: labels and properties survive create + get.
    #[test]
    fn test_create_and_get_node() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let id = engine.create_node(vec![1, 2], vec![(1, PropertyValue::String("Alice".into()))]);
        let node = engine.get_node(id).expect("found");
        assert_eq!(node.node_id, id);
        assert_eq!(node.labels, vec![1, 2]);
        assert_eq!(engine.node_count(), 1);
    }

    // update_node replaces the property value in place.
    #[test]
    fn test_update_node() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let id = engine.create_node(vec![], vec![(1, PropertyValue::Int64(10))]);
        engine
            .update_node(id, vec![(1, PropertyValue::Int64(20))])
            .expect("update");
        let node = engine.get_node(id).expect("found");
        assert_eq!(node.properties[0].1, PropertyValue::Int64(20));
    }

    // Deleted nodes are no longer retrievable and the count drops.
    #[test]
    fn test_delete_node() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let id = engine.create_node(vec![], vec![]);
        engine.delete_node(id).expect("delete");
        assert!(engine.get_node(id).is_none());
        assert_eq!(engine.node_count(), 0);
    }

    // Edge round-trip: endpoints survive create + get.
    #[test]
    fn test_create_and_get_edge() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let e = engine.create_edge(n1, n2, 1, vec![]).expect("edge");
        let edge = engine.get_edge(e).expect("found");
        assert_eq!(edge.start_node, n1);
        assert_eq!(edge.end_node, n2);
    }

    // All edges incident to a node are returned, regardless of rel type.
    #[test]
    fn test_get_edges_for_node() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let n3 = engine.create_node(vec![], vec![]);
        engine.create_edge(n1, n2, 1, vec![]).expect("e1");
        engine.create_edge(n1, n3, 2, vec![]).expect("e2");
        let edges = engine.get_edges_for_node(n1);
        assert_eq!(edges.len(), 2);
    }

    // Deleted edges are no longer retrievable.
    #[test]
    fn test_delete_edge() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let e = engine.create_edge(n1, n2, 1, vec![]).expect("edge");
        engine.delete_edge(e).expect("delete");
        assert!(engine.get_edge(e).is_none());
    }

    // REQ-STORE-004: Delete node deletes connected edges first
    #[test]
    fn test_delete_node_cascades_edges() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let e = engine.create_edge(n1, n2, 1, vec![]).expect("edge");
        engine.delete_node(n1).expect("delete");
        assert!(engine.get_edge(e).is_none());
        assert_eq!(engine.edge_count(), 0);
    }
1871
    // First transaction on a fresh engine gets id 1.
    #[test]
    fn test_begin_read_transaction() {
        let dir = tempdir().expect("tempdir");
        let engine = test_engine(dir.path());
        let tx = engine.begin_read();
        assert_eq!(tx.tx_id(), 1);
    }

    #[test]
    fn test_begin_write_transaction() {
        let dir = tempdir().expect("tempdir");
        let engine = test_engine(dir.path());
        let tx = engine.begin_write().expect("write");
        assert_eq!(tx.tx_id(), 1);
    }

    // REQ-TX-010: Second write returns conflict
    // (the first writer `_w1` is held alive for the whole test body).
    #[test]
    fn test_write_conflict() {
        let dir = tempdir().expect("tempdir");
        let engine = test_engine(dir.path());
        let _w1 = engine.begin_write().expect("w1");
        let result = engine.begin_write();
        assert!(matches!(result, Err(CypherLiteError::TransactionConflict)));
    }

    // A committed page write yields a positive WAL frame number.
    #[test]
    fn test_wal_write_and_commit() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let data = [0xAB; PAGE_SIZE];
        engine.wal_write_page(PageId(2), &data).expect("write");
        let frame = engine.wal_commit().expect("commit");
        assert!(frame > 0);
    }

    // Checkpoint reports the number of pages flushed from the WAL.
    #[test]
    fn test_checkpoint() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let data = [0xAB; PAGE_SIZE];
        engine.wal_write_page(PageId(2), &data).expect("write");
        engine.wal_commit().expect("commit");
        let count = engine.checkpoint().expect("checkpoint");
        assert_eq!(count, 1);
    }
1918
    // Data written in one engine instance is visible after dropping it
    // (which checkpoints) and reopening the same file.
    #[test]
    fn test_reopen_database() {
        let dir = tempdir().expect("tempdir");
        let config = DatabaseConfig {
            path: dir.path().join("test.cyl"),
            wal_sync_mode: SyncMode::Normal,
            ..Default::default()
        };

        // Create and populate
        {
            let mut engine = StorageEngine::open(config.clone()).expect("open");
            engine.create_node(vec![1], vec![(1, PropertyValue::String("Alice".into()))]);
        }

        // Reopen - data pages loaded back into memory (R-PERSIST-005)
        {
            let engine = StorageEngine::open(config).expect("reopen");
            assert_eq!(engine.node_count(), 1);
            let node = engine.get_node(NodeId(1)).expect("node should be loaded");
            assert_eq!(node.labels, vec![1]);
            assert_eq!(
                node.properties[0],
                (1, PropertyValue::String("Alice".into()))
            );
        }
    }
1946
    // TASK-006: scan_nodes
    #[test]
    fn test_scan_nodes_empty() {
        let dir = tempdir().expect("tempdir");
        let engine = test_engine(dir.path());
        let nodes = engine.scan_nodes();
        assert!(nodes.is_empty());
    }

    #[test]
    fn test_scan_nodes_returns_all() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        engine.create_node(vec![1], vec![]);
        engine.create_node(vec![2], vec![]);
        engine.create_node(vec![3], vec![]);
        let nodes = engine.scan_nodes();
        assert_eq!(nodes.len(), 3);
    }

    // TASK-007: scan_nodes_by_label
    // A node matches if the target label is among its labels (not exact set).
    #[test]
    fn test_scan_nodes_by_label_returns_matching() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        engine.create_node(vec![1, 2], vec![]);
        engine.create_node(vec![2, 3], vec![]);
        engine.create_node(vec![3], vec![]);
        let nodes = engine.scan_nodes_by_label(2);
        assert_eq!(nodes.len(), 2);
    }

    #[test]
    fn test_scan_nodes_by_label_nonexistent() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        engine.create_node(vec![1], vec![]);
        let nodes = engine.scan_nodes_by_label(999);
        assert!(nodes.is_empty());
    }

    // TASK-008: scan_edges_by_type
    #[test]
    fn test_scan_edges_by_type_returns_matching() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let n3 = engine.create_node(vec![], vec![]);
        engine.create_edge(n1, n2, 1, vec![]).expect("e1");
        engine.create_edge(n1, n3, 2, vec![]).expect("e2");
        engine.create_edge(n2, n3, 1, vec![]).expect("e3");
        let edges = engine.scan_edges_by_type(1);
        assert_eq!(edges.len(), 2);
        for edge in &edges {
            assert_eq!(edge.rel_type_id, 1);
        }
    }

    #[test]
    fn test_scan_edges_by_type_empty() {
        let dir = tempdir().expect("tempdir");
        let engine = test_engine(dir.path());
        let edges = engine.scan_edges_by_type(1);
        assert!(edges.is_empty());
    }
2013
    // TASK-080: find_node API
    #[test]
    fn test_find_node_returns_matching_node() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let label_id = engine.get_or_create_label("Person");
        let name_key = engine.get_or_create_prop_key("name");
        let nid = engine.create_node(
            vec![label_id],
            vec![(name_key, PropertyValue::String("Alice".into()))],
        );
        let found = engine.find_node(
            &[label_id],
            &[(name_key, PropertyValue::String("Alice".into()))],
        );
        assert_eq!(found, Some(nid));
    }

    // Property value mismatch yields None even when the label matches.
    #[test]
    fn test_find_node_returns_none_when_no_match() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let label_id = engine.get_or_create_label("Person");
        let name_key = engine.get_or_create_prop_key("name");
        engine.create_node(
            vec![label_id],
            vec![(name_key, PropertyValue::String("Alice".into()))],
        );
        let found = engine.find_node(
            &[label_id],
            &[(name_key, PropertyValue::String("Bob".into()))],
        );
        assert_eq!(found, None);
    }

    #[test]
    fn test_find_node_empty_db() {
        let dir = tempdir().expect("tempdir");
        let engine = test_engine(dir.path());
        let found = engine.find_node(&[0], &[]);
        assert_eq!(found, None);
    }

    // Requested labels are a subset check: a node with extra labels still
    // matches a query naming fewer labels.
    #[test]
    fn test_find_node_multiple_labels() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let person = engine.get_or_create_label("Person");
        let employee = engine.get_or_create_label("Employee");
        let name_key = engine.get_or_create_prop_key("name");
        let nid = engine.create_node(
            vec![person, employee],
            vec![(name_key, PropertyValue::String("Alice".into()))],
        );
        // Both labels required
        let found = engine.find_node(
            &[person, employee],
            &[(name_key, PropertyValue::String("Alice".into()))],
        );
        assert_eq!(found, Some(nid));
        // Only person label - should still match (node has both)
        let found2 = engine.find_node(
            &[person],
            &[(name_key, PropertyValue::String("Alice".into()))],
        );
        assert_eq!(found2, Some(nid));
    }

    // An empty property filter matches on labels alone.
    #[test]
    fn test_find_node_no_properties() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let label_id = engine.get_or_create_label("Person");
        let nid = engine.create_node(vec![label_id], vec![]);
        let found = engine.find_node(&[label_id], &[]);
        assert_eq!(found, Some(nid));
    }

    // TASK-081: find_edge API
    #[test]
    fn test_find_edge_returns_matching_edge() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let type_id = engine.get_or_create_rel_type("KNOWS");
        let eid = engine.create_edge(n1, n2, type_id, vec![]).expect("edge");
        let found = engine.find_edge(n1, n2, type_id);
        assert_eq!(found, Some(eid));
    }

    #[test]
    fn test_find_edge_returns_none_wrong_type() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let knows = engine.get_or_create_rel_type("KNOWS");
        let likes = engine.get_or_create_rel_type("LIKES");
        engine.create_edge(n1, n2, knows, vec![]).expect("edge");
        let found = engine.find_edge(n1, n2, likes);
        assert_eq!(found, None);
    }

    #[test]
    fn test_find_edge_returns_none_wrong_endpoints() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let n3 = engine.create_node(vec![], vec![]);
        let type_id = engine.get_or_create_rel_type("KNOWS");
        engine.create_edge(n1, n2, type_id, vec![]).expect("edge");
        let found = engine.find_edge(n1, n3, type_id);
        assert_eq!(found, None);
    }

    // "Empty" here means no edges exist; nodes are created only as endpoints.
    #[test]
    fn test_find_edge_empty_db() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n1 = engine.create_node(vec![], vec![]);
        let n2 = engine.create_node(vec![], vec![]);
        let found = engine.find_edge(n1, n2, 0);
        assert_eq!(found, None);
    }
2140
2141    // ======================================================================
2142    // TASK-095: Auto-update indexes on mutations
2143    // ======================================================================
2144
2145    #[test]
2146    fn test_create_node_updates_index() {
2147        let dir = tempdir().expect("tempdir");
2148        let mut engine = test_engine(dir.path());
2149        let label_id = engine.get_or_create_label("Person");
2150        let name_key = engine.get_or_create_prop_key("name");
2151
2152        // Create index before creating nodes
2153        engine
2154            .index_manager_mut()
2155            .create_index("idx_person_name".to_string(), label_id, name_key)
2156            .expect("create index");
2157
2158        let nid = engine.create_node(
2159            vec![label_id],
2160            vec![(name_key, PropertyValue::String("Alice".into()))],
2161        );
2162
2163        // Index should contain the new node
2164        let result = engine
2165            .index_manager()
2166            .find_index(label_id, name_key)
2167            .expect("index exists")
2168            .lookup(&PropertyValue::String("Alice".into()));
2169        assert_eq!(result, vec![nid]);
2170    }
2171
2172    #[test]
2173    fn test_update_node_updates_index() {
2174        let dir = tempdir().expect("tempdir");
2175        let mut engine = test_engine(dir.path());
2176        let label_id = engine.get_or_create_label("Person");
2177        let name_key = engine.get_or_create_prop_key("name");
2178
2179        engine
2180            .index_manager_mut()
2181            .create_index("idx_person_name".to_string(), label_id, name_key)
2182            .expect("create index");
2183
2184        let nid = engine.create_node(
2185            vec![label_id],
2186            vec![(name_key, PropertyValue::String("Alice".into()))],
2187        );
2188
2189        // Update the property
2190        engine
2191            .update_node(nid, vec![(name_key, PropertyValue::String("Bob".into()))])
2192            .expect("update");
2193
2194        let idx = engine
2195            .index_manager()
2196            .find_index(label_id, name_key)
2197            .expect("idx");
2198        // Old value should not be in index
2199        assert!(idx
2200            .lookup(&PropertyValue::String("Alice".into()))
2201            .is_empty());
2202        // New value should be in index
2203        assert_eq!(idx.lookup(&PropertyValue::String("Bob".into())), vec![nid]);
2204    }
2205
2206    #[test]
2207    fn test_delete_node_removes_from_index() {
2208        let dir = tempdir().expect("tempdir");
2209        let mut engine = test_engine(dir.path());
2210        let label_id = engine.get_or_create_label("Person");
2211        let name_key = engine.get_or_create_prop_key("name");
2212
2213        engine
2214            .index_manager_mut()
2215            .create_index("idx_person_name".to_string(), label_id, name_key)
2216            .expect("create index");
2217
2218        let nid = engine.create_node(
2219            vec![label_id],
2220            vec![(name_key, PropertyValue::String("Alice".into()))],
2221        );
2222
2223        engine.delete_node(nid).expect("delete");
2224
2225        let idx = engine
2226            .index_manager()
2227            .find_index(label_id, name_key)
2228            .expect("idx");
2229        assert!(idx
2230            .lookup(&PropertyValue::String("Alice".into()))
2231            .is_empty());
2232    }
2233
2234    // ======================================================================
2235    // TASK-096: scan_nodes_by_property
2236    // ======================================================================
2237
2238    #[test]
2239    fn test_scan_nodes_by_property_with_index() {
2240        let dir = tempdir().expect("tempdir");
2241        let mut engine = test_engine(dir.path());
2242        let label_id = engine.get_or_create_label("Person");
2243        let name_key = engine.get_or_create_prop_key("name");
2244
2245        engine
2246            .index_manager_mut()
2247            .create_index("idx_person_name".to_string(), label_id, name_key)
2248            .expect("create index");
2249
2250        let n1 = engine.create_node(
2251            vec![label_id],
2252            vec![(name_key, PropertyValue::String("Alice".into()))],
2253        );
2254        engine.create_node(
2255            vec![label_id],
2256            vec![(name_key, PropertyValue::String("Bob".into()))],
2257        );
2258
2259        let result = engine.scan_nodes_by_property(
2260            label_id,
2261            name_key,
2262            &PropertyValue::String("Alice".into()),
2263        );
2264        assert_eq!(result, vec![n1]);
2265    }
2266
2267    #[test]
2268    fn test_scan_nodes_by_property_without_index() {
2269        let dir = tempdir().expect("tempdir");
2270        let mut engine = test_engine(dir.path());
2271        let label_id = engine.get_or_create_label("Person");
2272        let name_key = engine.get_or_create_prop_key("name");
2273
2274        // No index created -- should use linear scan
2275        let n1 = engine.create_node(
2276            vec![label_id],
2277            vec![(name_key, PropertyValue::String("Alice".into()))],
2278        );
2279        engine.create_node(
2280            vec![label_id],
2281            vec![(name_key, PropertyValue::String("Bob".into()))],
2282        );
2283
2284        let result = engine.scan_nodes_by_property(
2285            label_id,
2286            name_key,
2287            &PropertyValue::String("Alice".into()),
2288        );
2289        assert_eq!(result, vec![n1]);
2290    }
2291
2292    #[test]
2293    fn test_scan_nodes_by_property_both_paths_agree() {
2294        let dir = tempdir().expect("tempdir");
2295        let mut engine = test_engine(dir.path());
2296        let label_id = engine.get_or_create_label("Person");
2297        let name_key = engine.get_or_create_prop_key("name");
2298
2299        // Create nodes without index
2300        engine.create_node(
2301            vec![label_id],
2302            vec![(name_key, PropertyValue::String("Alice".into()))],
2303        );
2304        engine.create_node(
2305            vec![label_id],
2306            vec![(name_key, PropertyValue::String("Bob".into()))],
2307        );
2308        engine.create_node(
2309            vec![label_id],
2310            vec![(name_key, PropertyValue::String("Alice".into()))],
2311        );
2312
2313        // Linear scan result
2314        let without_idx = engine.scan_nodes_by_property(
2315            label_id,
2316            name_key,
2317            &PropertyValue::String("Alice".into()),
2318        );
2319
2320        // Now create index and backfill
2321        engine
2322            .index_manager_mut()
2323            .create_index("idx".to_string(), label_id, name_key)
2324            .expect("create");
2325        // Backfill: manually insert existing nodes into the index
2326        let nodes: Vec<_> = engine
2327            .scan_nodes_by_label(label_id)
2328            .iter()
2329            .map(|n| (n.node_id, n.properties.clone()))
2330            .collect();
2331        for (nid, props) in &nodes {
2332            for (pk, v) in props {
2333                if *pk == name_key {
2334                    engine
2335                        .index_manager_mut()
2336                        .find_index_mut(label_id, name_key)
2337                        .expect("idx")
2338                        .insert(v, *nid);
2339                }
2340            }
2341        }
2342
2343        let with_idx = engine.scan_nodes_by_property(
2344            label_id,
2345            name_key,
2346            &PropertyValue::String("Alice".into()),
2347        );
2348
2349        // Both paths should return same IDs (order may differ)
2350        let mut a: Vec<u64> = without_idx.iter().map(|n| n.0).collect();
2351        let mut b: Vec<u64> = with_idx.iter().map(|n| n.0).collect();
2352        a.sort();
2353        b.sort();
2354        assert_eq!(a, b);
2355    }
2356
2357    // ======================================================================
2358    // TASK-097: scan_nodes_by_range
2359    // ======================================================================
2360
2361    #[test]
2362    fn test_scan_nodes_by_range_with_index() {
2363        let dir = tempdir().expect("tempdir");
2364        let mut engine = test_engine(dir.path());
2365        let label_id = engine.get_or_create_label("Person");
2366        let age_key = engine.get_or_create_prop_key("age");
2367
2368        engine
2369            .index_manager_mut()
2370            .create_index("idx_person_age".to_string(), label_id, age_key)
2371            .expect("create index");
2372
2373        for age in [20, 25, 30, 35, 40] {
2374            engine.create_node(vec![label_id], vec![(age_key, PropertyValue::Int64(age))]);
2375        }
2376
2377        let result = engine.scan_nodes_by_range(
2378            label_id,
2379            age_key,
2380            &PropertyValue::Int64(25),
2381            &PropertyValue::Int64(35),
2382        );
2383        assert_eq!(result.len(), 3); // 25, 30, 35
2384    }
2385
2386    #[test]
2387    fn test_scan_nodes_by_range_without_index() {
2388        let dir = tempdir().expect("tempdir");
2389        let mut engine = test_engine(dir.path());
2390        let label_id = engine.get_or_create_label("Person");
2391        let age_key = engine.get_or_create_prop_key("age");
2392
2393        for age in [20, 25, 30, 35, 40] {
2394            engine.create_node(vec![label_id], vec![(age_key, PropertyValue::Int64(age))]);
2395        }
2396
2397        let result = engine.scan_nodes_by_range(
2398            label_id,
2399            age_key,
2400            &PropertyValue::Int64(25),
2401            &PropertyValue::Int64(35),
2402        );
2403        assert_eq!(result.len(), 3);
2404    }
2405
2406    // REQ-CATALOG-030: StorageEngine exposes catalog accessor
2407    #[test]
2408    fn test_storage_engine_has_catalog() {
2409        let dir = tempdir().expect("tempdir");
2410        let engine = test_engine(dir.path());
2411        let cat = engine.catalog();
2412        // Empty catalog on fresh database
2413        assert_eq!(cat.label_id("Person"), None);
2414    }
2415
2416    // REQ-CATALOG-031: StorageEngine exposes mutable catalog accessor
2417    #[test]
2418    fn test_storage_engine_catalog_mut() {
2419        let dir = tempdir().expect("tempdir");
2420        let mut engine = test_engine(dir.path());
2421        let cat = engine.catalog_mut();
2422        let id = cat.get_or_create_label("Person");
2423        assert_eq!(engine.catalog().label_id("Person"), Some(id));
2424    }
2425
2426    // REQ-CATALOG-032: StorageEngine implements LabelRegistry by delegation
2427    #[test]
2428    fn test_storage_engine_label_registry() {
2429        use cypherlite_core::LabelRegistry;
2430
2431        let dir = tempdir().expect("tempdir");
2432        let mut engine = test_engine(dir.path());
2433
2434        let label_id = engine.get_or_create_label("Person");
2435        assert_eq!(engine.label_id("Person"), Some(label_id));
2436        assert_eq!(engine.label_name(label_id), Some("Person"));
2437
2438        let rel_id = engine.get_or_create_rel_type("KNOWS");
2439        assert_eq!(engine.rel_type_id("KNOWS"), Some(rel_id));
2440        assert_eq!(engine.rel_type_name(rel_id), Some("KNOWS"));
2441
2442        let prop_id = engine.get_or_create_prop_key("name");
2443        assert_eq!(engine.prop_key_id("name"), Some(prop_id));
2444        assert_eq!(engine.prop_key_name(prop_id), Some("name"));
2445    }
2446
2447    // ======================================================================
2448    // GG-005: StorageEngine subgraph integration tests
2449    // ======================================================================
2450
2451    #[cfg(feature = "subgraph")]
2452    mod subgraph_engine_tests {
2453        use super::*;
2454        use cypherlite_core::SubgraphId;
2455
2456        fn test_engine_sg(dir: &std::path::Path) -> StorageEngine {
2457            let config = DatabaseConfig {
2458                path: dir.join("test.cyl"),
2459                wal_sync_mode: SyncMode::Normal,
2460                ..Default::default()
2461            };
2462            StorageEngine::open(config).expect("open")
2463        }
2464
2465        // GG-005: Create subgraph via StorageEngine
2466        #[test]
2467        fn test_engine_create_subgraph() {
2468            let dir = tempdir().expect("tempdir");
2469            let mut engine = test_engine_sg(dir.path());
2470            let id = engine.create_subgraph(vec![], None);
2471            assert_eq!(id, SubgraphId(1));
2472        }
2473
2474        // GG-005: Get subgraph via StorageEngine
2475        #[test]
2476        fn test_engine_get_subgraph() {
2477            let dir = tempdir().expect("tempdir");
2478            let mut engine = test_engine_sg(dir.path());
2479            let id = engine
2480                .create_subgraph(vec![(1, PropertyValue::String("test".into()))], Some(1_000));
2481            let record = engine.get_subgraph(id).expect("found");
2482            assert_eq!(record.subgraph_id, id);
2483            assert_eq!(record.temporal_anchor, Some(1_000));
2484        }
2485
2486        // GG-005: Get nonexistent subgraph returns None
2487        #[test]
2488        fn test_engine_get_nonexistent_subgraph() {
2489            let dir = tempdir().expect("tempdir");
2490            let engine = test_engine_sg(dir.path());
2491            assert!(engine.get_subgraph(SubgraphId(999)).is_none());
2492        }
2493
2494        // GG-005: Delete subgraph via StorageEngine
2495        #[test]
2496        fn test_engine_delete_subgraph() {
2497            let dir = tempdir().expect("tempdir");
2498            let mut engine = test_engine_sg(dir.path());
2499            let id = engine.create_subgraph(vec![], None);
2500            engine.delete_subgraph(id).expect("delete");
2501            assert!(engine.get_subgraph(id).is_none());
2502        }
2503
2504        // GG-005: Delete nonexistent subgraph returns error
2505        #[test]
2506        fn test_engine_delete_nonexistent_subgraph() {
2507            let dir = tempdir().expect("tempdir");
2508            let mut engine = test_engine_sg(dir.path());
2509            let result = engine.delete_subgraph(SubgraphId(999));
2510            assert!(result.is_err());
2511        }
2512
2513        // GG-005: Add member to subgraph
2514        #[test]
2515        fn test_engine_add_member() {
2516            let dir = tempdir().expect("tempdir");
2517            let mut engine = test_engine_sg(dir.path());
2518            let sg = engine.create_subgraph(vec![], None);
2519            let n1 = engine.create_node(vec![], vec![]);
2520            engine.add_member(sg, n1).expect("add member");
2521            let members = engine.list_members(sg);
2522            assert_eq!(members.len(), 1);
2523            assert_eq!(members[0], n1);
2524        }
2525
2526        // GG-005: Add member - subgraph not found
2527        #[test]
2528        fn test_engine_add_member_subgraph_not_found() {
2529            let dir = tempdir().expect("tempdir");
2530            let mut engine = test_engine_sg(dir.path());
2531            let n1 = engine.create_node(vec![], vec![]);
2532            let result = engine.add_member(SubgraphId(999), n1);
2533            assert!(result.is_err());
2534        }
2535
2536        // GG-005: Add member - node not found
2537        #[test]
2538        fn test_engine_add_member_node_not_found() {
2539            let dir = tempdir().expect("tempdir");
2540            let mut engine = test_engine_sg(dir.path());
2541            let sg = engine.create_subgraph(vec![], None);
2542            let result = engine.add_member(sg, NodeId(999));
2543            assert!(result.is_err());
2544        }
2545
2546        // GG-005: Remove member from subgraph
2547        #[test]
2548        fn test_engine_remove_member() {
2549            let dir = tempdir().expect("tempdir");
2550            let mut engine = test_engine_sg(dir.path());
2551            let sg = engine.create_subgraph(vec![], None);
2552            let n1 = engine.create_node(vec![], vec![]);
2553            engine.add_member(sg, n1).expect("add");
2554            engine.remove_member(sg, n1).expect("remove");
2555            assert!(engine.list_members(sg).is_empty());
2556        }
2557
2558        // GG-005: Remove member - subgraph not found
2559        #[test]
2560        fn test_engine_remove_member_subgraph_not_found() {
2561            let dir = tempdir().expect("tempdir");
2562            let mut engine = test_engine_sg(dir.path());
2563            let n1 = engine.create_node(vec![], vec![]);
2564            let result = engine.remove_member(SubgraphId(999), n1);
2565            assert!(result.is_err());
2566        }
2567
2568        // GG-005: List members of empty subgraph
2569        #[test]
2570        fn test_engine_list_members_empty() {
2571            let dir = tempdir().expect("tempdir");
2572            let mut engine = test_engine_sg(dir.path());
2573            let sg = engine.create_subgraph(vec![], None);
2574            assert!(engine.list_members(sg).is_empty());
2575        }
2576
2577        // GG-005: Get subgraph memberships for node
2578        #[test]
2579        fn test_engine_get_subgraph_memberships() {
2580            let dir = tempdir().expect("tempdir");
2581            let mut engine = test_engine_sg(dir.path());
2582            let sg1 = engine.create_subgraph(vec![], None);
2583            let sg2 = engine.create_subgraph(vec![], None);
2584            let n1 = engine.create_node(vec![], vec![]);
2585            engine.add_member(sg1, n1).expect("add1");
2586            engine.add_member(sg2, n1).expect("add2");
2587            let memberships = engine.get_subgraph_memberships(n1);
2588            assert_eq!(memberships.len(), 2);
2589            assert!(memberships.contains(&sg1));
2590            assert!(memberships.contains(&sg2));
2591        }
2592
2593        // GG-005: Delete subgraph cascades membership removal
2594        #[test]
2595        fn test_engine_delete_subgraph_cascades_memberships() {
2596            let dir = tempdir().expect("tempdir");
2597            let mut engine = test_engine_sg(dir.path());
2598            let sg = engine.create_subgraph(vec![], None);
2599            let n1 = engine.create_node(vec![], vec![]);
2600            let n2 = engine.create_node(vec![], vec![]);
2601            engine.add_member(sg, n1).expect("add1");
2602            engine.add_member(sg, n2).expect("add2");
2603            engine.delete_subgraph(sg).expect("delete");
2604            // Memberships should be gone
2605            assert!(engine.get_subgraph_memberships(n1).is_empty());
2606            assert!(engine.get_subgraph_memberships(n2).is_empty());
2607        }
2608    }
2609
2610    // ======================================================================
2611    // HH-005: StorageEngine hyperedge integration tests
2612    // ======================================================================
2613
2614    #[cfg(feature = "hypergraph")]
2615    mod hypergraph_engine_tests {
2616        use super::*;
2617        use cypherlite_core::{GraphEntity, HyperEdgeId};
2618
2619        fn test_engine_hg(dir: &std::path::Path) -> StorageEngine {
2620            let config = DatabaseConfig {
2621                path: dir.join("test.cyl"),
2622                wal_sync_mode: SyncMode::Normal,
2623                ..Default::default()
2624            };
2625            StorageEngine::open(config).expect("open")
2626        }
2627
2628        // HH-005: Create hyperedge via StorageEngine
2629        #[test]
2630        fn test_storage_engine_create_hyperedge() {
2631            let dir = tempdir().expect("tempdir");
2632            let mut engine = test_engine_hg(dir.path());
2633            let n1 = engine.create_node(vec![], vec![]);
2634            let n2 = engine.create_node(vec![], vec![]);
2635            let he = engine.create_hyperedge(
2636                1,
2637                vec![GraphEntity::Node(n1)],
2638                vec![GraphEntity::Node(n2)],
2639                vec![],
2640            );
2641            assert_eq!(he, HyperEdgeId(1));
2642        }
2643
2644        // HH-005: Get hyperedge via StorageEngine
2645        #[test]
2646        fn test_storage_engine_get_hyperedge() {
2647            let dir = tempdir().expect("tempdir");
2648            let mut engine = test_engine_hg(dir.path());
2649            let n1 = engine.create_node(vec![], vec![]);
2650            let n2 = engine.create_node(vec![], vec![]);
2651            let he = engine.create_hyperedge(
2652                5,
2653                vec![GraphEntity::Node(n1)],
2654                vec![GraphEntity::Node(n2)],
2655                vec![(1, PropertyValue::Int64(42))],
2656            );
2657            let record = engine.get_hyperedge(he).expect("found");
2658            assert_eq!(record.id, he);
2659            assert_eq!(record.rel_type_id, 5);
2660            assert_eq!(record.sources.len(), 1);
2661            assert_eq!(record.targets.len(), 1);
2662            assert_eq!(record.properties.len(), 1);
2663        }
2664
2665        // HH-005: Get nonexistent hyperedge returns None
2666        #[test]
2667        fn test_storage_engine_get_nonexistent_hyperedge() {
2668            let dir = tempdir().expect("tempdir");
2669            let engine = test_engine_hg(dir.path());
2670            assert!(engine.get_hyperedge(HyperEdgeId(999)).is_none());
2671        }
2672
2673        // HH-005: Delete hyperedge via StorageEngine
2674        #[test]
2675        fn test_storage_engine_delete_hyperedge() {
2676            let dir = tempdir().expect("tempdir");
2677            let mut engine = test_engine_hg(dir.path());
2678            let he = engine.create_hyperedge(1, vec![], vec![], vec![]);
2679            engine.delete_hyperedge(he).expect("delete");
2680            assert!(engine.get_hyperedge(he).is_none());
2681        }
2682
2683        // HH-005: Delete nonexistent hyperedge returns error
2684        #[test]
2685        fn test_storage_engine_delete_nonexistent_hyperedge() {
2686            let dir = tempdir().expect("tempdir");
2687            let mut engine = test_engine_hg(dir.path());
2688            let result = engine.delete_hyperedge(HyperEdgeId(999));
2689            assert!(result.is_err());
2690        }
2691
2692        // HH-005: Reverse index is updated on create/delete
2693        #[test]
2694        fn test_storage_engine_reverse_index_update() {
2695            let dir = tempdir().expect("tempdir");
2696            let mut engine = test_engine_hg(dir.path());
2697            let n1 = engine.create_node(vec![], vec![]);
2698            let n2 = engine.create_node(vec![], vec![]);
2699            let he = engine.create_hyperedge(
2700                1,
2701                vec![GraphEntity::Node(n1)],
2702                vec![GraphEntity::Node(n2)],
2703                vec![],
2704            );
2705            // n1 participates in he
2706            let hes = engine.hyperedges_for_entity(n1.0);
2707            assert_eq!(hes.len(), 1);
2708            assert_eq!(hes[0], he.0);
2709            // n2 participates in he
2710            let hes = engine.hyperedges_for_entity(n2.0);
2711            assert_eq!(hes.len(), 1);
2712            // Delete hyperedge should clean reverse index
2713            engine.delete_hyperedge(he).expect("delete");
2714            assert!(engine.hyperedges_for_entity(n1.0).is_empty());
2715            assert!(engine.hyperedges_for_entity(n2.0).is_empty());
2716        }
2717
2718        // HH-005: Scan all hyperedges
2719        #[test]
2720        fn test_storage_engine_scan_hyperedges() {
2721            let dir = tempdir().expect("tempdir");
2722            let mut engine = test_engine_hg(dir.path());
2723            engine.create_hyperedge(1, vec![], vec![], vec![]);
2724            engine.create_hyperedge(2, vec![], vec![], vec![]);
2725            let all = engine.scan_hyperedges();
2726            assert_eq!(all.len(), 2);
2727        }
2728
2729        // HH-005: Header next_hyperedge_id is synced
2730        #[test]
2731        fn test_storage_engine_header_sync() {
2732            let dir = tempdir().expect("tempdir");
2733            let mut engine = test_engine_hg(dir.path());
2734            engine.create_hyperedge(1, vec![], vec![], vec![]);
2735            engine.create_hyperedge(2, vec![], vec![], vec![]);
2736            // After creating 2 hyperedges, next_hyperedge_id should be 3
2737            assert_eq!(engine.page_manager.header().next_hyperedge_id, 3);
2738        }
2739    }
2740
2741    // ======================================================================
2742    // R-PERSIST-035..039: File locking tests
2743    // ======================================================================
2744
2745    // R-PERSIST-038: Two StorageEngine instances MUST NOT open same .cyl file simultaneously
2746    #[test]
2747    fn test_second_open_returns_database_locked() {
2748        let dir = tempdir().expect("tempdir");
2749        let db_path = dir.path().join("lock_test.cyl");
2750        let config1 = DatabaseConfig {
2751            path: db_path.clone(),
2752            wal_sync_mode: SyncMode::Normal,
2753            ..Default::default()
2754        };
2755        let _engine1 = StorageEngine::open(config1).expect("first open should succeed");
2756
2757        let config2 = DatabaseConfig {
2758            path: db_path.clone(),
2759            wal_sync_mode: SyncMode::Normal,
2760            ..Default::default()
2761        };
2762        let result = StorageEngine::open(config2);
2763        match result {
2764            Err(CypherLiteError::DatabaseLocked(ref msg)) => {
2765                assert!(
2766                    msg.contains("lock_test.cyl"),
2767                    "error should contain file path: {msg}"
2768                );
2769            }
2770            Err(other) => panic!("expected DatabaseLocked, got: {other}"),
2771            Ok(_) => panic!("expected DatabaseLocked error, but open succeeded"),
2772        }
2773    }
2774
2775    // R-PERSIST-036: File lock held for entire StorageEngine lifetime, released only on Drop
2776    // R-PERSIST-037: On Drop, file lock released
2777    #[test]
2778    fn test_drop_releases_lock_then_reopen_succeeds() {
2779        let dir = tempdir().expect("tempdir");
2780        let db_path = dir.path().join("drop_test.cyl");
2781        let config = DatabaseConfig {
2782            path: db_path.clone(),
2783            wal_sync_mode: SyncMode::Normal,
2784            ..Default::default()
2785        };
2786
2787        // Open and immediately drop
2788        {
2789            let _engine = StorageEngine::open(config.clone()).expect("first open");
2790        }
2791        // Should succeed after drop
2792        let _engine2 = StorageEngine::open(config).expect("reopen after drop should succeed");
2793    }
2794
2795    // ======================================================================
2796    // TASK-019/020: R-PERSIST-001 create_node writes to WAL via data pages
2797    // ======================================================================
2798
2799    #[test]
2800    fn test_create_node_sets_node_data_root_page() {
2801        let dir = tempdir().expect("tempdir");
2802        let mut engine = test_engine(dir.path());
2803        // Before any node creation, root page should be 0
2804        assert_eq!(engine.node_data_root_page(), 0);
2805        engine.create_node(vec![1], vec![(1, PropertyValue::String("Alice".into()))]);
2806        // After creation, a node data page should have been allocated
2807        assert_ne!(engine.node_data_root_page(), 0);
2808    }
2809
2810    #[test]
2811    fn test_create_node_data_page_contains_record() {
2812        let dir = tempdir().expect("tempdir");
2813        let mut engine = test_engine(dir.path());
2814        let id = engine.create_node(vec![1, 2], vec![(1, PropertyValue::String("Alice".into()))]);
2815        // Read back the data page and verify the node is in it
2816        let page_id = engine.node_data_root_page();
2817        assert_ne!(page_id, 0);
2818        let page = engine.read_data_page(page_id).expect("read page");
2819        let entries = page::record_serialization::read_records_from_page(&page);
2820        assert_eq!(entries.len(), 1);
2821        // Deserialize and verify
2822        let (off, len) = entries[0];
2823        let (record, deleted, _) =
2824            page::record_serialization::deserialize_node_record(&page[off..off + len])
2825                .expect("deserialize");
2826        assert_eq!(record.node_id, id);
2827        assert_eq!(record.labels, vec![1, 2]);
2828        assert!(!deleted);
2829    }
2830
2831    #[test]
2832    fn test_create_multiple_nodes_all_persisted() {
2833        let dir = tempdir().expect("tempdir");
2834        let mut engine = test_engine(dir.path());
2835        let mut ids = vec![];
2836        for i in 0..5u64 {
2837            let id = engine.create_node(vec![1], vec![(1, PropertyValue::Int64(i as i64))]);
2838            ids.push(id);
2839        }
2840        // All nodes should be in data pages
2841        let page_id = engine.node_data_root_page();
2842        let page = engine.read_data_page(page_id).expect("read page");
2843        let entries = page::record_serialization::read_records_from_page(&page);
2844        assert_eq!(entries.len(), 5);
2845    }
2846
2847    // ======================================================================
2848    // TASK-021/022: R-PERSIST-002 create_edge writes to WAL via data pages
2849    // ======================================================================
2850
2851    #[test]
2852    fn test_create_edge_sets_edge_data_root_page() {
2853        let dir = tempdir().expect("tempdir");
2854        let mut engine = test_engine(dir.path());
2855        assert_eq!(engine.edge_data_root_page(), 0);
2856        let n1 = engine.create_node(vec![], vec![]);
2857        let n2 = engine.create_node(vec![], vec![]);
2858        engine.create_edge(n1, n2, 1, vec![]).expect("edge");
2859        assert_ne!(engine.edge_data_root_page(), 0);
2860    }
2861
2862    #[test]
2863    fn test_create_edge_data_page_contains_record() {
2864        let dir = tempdir().expect("tempdir");
2865        let mut engine = test_engine(dir.path());
2866        let n1 = engine.create_node(vec![], vec![]);
2867        let n2 = engine.create_node(vec![], vec![]);
2868        let eid = engine
2869            .create_edge(n1, n2, 5, vec![(1, PropertyValue::Int64(42))])
2870            .expect("edge");
2871        let page_id = engine.edge_data_root_page();
2872        let page = engine.read_data_page(page_id).expect("read page");
2873        let entries = page::record_serialization::read_records_from_page(&page);
2874        assert_eq!(entries.len(), 1);
2875        let (off, len) = entries[0];
2876        let (record, deleted, _) =
2877            page::record_serialization::deserialize_edge_record(&page[off..off + len])
2878                .expect("deserialize");
2879        assert_eq!(record.edge_id, eid);
2880        assert_eq!(record.start_node, n1);
2881        assert_eq!(record.end_node, n2);
2882        assert_eq!(record.rel_type_id, 5);
2883        assert!(!deleted);
2884    }
2885
2886    // ======================================================================
2887    // TASK-023/024: R-PERSIST-003 update_node writes to WAL
2888    // ======================================================================
2889
2890    #[test]
2891    fn test_update_node_rewrites_data_page() {
2892        let dir = tempdir().expect("tempdir");
2893        let mut engine = test_engine(dir.path());
2894        let id = engine.create_node(vec![1], vec![(1, PropertyValue::Int64(10))]);
2895        engine
2896            .update_node(id, vec![(1, PropertyValue::Int64(20))])
2897            .expect("update");
2898        // Find the node in data pages -- should have the updated value
2899        let page_id = engine.node_data_root_page();
2900        let page = engine.read_data_page(page_id).expect("read page");
2901        let entries = page::record_serialization::read_records_from_page(&page);
2902        // Find the entry for this node (may be the latest non-deleted version)
2903        let mut found = false;
2904        for (off, len) in &entries {
2905            let (record, deleted, _) =
2906                page::record_serialization::deserialize_node_record(&page[*off..*off + *len])
2907                    .expect("deserialize");
2908            if record.node_id == id && !deleted {
2909                assert_eq!(record.properties[0].1, PropertyValue::Int64(20));
2910                found = true;
2911            }
2912        }
2913        assert!(found, "updated node record should be in data page");
2914    }
2915
2916    // ======================================================================
2917    // TASK-025/026: R-PERSIST-004 delete_node/edge writes tombstone
2918    // ======================================================================
2919
2920    #[test]
2921    fn test_delete_node_writes_tombstone() {
2922        let dir = tempdir().expect("tempdir");
2923        let mut engine = test_engine(dir.path());
2924        let id = engine.create_node(vec![1], vec![]);
2925        engine.delete_node(id).expect("delete");
2926        // Check that data page contains a tombstone record
2927        let page_id = engine.node_data_root_page();
2928        let page = engine.read_data_page(page_id).expect("read page");
2929        let entries = page::record_serialization::read_records_from_page(&page);
2930        let mut tombstone_found = false;
2931        for (off, len) in &entries {
2932            let (record, deleted, _) =
2933                page::record_serialization::deserialize_node_record(&page[*off..*off + *len])
2934                    .expect("deserialize");
2935            if record.node_id == id && deleted {
2936                tombstone_found = true;
2937            }
2938        }
2939        assert!(
2940            tombstone_found,
2941            "deleted node should have a tombstone record"
2942        );
2943    }
2944
2945    #[test]
2946    fn test_delete_edge_writes_tombstone() {
2947        let dir = tempdir().expect("tempdir");
2948        let mut engine = test_engine(dir.path());
2949        let n1 = engine.create_node(vec![], vec![]);
2950        let n2 = engine.create_node(vec![], vec![]);
2951        let eid = engine.create_edge(n1, n2, 1, vec![]).expect("edge");
2952        engine.delete_edge(eid).expect("delete");
2953        let page_id = engine.edge_data_root_page();
2954        let page = engine.read_data_page(page_id).expect("read page");
2955        let entries = page::record_serialization::read_records_from_page(&page);
2956        let mut tombstone_found = false;
2957        for (off, len) in &entries {
2958            let (record, deleted, _) =
2959                page::record_serialization::deserialize_edge_record(&page[*off..*off + *len])
2960                    .expect("deserialize");
2961            if record.edge_id == eid && deleted {
2962                tombstone_found = true;
2963            }
2964        }
2965        assert!(
2966            tombstone_found,
2967            "deleted edge should have a tombstone record"
2968        );
2969    }
2970
2971    // ======================================================================
2972    // TASK-029: WAL commit verification
2973    // ======================================================================
2974
2975    #[test]
2976    fn test_create_node_wal_has_committed_frames() {
2977        let dir = tempdir().expect("tempdir");
2978        let mut engine = test_engine(dir.path());
2979        engine.create_node(vec![1], vec![(1, PropertyValue::Int64(42))]);
2980        // WAL should have at least 1 committed frame (the data page write)
2981        assert!(
2982            engine.wal_frame_count() > 0,
2983            "WAL should have committed frames"
2984        );
2985    }
2986
2987    #[test]
2988    fn test_page_overflow_allocates_new_page() {
2989        let dir = tempdir().expect("tempdir");
2990        let mut engine = test_engine(dir.path());
2991        // Create many nodes with large properties to fill a page
2992        for i in 0..200u64 {
2993            let big_str = "x".repeat(100);
2994            engine.create_node(
2995                vec![1, 2, 3],
2996                vec![
2997                    (1, PropertyValue::String(big_str)),
2998                    (2, PropertyValue::Int64(i as i64)),
2999                ],
3000            );
3001        }
3002        // Should have allocated more than one data page
3003        assert!(
3004            engine.node_data_page_count() > 1,
3005            "should use multiple data pages"
3006        );
3007    }
3008
3009    // ======================================================================
3010    // TASK-030/031: R-PERSIST-005 close/reopen preserves nodes
3011    // ======================================================================
3012
3013    #[test]
3014    fn test_close_reopen_preserves_nodes() {
3015        let dir = tempdir().expect("tempdir");
3016        let db_path = dir.path().join("persist_nodes.cyl");
3017        let config = DatabaseConfig {
3018            path: db_path.clone(),
3019            wal_sync_mode: SyncMode::Normal,
3020            ..Default::default()
3021        };
3022
3023        // Phase 1: Create nodes, then close (drop triggers checkpoint)
3024        {
3025            let mut engine = StorageEngine::open(config.clone()).expect("open");
3026            engine.create_node(vec![1, 2], vec![(1, PropertyValue::String("Alice".into()))]);
3027            engine.create_node(vec![3], vec![(2, PropertyValue::Int64(42))]);
3028            engine.create_node(
3029                vec![1],
3030                vec![
3031                    (1, PropertyValue::String("Charlie".into())),
3032                    (3, PropertyValue::Bool(true)),
3033                ],
3034            );
3035            assert_eq!(engine.node_count(), 3);
3036            // Drop: checkpoint flushes WAL -> main file
3037        }
3038
3039        // Phase 2: Reopen and verify all nodes are present
3040        {
3041            let engine = StorageEngine::open(config).expect("reopen");
3042            assert_eq!(
3043                engine.node_count(),
3044                3,
3045                "all nodes should be loaded from disk"
3046            );
3047
3048            // Verify node 1 (NodeId(1))
3049            let n1 = engine.get_node(NodeId(1)).expect("node 1 should exist");
3050            assert_eq!(n1.labels, vec![1, 2]);
3051            assert_eq!(n1.properties.len(), 1);
3052            assert_eq!(n1.properties[0], (1, PropertyValue::String("Alice".into())));
3053
3054            // Verify node 2 (NodeId(2))
3055            let n2 = engine.get_node(NodeId(2)).expect("node 2 should exist");
3056            assert_eq!(n2.labels, vec![3]);
3057            assert_eq!(n2.properties[0], (2, PropertyValue::Int64(42)));
3058
3059            // Verify node 3 (NodeId(3))
3060            let n3 = engine.get_node(NodeId(3)).expect("node 3 should exist");
3061            assert_eq!(n3.labels, vec![1]);
3062            assert_eq!(n3.properties.len(), 2);
3063        }
3064    }
3065
3066    // ======================================================================
3067    // TASK-032/033: R-PERSIST-005 close/reopen preserves edges
3068    // ======================================================================
3069
3070    #[test]
3071    fn test_close_reopen_preserves_edges() {
3072        let dir = tempdir().expect("tempdir");
3073        let db_path = dir.path().join("persist_edges.cyl");
3074        let config = DatabaseConfig {
3075            path: db_path.clone(),
3076            wal_sync_mode: SyncMode::Normal,
3077            ..Default::default()
3078        };
3079
3080        // Phase 1: Create nodes and edges
3081        {
3082            let mut engine = StorageEngine::open(config.clone()).expect("open");
3083            let n1 = engine.create_node(vec![1], vec![]);
3084            let n2 = engine.create_node(vec![2], vec![]);
3085            let n3 = engine.create_node(vec![3], vec![]);
3086            engine
3087                .create_edge(n1, n2, 10, vec![(1, PropertyValue::String("since".into()))])
3088                .expect("edge1");
3089            engine.create_edge(n2, n3, 20, vec![]).expect("edge2");
3090            assert_eq!(engine.node_count(), 3);
3091            assert_eq!(engine.edge_count(), 2);
3092        }
3093
3094        // Phase 2: Reopen and verify
3095        {
3096            let engine = StorageEngine::open(config).expect("reopen");
3097            assert_eq!(engine.node_count(), 3, "nodes should persist");
3098            assert_eq!(engine.edge_count(), 2, "edges should persist");
3099
3100            // Verify edge 1
3101            let e1 = engine.get_edge(EdgeId(1)).expect("edge 1 should exist");
3102            assert_eq!(e1.start_node, NodeId(1));
3103            assert_eq!(e1.end_node, NodeId(2));
3104            assert_eq!(e1.rel_type_id, 10);
3105            assert_eq!(e1.properties.len(), 1);
3106            assert_eq!(e1.properties[0], (1, PropertyValue::String("since".into())));
3107
3108            // Verify edge 2
3109            let e2 = engine.get_edge(EdgeId(2)).expect("edge 2 should exist");
3110            assert_eq!(e2.start_node, NodeId(2));
3111            assert_eq!(e2.end_node, NodeId(3));
3112            assert_eq!(e2.rel_type_id, 20);
3113            assert!(e2.properties.is_empty());
3114        }
3115    }
3116
3117    // ======================================================================
3118    // TASK-034/035: Large dataset close/reopen
3119    // ======================================================================
3120
3121    #[test]
3122    fn test_close_reopen_large_dataset() {
3123        let dir = tempdir().expect("tempdir");
3124        let db_path = dir.path().join("persist_large.cyl");
3125        let config = DatabaseConfig {
3126            path: db_path.clone(),
3127            wal_sync_mode: SyncMode::Normal,
3128            ..Default::default()
3129        };
3130
3131        let node_count = 1000;
3132        let edge_count = 500;
3133
3134        // Phase 1: Create large dataset
3135        {
3136            let mut engine = StorageEngine::open(config.clone()).expect("open");
3137            for i in 0..node_count {
3138                engine.create_node(
3139                    vec![(i % 5) as u32],
3140                    vec![(1, PropertyValue::Int64(i as i64))],
3141                );
3142            }
3143            // Create edges between consecutive nodes
3144            for i in 0..edge_count {
3145                let src = NodeId((i + 1) as u64);
3146                let dst = NodeId((i + 2) as u64);
3147                engine
3148                    .create_edge(src, dst, 1, vec![(1, PropertyValue::Int64(i as i64))])
3149                    .expect("edge");
3150            }
3151            assert_eq!(engine.node_count(), node_count);
3152            assert_eq!(engine.edge_count(), edge_count);
3153        }
3154
3155        // Phase 2: Reopen and verify all data
3156        {
3157            let engine = StorageEngine::open(config).expect("reopen");
3158            assert_eq!(
3159                engine.node_count(),
3160                node_count,
3161                "all {} nodes should be loaded",
3162                node_count
3163            );
3164            assert_eq!(
3165                engine.edge_count(),
3166                edge_count,
3167                "all {} edges should be loaded",
3168                edge_count
3169            );
3170
3171            // Spot-check some nodes
3172            let first = engine.get_node(NodeId(1)).expect("first node");
3173            assert_eq!(first.properties[0], (1, PropertyValue::Int64(0)));
3174            let last = engine
3175                .get_node(NodeId(node_count as u64))
3176                .expect("last node");
3177            assert_eq!(
3178                last.properties[0],
3179                (1, PropertyValue::Int64((node_count - 1) as i64))
3180            );
3181
3182            // Spot-check some edges
3183            let first_edge = engine.get_edge(EdgeId(1)).expect("first edge");
3184            assert_eq!(first_edge.start_node, NodeId(1));
3185            assert_eq!(first_edge.end_node, NodeId(2));
3186        }
3187    }
3188
3189    // ======================================================================
3190    // TASK-036: Close/reopen empty database
3191    // ======================================================================
3192
3193    #[test]
3194    fn test_close_reopen_empty_database() {
3195        let dir = tempdir().expect("tempdir");
3196        let db_path = dir.path().join("persist_empty.cyl");
3197        let config = DatabaseConfig {
3198            path: db_path.clone(),
3199            wal_sync_mode: SyncMode::Normal,
3200            ..Default::default()
3201        };
3202
3203        // Phase 1: Open empty, close
3204        {
3205            let _engine = StorageEngine::open(config.clone()).expect("open");
3206        }
3207
3208        // Phase 2: Reopen empty database - should not crash
3209        {
3210            let engine = StorageEngine::open(config).expect("reopen");
3211            assert_eq!(engine.node_count(), 0);
3212            assert_eq!(engine.edge_count(), 0);
3213        }
3214    }
3215
3216    // ======================================================================
3217    // TASK-036: New node IDs continue from where they left off
3218    // ======================================================================
3219
3220    #[test]
3221    fn test_close_reopen_id_continuity() {
3222        let dir = tempdir().expect("tempdir");
3223        let db_path = dir.path().join("persist_ids.cyl");
3224        let config = DatabaseConfig {
3225            path: db_path.clone(),
3226            wal_sync_mode: SyncMode::Normal,
3227            ..Default::default()
3228        };
3229
3230        // Phase 1: Create 3 nodes
3231        {
3232            let mut engine = StorageEngine::open(config.clone()).expect("open");
3233            engine.create_node(vec![], vec![]);
3234            engine.create_node(vec![], vec![]);
3235            engine.create_node(vec![], vec![]);
3236        }
3237
3238        // Phase 2: Reopen, create another node - should get NodeId(4)
3239        {
3240            let mut engine = StorageEngine::open(config).expect("reopen");
3241            assert_eq!(engine.node_count(), 3);
3242            let new_id = engine.create_node(vec![99], vec![]);
3243            assert_eq!(new_id, NodeId(4), "new node should get next sequential ID");
3244            assert_eq!(engine.node_count(), 4);
3245        }
3246    }
3247
3248    // ======================================================================
3249    // TASK-037: R-PERSIST-010 close/reopen preserves catalog labels
3250    // ======================================================================
3251
3252    #[test]
3253    fn test_close_reopen_preserves_catalog_labels() {
3254        let dir = tempdir().expect("tempdir");
3255        let db_path = dir.path().join("persist_catalog.cyl");
3256        let config = DatabaseConfig {
3257            path: db_path.clone(),
3258            wal_sync_mode: SyncMode::Normal,
3259            ..Default::default()
3260        };
3261
3262        // Phase 1: Register labels and create a node using them
3263        let person_id;
3264        let company_id;
3265        {
3266            let mut engine = StorageEngine::open(config.clone()).expect("open");
3267            person_id = engine.get_or_create_label("Person");
3268            company_id = engine.get_or_create_label("Company");
3269            // Create a node so there is something to persist
3270            engine.create_node(vec![person_id], vec![]);
3271        }
3272
3273        // Phase 2: Reopen and verify catalog labels survived
3274        {
3275            let engine = StorageEngine::open(config).expect("reopen");
3276            assert_eq!(
3277                engine.label_id("Person"),
3278                Some(person_id),
3279                "Person label should persist across close/reopen"
3280            );
3281            assert_eq!(
3282                engine.label_id("Company"),
3283                Some(company_id),
3284                "Company label should persist across close/reopen"
3285            );
3286            assert_eq!(
3287                engine.label_name(person_id),
3288                Some("Person"),
3289                "Reverse lookup should work after reopen"
3290            );
3291        }
3292    }
3293
3294    // ======================================================================
3295    // TASK-039: R-PERSIST-010 close/reopen preserves all catalog entries
3296    // ======================================================================
3297
3298    #[test]
3299    fn test_close_reopen_preserves_all_catalog_entries() {
3300        let dir = tempdir().expect("tempdir");
3301        let db_path = dir.path().join("persist_catalog_all.cyl");
3302        let config = DatabaseConfig {
3303            path: db_path.clone(),
3304            wal_sync_mode: SyncMode::Normal,
3305            ..Default::default()
3306        };
3307
3308        // Phase 1: Register labels, property keys, and relationship types
3309        let label_person;
3310        let label_company;
3311        let prop_name;
3312        let prop_age;
3313        let rel_knows;
3314        let rel_works_at;
3315        {
3316            let mut engine = StorageEngine::open(config.clone()).expect("open");
3317            label_person = engine.get_or_create_label("Person");
3318            label_company = engine.get_or_create_label("Company");
3319            prop_name = engine.get_or_create_prop_key("name");
3320            prop_age = engine.get_or_create_prop_key("age");
3321            rel_knows = engine.get_or_create_rel_type("KNOWS");
3322            rel_works_at = engine.get_or_create_rel_type("WORKS_AT");
3323            // Create some data so engine has something to persist
3324            let n1 = engine.create_node(
3325                vec![label_person],
3326                vec![(prop_name, PropertyValue::String("Alice".into()))],
3327            );
3328            let n2 = engine.create_node(
3329                vec![label_company],
3330                vec![(prop_name, PropertyValue::String("Acme".into()))],
3331            );
3332            engine
3333                .create_edge(n1, n2, rel_works_at, vec![])
3334                .expect("edge");
3335        }
3336
3337        // Phase 2: Reopen and verify ALL catalog entries survived
3338        {
3339            let engine = StorageEngine::open(config).expect("reopen");
3340
3341            // Labels
3342            assert_eq!(engine.label_id("Person"), Some(label_person));
3343            assert_eq!(engine.label_id("Company"), Some(label_company));
3344            assert_eq!(engine.label_name(label_person), Some("Person"));
3345            assert_eq!(engine.label_name(label_company), Some("Company"));
3346
3347            // Property keys
3348            assert_eq!(engine.prop_key_id("name"), Some(prop_name));
3349            assert_eq!(engine.prop_key_id("age"), Some(prop_age));
3350            assert_eq!(engine.prop_key_name(prop_name), Some("name"));
3351            assert_eq!(engine.prop_key_name(prop_age), Some("age"));
3352
3353            // Relationship types
3354            assert_eq!(engine.rel_type_id("KNOWS"), Some(rel_knows));
3355            assert_eq!(engine.rel_type_id("WORKS_AT"), Some(rel_works_at));
3356            assert_eq!(engine.rel_type_name(rel_knows), Some("KNOWS"));
3357            assert_eq!(engine.rel_type_name(rel_works_at), Some("WORKS_AT"));
3358        }
3359    }
3360
3361    // ======================================================================
3362    // TASK-039b: Catalog ID sequence continues after reopen
3363    // ======================================================================
3364
3365    #[test]
3366    fn test_close_reopen_catalog_id_continuity() {
3367        let dir = tempdir().expect("tempdir");
3368        let db_path = dir.path().join("persist_catalog_ids.cyl");
3369        let config = DatabaseConfig {
3370            path: db_path.clone(),
3371            wal_sync_mode: SyncMode::Normal,
3372            ..Default::default()
3373        };
3374
3375        // Phase 1: Register 2 labels (ids 0, 1)
3376        {
3377            let mut engine = StorageEngine::open(config.clone()).expect("open");
3378            engine.get_or_create_label("Person"); // id=0
3379            engine.get_or_create_label("Company"); // id=1
3380        }
3381
3382        // Phase 2: Reopen and register a new label - should get id=2
3383        {
3384            let mut engine = StorageEngine::open(config).expect("reopen");
3385            let new_id = engine.get_or_create_label("City");
3386            assert_eq!(
3387                new_id, 2,
3388                "new label after reopen should continue ID sequence"
3389            );
3390            // Existing labels still accessible
3391            assert_eq!(engine.label_id("Person"), Some(0));
3392            assert_eq!(engine.label_id("Company"), Some(1));
3393            assert_eq!(engine.label_id("City"), Some(2));
3394        }
3395    }
3396
3397    // ======================================================================
3398    // TASK-039c: Empty catalog persistence (no labels registered)
3399    // ======================================================================
3400
3401    #[test]
3402    fn test_close_reopen_empty_catalog() {
3403        let dir = tempdir().expect("tempdir");
3404        let db_path = dir.path().join("persist_catalog_empty.cyl");
3405        let config = DatabaseConfig {
3406            path: db_path.clone(),
3407            wal_sync_mode: SyncMode::Normal,
3408            ..Default::default()
3409        };
3410
3411        // Phase 1: Open and close without registering any catalog entries
3412        {
3413            let _engine = StorageEngine::open(config.clone()).expect("open");
3414        }
3415
3416        // Phase 2: Reopen - should not crash
3417        {
3418            let engine = StorageEngine::open(config).expect("reopen");
3419            assert_eq!(engine.label_id("anything"), None);
3420            assert_eq!(engine.node_count(), 0);
3421        }
3422    }
3423
3424    // ======================================================================
3425    // TASK-042/043: R-PERSIST-050 SubgraphStore persistence
3426    // ======================================================================
3427
3428    #[cfg(feature = "subgraph")]
3429    #[test]
3430    fn test_close_reopen_preserves_subgraphs() {
3431        use cypherlite_core::SubgraphId;
3432
3433        let dir = tempdir().expect("tempdir");
3434        let db_path = dir.path().join("persist_subgraphs.cyl");
3435        let config = DatabaseConfig {
3436            path: db_path.clone(),
3437            wal_sync_mode: SyncMode::Normal,
3438            ..Default::default()
3439        };
3440
3441        // Phase 1: Create subgraphs, then close
3442        {
3443            let mut engine = StorageEngine::open(config.clone()).expect("open");
3444            // Create subgraph with properties and temporal anchor
3445            let sg1 = engine.create_subgraph(
3446                vec![(1, PropertyValue::String("graph-A".into()))],
3447                Some(1_700_000_000_000),
3448            );
3449            // Create empty subgraph
3450            let sg2 = engine.create_subgraph(vec![], None);
3451            // Create subgraph with multiple properties
3452            let sg3 = engine.create_subgraph(
3453                vec![
3454                    (2, PropertyValue::Int64(42)),
3455                    (3, PropertyValue::Bool(true)),
3456                ],
3457                Some(1_700_000_001_000),
3458            );
3459            assert_eq!(sg1, SubgraphId(1));
3460            assert_eq!(sg2, SubgraphId(2));
3461            assert_eq!(sg3, SubgraphId(3));
3462        }
3463
3464        // Phase 2: Reopen and verify all subgraphs are present
3465        {
3466            let engine = StorageEngine::open(config).expect("reopen");
3467            // Verify subgraph 1
3468            let s1 = engine.get_subgraph(SubgraphId(1)).expect("subgraph 1");
3469            assert_eq!(s1.subgraph_id, SubgraphId(1));
3470            assert_eq!(
3471                s1.properties,
3472                vec![(1, PropertyValue::String("graph-A".into()))]
3473            );
3474            assert_eq!(s1.temporal_anchor, Some(1_700_000_000_000));
3475
3476            // Verify subgraph 2
3477            let s2 = engine.get_subgraph(SubgraphId(2)).expect("subgraph 2");
3478            assert!(s2.properties.is_empty());
3479            assert_eq!(s2.temporal_anchor, None);
3480
3481            // Verify subgraph 3
3482            let s3 = engine.get_subgraph(SubgraphId(3)).expect("subgraph 3");
3483            assert_eq!(s3.properties.len(), 2);
3484            assert_eq!(s3.temporal_anchor, Some(1_700_000_001_000));
3485
3486            // Verify next_subgraph_id is preserved (should be 4)
3487            let sg4 = engine.get_subgraph(SubgraphId(4));
3488            assert!(sg4.is_none());
3489        }
3490    }
3491
3492    #[cfg(feature = "subgraph")]
3493    #[test]
3494    fn test_close_reopen_preserves_memberships() {
3495        use cypherlite_core::SubgraphId;
3496
3497        let dir = tempdir().expect("tempdir");
3498        let db_path = dir.path().join("persist_memberships.cyl");
3499        let config = DatabaseConfig {
3500            path: db_path.clone(),
3501            wal_sync_mode: SyncMode::Normal,
3502            ..Default::default()
3503        };
3504
3505        // Phase 1: Create subgraph with members
3506        {
3507            let mut engine = StorageEngine::open(config.clone()).expect("open");
3508            let sg = engine.create_subgraph(vec![], None);
3509            let n1 = engine.create_node(vec![1], vec![]);
3510            let n2 = engine.create_node(vec![2], vec![]);
3511            engine.add_member(sg, n1).expect("add n1");
3512            engine.add_member(sg, n2).expect("add n2");
3513            assert_eq!(engine.list_members(sg).len(), 2);
3514        }
3515
3516        // Phase 2: Reopen and verify memberships
3517        {
3518            let engine = StorageEngine::open(config).expect("reopen");
3519            let members = engine.list_members(SubgraphId(1));
3520            assert_eq!(members.len(), 2);
3521            assert!(members.contains(&NodeId(1)));
3522            assert!(members.contains(&NodeId(2)));
3523        }
3524    }
3525
3526    // ======================================================================
3527    // TASK-044/045: R-PERSIST-051 HyperEdgeStore persistence
3528    // ======================================================================
3529
3530    #[cfg(feature = "hypergraph")]
3531    #[test]
3532    fn test_close_reopen_preserves_hyperedges() {
3533        use cypherlite_core::{GraphEntity, HyperEdgeId};
3534
3535        let dir = tempdir().expect("tempdir");
3536        let db_path = dir.path().join("persist_hyperedges.cyl");
3537        let config = DatabaseConfig {
3538            path: db_path.clone(),
3539            wal_sync_mode: SyncMode::Normal,
3540            ..Default::default()
3541        };
3542
3543        // Phase 1: Create hyperedges, then close
3544        {
3545            let mut engine = StorageEngine::open(config.clone()).expect("open");
3546            let n1 = engine.create_node(vec![1], vec![]);
3547            let n2 = engine.create_node(vec![2], vec![]);
3548            let n3 = engine.create_node(vec![3], vec![]);
3549
3550            // Create hyperedge with properties
3551            let he1 = engine.create_hyperedge(
3552                10,
3553                vec![GraphEntity::Node(n1)],
3554                vec![GraphEntity::Node(n2), GraphEntity::Node(n3)],
3555                vec![(1, PropertyValue::String("rel-A".into()))],
3556            );
3557            // Create empty hyperedge
3558            let he2 = engine.create_hyperedge(20, vec![], vec![], vec![]);
3559            assert_eq!(he1, HyperEdgeId(1));
3560            assert_eq!(he2, HyperEdgeId(2));
3561        }
3562
3563        // Phase 2: Reopen and verify all hyperedges are present
3564        {
3565            let engine = StorageEngine::open(config).expect("reopen");
3566            // Verify hyperedge 1
3567            let h1 = engine.get_hyperedge(HyperEdgeId(1)).expect("hyperedge 1");
3568            assert_eq!(h1.id, HyperEdgeId(1));
3569            assert_eq!(h1.rel_type_id, 10);
3570            assert_eq!(h1.sources.len(), 1);
3571            assert_eq!(h1.targets.len(), 2);
3572            assert_eq!(
3573                h1.properties,
3574                vec![(1, PropertyValue::String("rel-A".into()))]
3575            );
3576
3577            // Verify hyperedge 2
3578            let h2 = engine.get_hyperedge(HyperEdgeId(2)).expect("hyperedge 2");
3579            assert_eq!(h2.id, HyperEdgeId(2));
3580            assert_eq!(h2.rel_type_id, 20);
3581            assert!(h2.sources.is_empty());
3582            assert!(h2.targets.is_empty());
3583            assert!(h2.properties.is_empty());
3584
3585            // Verify nodes also persisted
3586            assert_eq!(engine.node_count(), 3);
3587        }
3588    }
3589
3590    // ======================================================================
3591    // TASK-046/047: R-PERSIST-052 VersionStore persistence
3592    // ======================================================================
3593
3594    #[test]
3595    fn test_close_reopen_preserves_version_store() {
3596        let dir = tempdir().expect("tempdir");
3597        let db_path = dir.path().join("persist_versions.cyl");
3598        let config = DatabaseConfig {
3599            path: db_path.clone(),
3600            wal_sync_mode: SyncMode::Normal,
3601            version_storage_enabled: true,
3602            ..Default::default()
3603        };
3604
3605        // Phase 1: Create nodes, update them to create version snapshots
3606        {
3607            let mut engine = StorageEngine::open(config.clone()).expect("open");
3608            let n1 =
3609                engine.create_node(vec![1], vec![(1, PropertyValue::String("Alice-v1".into()))]);
3610            // Update node to create a version snapshot
3611            engine
3612                .update_node(n1, vec![(1, PropertyValue::String("Alice-v2".into()))])
3613                .expect("update n1");
3614            // Verify version was created before close
3615            assert_eq!(engine.version_count(n1.0), 1);
3616        }
3617
3618        // Phase 2: Reopen and verify version history is present
3619        {
3620            let engine = StorageEngine::open(config).expect("reopen");
3621            // Node should be present with latest state
3622            let n = engine.get_node(NodeId(1)).expect("node 1");
3623            assert_eq!(
3624                n.properties[0],
3625                (1, PropertyValue::String("Alice-v2".into()))
3626            );
3627            // Version history should be preserved
3628            assert_eq!(engine.version_count(1), 1);
3629            let chain = engine.version_chain(1);
3630            assert_eq!(chain.len(), 1);
3631        }
3632    }
3633}