1#![warn(missing_docs)]
2pub mod btree;
6pub mod catalog;
8#[cfg(feature = "hypergraph")]
10pub mod hyperedge;
11pub mod index;
13pub mod page;
15#[cfg(feature = "subgraph")]
17pub mod subgraph;
18pub mod transaction;
20pub mod version;
22pub mod wal;
24
25use std::collections::HashMap;
26
27use cypherlite_core::{
28 CypherLiteError, DatabaseConfig, EdgeId, LabelRegistry, NodeId, NodeRecord, PageId,
29 PropertyValue, RelationshipRecord, Result,
30};
31#[cfg(feature = "subgraph")]
32use cypherlite_core::{SubgraphId, SubgraphRecord};
33use fs2::FileExt;
34
35use btree::edge_store::EdgeStore;
36use btree::node_store::NodeStore;
37#[cfg(feature = "hypergraph")]
38use cypherlite_core::{HyperEdgeId, HyperEdgeRecord};
39#[cfg(feature = "hypergraph")]
40use hyperedge::reverse_index::HyperEdgeReverseIndex;
41#[cfg(feature = "hypergraph")]
42use hyperedge::HyperEdgeStore;
43use index::edge_index::EdgeIndexManager;
44use index::IndexManager;
45use page::buffer_pool::BufferPool;
46use page::page_manager::PageManager;
47use page::PAGE_SIZE;
48#[cfg(feature = "subgraph")]
49use subgraph::membership::MembershipIndex;
50#[cfg(feature = "subgraph")]
51use subgraph::SubgraphStore;
52use transaction::mvcc::TransactionManager;
53use version::VersionStore;
54use wal::checkpoint::Checkpoint;
55use wal::reader::WalReader;
56use wal::recovery::Recovery;
57use wal::writer::WalWriter;
58
#[allow(dead_code)]
/// Central storage engine: ties together the page file, WAL, MVCC
/// transaction manager, in-memory node/edge stores, property indexes, and
/// the bookkeeping needed to persist records onto data-page chains.
pub struct StorageEngine {
    // Held open for the engine's lifetime; carries the exclusive advisory
    // lock (fs2) taken in `open` — dropping it releases the lock.
    lock_file: std::fs::File,
    page_manager: PageManager,
    buffer_pool: BufferPool,
    wal_writer: WalWriter,
    wal_reader: WalReader,
    tx_manager: TransactionManager,
    node_store: NodeStore,
    edge_store: EdgeStore,
    catalog: catalog::Catalog,
    index_manager: IndexManager,
    edge_index_manager: EdgeIndexManager,
    version_store: VersionStore,
    #[cfg(feature = "subgraph")]
    subgraph_store: SubgraphStore,
    #[cfg(feature = "subgraph")]
    membership_index: MembershipIndex,
    #[cfg(feature = "hypergraph")]
    hyperedge_store: HyperEdgeStore,
    #[cfg(feature = "hypergraph")]
    hyperedge_reverse_index: HyperEdgeReverseIndex,
    config: DatabaseConfig,
    // node id -> page id holding that node's most recent on-disk record.
    node_page_map: HashMap<u64, u32>,
    // edge id -> page id holding that edge's most recent on-disk record.
    edge_page_map: HashMap<u64, u32>,
    // Cached tail page of the node data-page chain, used for appends.
    current_node_data_page: Option<(u32, [u8; PAGE_SIZE])>,
    // Cached tail page of the edge data-page chain, used for appends.
    current_edge_data_page: Option<(u32, [u8; PAGE_SIZE])>,
    #[cfg(feature = "subgraph")]
    current_subgraph_data_page: Option<(u32, [u8; PAGE_SIZE])>,
    #[cfg(feature = "hypergraph")]
    current_hyperedge_data_page: Option<(u32, [u8; PAGE_SIZE])>,
    current_version_data_page: Option<(u32, [u8; PAGE_SIZE])>,
}
105
106impl StorageEngine {
    /// Opens (or creates) the database described by `config`.
    ///
    /// Acquires an exclusive advisory lock on a sidecar `.cyl-lock` file,
    /// replays any existing WAL, then rebuilds the in-memory stores and
    /// catalog from the persisted page chains.
    ///
    /// # Errors
    /// Returns `DatabaseLocked` if another process holds the lock, or any
    /// I/O / recovery error from the pager and WAL layers.
    pub fn open(config: DatabaseConfig) -> Result<Self> {
        let wal_path = config.wal_path();
        let db_exists = config.path.exists();

        // Sidecar lock file; the lock itself lives as long as `lock_file`.
        let lock_path = config.path.with_extension("cyl-lock");
        let lock_file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&lock_path)
            .map_err(CypherLiteError::IoError)?;

        lock_file
            .try_lock_exclusive()
            .map_err(|_| CypherLiteError::DatabaseLocked(config.path.display().to_string()))?;

        let mut page_manager = if db_exists {
            PageManager::open_database(&config)?
        } else {
            PageManager::create_database(&config)?
        };

        // Replay committed WAL frames into the page file before reading it.
        let (_recovered, wal_reader) = if wal_path.exists() {
            Recovery::recover(&mut page_manager, &wal_path)?
        } else {
            (0, WalReader::new())
        };

        let wal_writer = if wal_path.exists() {
            WalWriter::open(&wal_path, config.wal_sync_mode.clone())?
        } else {
            // NOTE(review): 12345 looks like a magic seed/salt for a fresh
            // WAL header — confirm its meaning and hoist into a named const.
            WalWriter::create(&wal_path, 12345, config.wal_sync_mode.clone())?
        };

        let buffer_pool = BufferPool::new(config.cache_capacity);
        let tx_manager = TransactionManager::new();

        // Resume id allocation from the counters persisted in the header.
        let next_node_id = page_manager.header().next_node_id;
        let next_edge_id = page_manager.header().next_edge_id;
        let node_store = NodeStore::new(next_node_id);
        let edge_store = EdgeStore::new(next_edge_id);

        tx_manager.update_current_frame(wal_writer.frame_count());

        #[cfg(feature = "subgraph")]
        let next_subgraph_id = page_manager.header().next_subgraph_id;
        #[cfg(feature = "subgraph")]
        let subgraph_store = if next_subgraph_id > 0 {
            SubgraphStore::new(next_subgraph_id)
        } else {
            SubgraphStore::new(1)
        };

        #[cfg(feature = "hypergraph")]
        let next_hyperedge_id = page_manager.header().next_hyperedge_id;
        #[cfg(feature = "hypergraph")]
        let hyperedge_store = if next_hyperedge_id > 0 {
            HyperEdgeStore::new(next_hyperedge_id)
        } else {
            HyperEdgeStore::new(1)
        };

        let mut engine = Self {
            lock_file,
            page_manager,
            buffer_pool,
            wal_writer,
            wal_reader,
            tx_manager,
            node_store,
            edge_store,
            catalog: catalog::Catalog::default(),
            index_manager: IndexManager::new(),
            edge_index_manager: EdgeIndexManager::new(),
            version_store: VersionStore::new(),
            #[cfg(feature = "subgraph")]
            subgraph_store,
            #[cfg(feature = "subgraph")]
            membership_index: MembershipIndex::new(),
            #[cfg(feature = "hypergraph")]
            hyperedge_store,
            #[cfg(feature = "hypergraph")]
            hyperedge_reverse_index: HyperEdgeReverseIndex::new(),
            config,
            node_page_map: HashMap::new(),
            edge_page_map: HashMap::new(),
            current_node_data_page: None,
            current_edge_data_page: None,
            #[cfg(feature = "subgraph")]
            current_subgraph_data_page: None,
            #[cfg(feature = "hypergraph")]
            current_hyperedge_data_page: None,
            current_version_data_page: None,
        };

        // Rebuild in-memory state from the persisted page chains.
        engine.load_catalog()?;

        engine.load_nodes_from_pages()?;
        engine.load_edges_from_pages()?;
        #[cfg(feature = "subgraph")]
        engine.load_subgraphs_from_pages()?;
        #[cfg(feature = "hypergraph")]
        engine.load_hyperedges_from_pages()?;
        engine.load_versions_from_pages()?;

        Ok(engine)
    }
235
    /// Creates a node with the given label ids and properties, inserts it
    /// into any matching property indexes, and appends it to a data page.
    ///
    /// NOTE(review): persistence errors are swallowed (`let _ =`), unlike
    /// `create_edge` which returns `Result` — confirm this is intentional.
    pub fn create_node(
        &mut self,
        labels: Vec<u32>,
        properties: Vec<(u32, PropertyValue)>,
    ) -> NodeId {
        let id = self
            .node_store
            .create_node(labels.clone(), properties.clone());
        // Keep the persisted id counter ahead of in-memory allocation.
        self.page_manager.header_mut().next_node_id = self.node_store.next_id();
        for &label_id in &labels {
            for (prop_key_id, value) in &properties {
                if let Some(idx) = self.index_manager.find_index_mut(label_id, *prop_key_id) {
                    idx.insert(value, id);
                }
            }
        }
        if let Some(record) = self.node_store.get_node(id).cloned() {
            let _ = self.persist_node(id, &record, false);
        }
        id
    }
263
    /// Returns the node record for `node_id`, if it exists in the store.
    pub fn get_node(&self, node_id: NodeId) -> Option<&NodeRecord> {
        self.node_store.get_node(node_id)
    }
268
    /// Replaces a node's properties, snapshotting the previous record into
    /// the version store first (when version storage is enabled) and fixing
    /// up property-index entries for every label the node carries.
    ///
    /// NOTE(review): index fix-up removes entries for all old properties and
    /// inserts entries for all new ones, which assumes
    /// `NodeStore::update_node` replaces the property set wholesale —
    /// confirm against its implementation.
    pub fn update_node(
        &mut self,
        node_id: NodeId,
        properties: Vec<(u32, PropertyValue)>,
    ) -> Result<()> {
        let old_node = self.node_store.get_node(node_id).cloned();

        if self.config.version_storage_enabled {
            if let Some(ref old) = old_node {
                // Persist the pre-update image for version history.
                let seq = self.version_store.snapshot_node(node_id.0, old.clone());
                let vr = version::VersionRecord::Node(old.clone());
                let _ = self.persist_version(node_id.0, seq, &vr);
            }
        }

        self.node_store.update_node(node_id, properties.clone())?;
        if let Some(old) = old_node {
            for &label_id in &old.labels {
                for (prop_key_id, old_value) in &old.properties {
                    if let Some(idx) = self.index_manager.find_index_mut(label_id, *prop_key_id) {
                        idx.remove(old_value, node_id);
                    }
                }
                for (prop_key_id, new_value) in &properties {
                    if let Some(idx) = self.index_manager.find_index_mut(label_id, *prop_key_id) {
                        idx.insert(new_value, node_id);
                    }
                }
            }
        }
        if let Some(updated) = self.node_store.get_node(node_id).cloned() {
            // Best-effort in-place rewrite of the on-disk record.
            let _ = self.rewrite_node_on_page(node_id, &updated, false);
        }
        Ok(())
    }
312
313 pub fn delete_node(&mut self, node_id: NodeId) -> Result<NodeRecord> {
316 let node_data = self.node_store.get_node(node_id).cloned();
318 self.edge_store
320 .delete_edges_for_node(node_id, &mut self.node_store)?;
321 let deleted = self.node_store.delete_node(node_id)?;
322 if let Some(ref node) = node_data {
324 for &label_id in &node.labels {
325 for (prop_key_id, value) in &node.properties {
326 if let Some(idx) = self.index_manager.find_index_mut(label_id, *prop_key_id) {
327 idx.remove(value, node_id);
328 }
329 }
330 }
331 }
332 if let Some(ref node) = node_data {
334 let _ = self.rewrite_node_on_page(node_id, node, true);
335 }
336 Ok(deleted)
337 }
338
    /// Creates an edge between two existing nodes, inserts it into any
    /// matching edge-property indexes, and appends it to a data page.
    ///
    /// # Errors
    /// Propagates failures from `EdgeStore::create_edge` (e.g. if an
    /// endpoint is invalid — see that method for specifics).
    pub fn create_edge(
        &mut self,
        start_node: NodeId,
        end_node: NodeId,
        rel_type_id: u32,
        properties: Vec<(u32, PropertyValue)>,
    ) -> Result<EdgeId> {
        let id = self.edge_store.create_edge(
            start_node,
            end_node,
            rel_type_id,
            properties.clone(),
            &mut self.node_store,
        )?;
        // Keep the persisted id counter ahead of in-memory allocation.
        self.page_manager.header_mut().next_edge_id = self.edge_store.next_id();
        for (prop_key_id, value) in &properties {
            if let Some(idx) = self
                .edge_index_manager
                .find_index_mut(rel_type_id, *prop_key_id)
            {
                idx.insert(value, id);
            }
        }
        if let Some(record) = self.edge_store.get_edge(id).cloned() {
            // Persistence is best-effort here (error intentionally ignored).
            let _ = self.persist_edge(id, &record, false);
        }
        Ok(id)
    }
372
    /// Returns the relationship record for `edge_id`, if it exists.
    pub fn get_edge(&self, edge_id: EdgeId) -> Option<&RelationshipRecord> {
        self.edge_store.get_edge(edge_id)
    }
377
    /// Replaces an edge's properties and fixes up edge-index entries.
    ///
    /// NOTE(review): unlike `update_node`, no version snapshot is taken here
    /// even when version storage is enabled — confirm whether edge
    /// versioning is intentionally unsupported.
    pub fn update_edge(
        &mut self,
        edge_id: EdgeId,
        properties: Vec<(u32, PropertyValue)>,
    ) -> Result<()> {
        let old_edge = self.edge_store.get_edge(edge_id).cloned();
        self.edge_store.update_edge(edge_id, properties.clone())?;
        if let Some(old) = old_edge {
            let rel_type_id = old.rel_type_id;
            for (prop_key_id, old_value) in &old.properties {
                if let Some(idx) = self
                    .edge_index_manager
                    .find_index_mut(rel_type_id, *prop_key_id)
                {
                    idx.remove(old_value, edge_id);
                }
            }
            for (prop_key_id, new_value) in &properties {
                if let Some(idx) = self
                    .edge_index_manager
                    .find_index_mut(rel_type_id, *prop_key_id)
                {
                    idx.insert(new_value, edge_id);
                }
            }
        }
        if let Some(updated) = self.edge_store.get_edge(edge_id).cloned() {
            // Best-effort in-place rewrite of the on-disk record.
            let _ = self.rewrite_edge_on_page(edge_id, &updated, false);
        }
        Ok(())
    }
414
    /// Returns the relationship records attached to `node_id`.
    pub fn get_edges_for_node(&self, node_id: NodeId) -> Vec<&RelationshipRecord> {
        self.edge_store
            .get_edges_for_node(node_id, &self.node_store)
    }
420
421 pub fn delete_edge(&mut self, edge_id: EdgeId) -> Result<RelationshipRecord> {
423 let edge_data = self.edge_store.get_edge(edge_id).cloned();
425 let deleted = self.edge_store.delete_edge(edge_id, &mut self.node_store)?;
426 if let Some(ref edge) = edge_data {
428 for (prop_key_id, value) in &edge.properties {
429 if let Some(idx) = self
430 .edge_index_manager
431 .find_index_mut(edge.rel_type_id, *prop_key_id)
432 {
433 idx.remove(value, edge_id);
434 }
435 }
436 }
437 if let Some(ref edge) = edge_data {
439 let _ = self.rewrite_edge_on_page(edge_id, edge, true);
440 }
441 Ok(deleted)
442 }
443
    /// Returns every node record currently in the store.
    pub fn scan_nodes(&self) -> Vec<&NodeRecord> {
        self.node_store.scan_all()
    }
450
    /// Returns every node record carrying label `label_id`.
    pub fn scan_nodes_by_label(&self, label_id: u32) -> Vec<&NodeRecord> {
        self.node_store.scan_by_label(label_id)
    }
455
    /// Returns every edge record of relationship type `type_id`.
    pub fn scan_edges_by_type(&self, type_id: u32) -> Vec<&RelationshipRecord> {
        self.edge_store.scan_by_type(type_id)
    }
460
    /// Begins a read transaction via the MVCC transaction manager.
    pub fn begin_read(&self) -> transaction::ReadTransaction {
        self.tx_manager.begin_read()
    }
467
    /// Begins a write transaction; fallible — see
    /// `TransactionManager::begin_write` for the failure conditions.
    pub fn begin_write(&self) -> Result<transaction::WriteTransaction> {
        self.tx_manager.begin_write()
    }
472
    /// Writes one page image into the WAL (uncommitted), tagging it with
    /// the current database page count; returns the frame number.
    pub fn wal_write_page(&mut self, page_id: PageId, data: &[u8; PAGE_SIZE]) -> Result<u64> {
        let db_size = self.page_manager.header().page_count;
        self.wal_writer.write_frame(page_id, db_size, data)
    }
480
    /// Commits pending WAL frames and publishes the new committed frame
    /// number to the transaction manager. Returns that frame number.
    pub fn wal_commit(&mut self) -> Result<u64> {
        let frame = self.wal_writer.commit()?;
        self.tx_manager.update_current_frame(frame);
        Ok(frame)
    }
487
    /// Discards any uncommitted WAL frames.
    pub fn wal_discard(&mut self) {
        self.wal_writer.discard();
    }
492
    /// Runs a checkpoint, transferring committed WAL frames into the main
    /// database file. Returns the value produced by `Checkpoint::run`.
    pub fn checkpoint(&mut self) -> Result<u64> {
        Checkpoint::run(
            &mut self.page_manager,
            &mut self.wal_writer,
            &mut self.wal_reader,
        )
    }
503
    /// Flushes the in-memory database header back to the page file.
    pub fn flush_header(&mut self) -> Result<()> {
        self.page_manager.flush_header()
    }
510
    /// Number of live nodes in the store.
    pub fn node_count(&self) -> usize {
        self.node_store.len()
    }
515
    /// Number of live edges in the store.
    pub fn edge_count(&self) -> usize {
        self.edge_store.len()
    }
520
521 pub fn find_node(
527 &self,
528 label_ids: &[u32],
529 properties: &[(u32, PropertyValue)],
530 ) -> Option<NodeId> {
531 let candidates: Vec<&NodeRecord> = if let Some(&first_label) = label_ids.first() {
532 self.scan_nodes_by_label(first_label)
533 } else {
534 self.scan_nodes()
535 };
536
537 for node in candidates {
538 let has_all_labels = label_ids.iter().all(|lid| node.labels.contains(lid));
540 if !has_all_labels {
541 continue;
542 }
543 let has_all_props = properties
545 .iter()
546 .all(|(key, val)| node.properties.iter().any(|(k, v)| k == key && v == val));
547 if has_all_props {
548 return Some(node.node_id);
549 }
550 }
551 None
552 }
553
554 pub fn find_edge(&self, start: NodeId, end: NodeId, type_id: u32) -> Option<EdgeId> {
559 let edges = self.get_edges_for_node(start);
560 for edge in edges {
561 if edge.start_node == start && edge.end_node == end && edge.rel_type_id == type_id {
562 return Some(edge.edge_id);
563 }
564 }
565 None
566 }
567
    /// The configuration this engine was opened with.
    pub fn config(&self) -> &DatabaseConfig {
        &self.config
    }
572
573 pub fn scan_nodes_by_property(
578 &self,
579 label_id: u32,
580 prop_key_id: u32,
581 value: &PropertyValue,
582 ) -> Vec<NodeId> {
583 if let Some(idx) = self.index_manager.find_index(label_id, prop_key_id) {
584 idx.lookup(value)
586 } else {
587 self.node_store
589 .scan_by_label(label_id)
590 .iter()
591 .filter(|n| {
592 n.properties
593 .iter()
594 .any(|(k, v)| *k == prop_key_id && v == value)
595 })
596 .map(|n| n.node_id)
597 .collect()
598 }
599 }
600
601 pub fn scan_nodes_by_range(
606 &self,
607 label_id: u32,
608 prop_key_id: u32,
609 min: &PropertyValue,
610 max: &PropertyValue,
611 ) -> Vec<NodeId> {
612 if let Some(idx) = self.index_manager.find_index(label_id, prop_key_id) {
613 idx.range(min, max)
615 } else {
616 let min_key = index::PropertyValueKey(min.clone());
618 let max_key = index::PropertyValueKey(max.clone());
619 self.node_store
620 .scan_by_label(label_id)
621 .iter()
622 .filter(|n| {
623 n.properties.iter().any(|(k, v)| {
624 if *k != prop_key_id {
625 return false;
626 }
627 let vk = index::PropertyValueKey(v.clone());
628 vk >= min_key && vk <= max_key
629 })
630 })
631 .map(|n| n.node_id)
632 .collect()
633 }
634 }
635
    /// Shared access to the node property-index manager.
    pub fn index_manager(&self) -> &IndexManager {
        &self.index_manager
    }
640
    /// Mutable access to the node property-index manager.
    pub fn index_manager_mut(&mut self) -> &mut IndexManager {
        &mut self.index_manager
    }
645
    /// Shared access to the edge property-index manager.
    pub fn edge_index_manager(&self) -> &EdgeIndexManager {
        &self.edge_index_manager
    }
650
    /// Mutable access to the edge property-index manager.
    pub fn edge_index_manager_mut(&mut self) -> &mut EdgeIndexManager {
        &mut self.edge_index_manager
    }
655
656 pub fn scan_edges_by_property(
661 &self,
662 rel_type_id: u32,
663 prop_key_id: u32,
664 value: &PropertyValue,
665 ) -> Vec<EdgeId> {
666 if let Some(idx) = self.edge_index_manager.find_index(rel_type_id, prop_key_id) {
667 idx.lookup(value)
668 } else {
669 self.edge_store
671 .scan_by_type(rel_type_id)
672 .iter()
673 .filter(|e| {
674 e.properties
675 .iter()
676 .any(|(k, v)| *k == prop_key_id && v == value)
677 })
678 .map(|e| e.edge_id)
679 .collect()
680 }
681 }
682
    /// Shared access to the schema catalog.
    pub fn catalog(&self) -> &catalog::Catalog {
        &self.catalog
    }
687
    /// Mutable access to the schema catalog.
    pub fn catalog_mut(&mut self) -> &mut catalog::Catalog {
        &mut self.catalog
    }
692
    /// Shared access to the version store.
    pub fn version_store(&self) -> &VersionStore {
        &self.version_store
    }
697
    /// Mutable access to the version store.
    pub fn version_store_mut(&mut self) -> &mut VersionStore {
        &mut self.version_store
    }
702
    /// Creates a subgraph with the given properties and optional temporal
    /// anchor, then persists it along with its current member list.
    #[cfg(feature = "subgraph")]
    pub fn create_subgraph(
        &mut self,
        properties: Vec<(u32, PropertyValue)>,
        temporal_anchor: Option<i64>,
    ) -> SubgraphId {
        let id = self.subgraph_store.create(properties, temporal_anchor);
        // Keep the persisted id counter ahead of in-memory allocation.
        self.page_manager.header_mut().next_subgraph_id = self.subgraph_store.next_id();
        if let Some(record) = self.subgraph_store.get(id).cloned() {
            let members = self.membership_index.members(id);
            // Persistence is best-effort (error intentionally ignored).
            let _ = self.persist_subgraph(id, &record, &members, false);
        }
        id
    }
721
    /// Returns the subgraph record for `id`, if it exists.
    #[cfg(feature = "subgraph")]
    pub fn get_subgraph(&self, id: SubgraphId) -> Option<&SubgraphRecord> {
        self.subgraph_store.get(id)
    }
727
728 #[cfg(feature = "subgraph")]
730 pub fn delete_subgraph(&mut self, id: SubgraphId) -> cypherlite_core::Result<SubgraphRecord> {
731 self.membership_index.remove_all(id);
733 self.subgraph_store
734 .delete(id)
735 .ok_or(cypherlite_core::CypherLiteError::SubgraphNotFound(id.0))
736 }
737
    /// Adds `node_id` to subgraph `subgraph_id`, validating that both
    /// exist, then re-persists the subgraph with its updated member list.
    ///
    /// # Errors
    /// `SubgraphNotFound` / `NodeNotFound` when either side is missing.
    #[cfg(feature = "subgraph")]
    pub fn add_member(
        &mut self,
        subgraph_id: SubgraphId,
        node_id: NodeId,
    ) -> cypherlite_core::Result<()> {
        if self.subgraph_store.get(subgraph_id).is_none() {
            return Err(cypherlite_core::CypherLiteError::SubgraphNotFound(
                subgraph_id.0,
            ));
        }
        if self.node_store.get_node(node_id).is_none() {
            return Err(cypherlite_core::CypherLiteError::NodeNotFound(node_id.0));
        }
        self.membership_index.add(subgraph_id, node_id);
        if let Some(record) = self.subgraph_store.get(subgraph_id).cloned() {
            let members = self.membership_index.members(subgraph_id);
            // Persistence is best-effort (error intentionally ignored).
            let _ = self.persist_subgraph(subgraph_id, &record, &members, false);
        }
        Ok(())
    }
761
    /// Removes `node_id` from subgraph `subgraph_id`.
    ///
    /// NOTE(review): unlike `add_member`, the updated membership is not
    /// re-persisted here — confirm whether that is intentional.
    ///
    /// # Errors
    /// `SubgraphNotFound` when the subgraph does not exist.
    #[cfg(feature = "subgraph")]
    pub fn remove_member(
        &mut self,
        subgraph_id: SubgraphId,
        node_id: NodeId,
    ) -> cypherlite_core::Result<()> {
        if self.subgraph_store.get(subgraph_id).is_none() {
            return Err(cypherlite_core::CypherLiteError::SubgraphNotFound(
                subgraph_id.0,
            ));
        }
        self.membership_index.remove(subgraph_id, node_id);
        Ok(())
    }
777
    /// Node ids that are members of `subgraph_id`.
    #[cfg(feature = "subgraph")]
    pub fn list_members(&self, subgraph_id: SubgraphId) -> Vec<NodeId> {
        self.membership_index.members(subgraph_id)
    }
783
    /// Subgraph ids that `node_id` belongs to.
    #[cfg(feature = "subgraph")]
    pub fn get_subgraph_memberships(&self, node_id: NodeId) -> Vec<SubgraphId> {
        self.membership_index.memberships(node_id)
    }
789
    /// Returns every subgraph record.
    #[cfg(feature = "subgraph")]
    pub fn scan_subgraphs(&self) -> Vec<&SubgraphRecord> {
        self.subgraph_store.all().collect()
    }
795
796 #[cfg(feature = "hypergraph")]
800 pub fn create_hyperedge(
801 &mut self,
802 rel_type_id: u32,
803 sources: Vec<cypherlite_core::GraphEntity>,
804 targets: Vec<cypherlite_core::GraphEntity>,
805 properties: Vec<(u32, PropertyValue)>,
806 ) -> HyperEdgeId {
807 let id =
808 self.hyperedge_store
809 .create(rel_type_id, sources.clone(), targets.clone(), properties);
810 self.page_manager.header_mut().next_hyperedge_id = self.hyperedge_store.next_id();
812 for entity in sources.iter().chain(targets.iter()) {
814 let raw_id = match entity {
815 cypherlite_core::GraphEntity::Node(nid) => nid.0,
816 cypherlite_core::GraphEntity::Subgraph(sid) => sid.0,
817 #[cfg(feature = "hypergraph")]
818 cypherlite_core::GraphEntity::HyperEdge(hid) => hid.0,
819 #[cfg(feature = "hypergraph")]
820 cypherlite_core::GraphEntity::TemporalRef(nid, _) => nid.0,
821 };
822 self.hyperedge_reverse_index.add(id.0, raw_id);
823 }
824 if let Some(record) = self.hyperedge_store.get(id).cloned() {
826 let _ = self.persist_hyperedge(id, &record, false);
827 }
828 id
829 }
830
    /// Returns the hyperedge record for `id`, if it exists.
    #[cfg(feature = "hypergraph")]
    pub fn get_hyperedge(&self, id: HyperEdgeId) -> Option<&HyperEdgeRecord> {
        self.hyperedge_store.get(id)
    }
836
837 #[cfg(feature = "hypergraph")]
839 pub fn delete_hyperedge(
840 &mut self,
841 id: HyperEdgeId,
842 ) -> cypherlite_core::Result<HyperEdgeRecord> {
843 self.hyperedge_reverse_index.remove_all(id.0);
845 self.hyperedge_store
846 .delete(id)
847 .ok_or(cypherlite_core::CypherLiteError::HyperEdgeNotFound(id.0))
848 }
849
    /// Returns every hyperedge record.
    #[cfg(feature = "hypergraph")]
    pub fn scan_hyperedges(&self) -> Vec<&HyperEdgeRecord> {
        self.hyperedge_store.all().collect()
    }
855
    /// Raw hyperedge ids that reference the entity with raw id
    /// `raw_entity_id` (as recorded by the reverse index).
    #[cfg(feature = "hypergraph")]
    pub fn hyperedges_for_entity(&self, raw_entity_id: u64) -> Vec<u64> {
        self.hyperedge_reverse_index.hyperedges_for(raw_entity_id)
    }
861
    /// Root page id of the node data-page chain (0 when none exists).
    pub fn node_data_root_page(&self) -> u32 {
        self.page_manager.header().node_data_root_page
    }
868
    /// Root page id of the edge data-page chain (0 when none exists).
    pub fn edge_data_root_page(&self) -> u32 {
        self.page_manager.header().edge_data_root_page
    }
873
    /// Number of stored versions for `entity_id`.
    pub fn version_count(&self, entity_id: u64) -> u64 {
        self.version_store.version_count(entity_id)
    }
878
    /// The (sequence, record) version chain stored for `entity_id`.
    pub fn version_chain(&self, entity_id: u64) -> Vec<(u64, &version::VersionRecord)> {
        self.version_store.get_version_chain(entity_id)
    }
883
    /// Reads a data page by id, serving it from one of the cached chain
    /// tail pages (node, edge, subgraph, hyperedge, version) before falling
    /// back to the page manager.
    ///
    /// NOTE(review): the cached tails appear to carry changes that may not
    /// yet have been checkpointed into the page file, which is why they
    /// must be consulted first — confirm against the WAL/checkpoint design.
    pub fn read_data_page(&mut self, page_id: u32) -> Result<[u8; PAGE_SIZE]> {
        if let Some((cached_pid, ref cached_buf)) = self.current_node_data_page {
            if cached_pid == page_id {
                return Ok(*cached_buf);
            }
        }
        if let Some((cached_pid, ref cached_buf)) = self.current_edge_data_page {
            if cached_pid == page_id {
                return Ok(*cached_buf);
            }
        }
        #[cfg(feature = "subgraph")]
        if let Some((cached_pid, ref cached_buf)) = self.current_subgraph_data_page {
            if cached_pid == page_id {
                return Ok(*cached_buf);
            }
        }
        #[cfg(feature = "hypergraph")]
        if let Some((cached_pid, ref cached_buf)) = self.current_hyperedge_data_page {
            if cached_pid == page_id {
                return Ok(*cached_buf);
            }
        }
        if let Some((cached_pid, ref cached_buf)) = self.current_version_data_page {
            if cached_pid == page_id {
                return Ok(*cached_buf);
            }
        }
        self.page_manager.read_page(PageId(page_id))
    }
925
    /// Current number of frames in the WAL.
    pub fn wal_frame_count(&self) -> u64 {
        self.wal_writer.frame_count()
    }
930
931 pub fn node_data_page_count(&self) -> usize {
933 let root = self.page_manager.header().node_data_root_page;
934 if root == 0 {
935 return 0;
936 }
937 let mut pages: std::collections::HashSet<u32> =
939 self.node_page_map.values().copied().collect();
940 if let Some((pid, _)) = &self.current_node_data_page {
941 pages.insert(*pid);
942 }
943 pages.len()
944 }
945
    /// Walks the node data-page chain from its root, loading every
    /// non-deleted record into the in-memory node store and recording which
    /// page each node lives on. The chain's tail page is cached so future
    /// appends can keep filling it.
    fn load_nodes_from_pages(&mut self) -> Result<()> {
        use page::record_serialization::{
            deserialize_node_record, read_records_from_page, DataPageHeader,
        };

        let root_page = self.page_manager.header().node_data_root_page;
        if root_page == 0 {
            // No node data has ever been persisted.
            return Ok(()); }

        let mut current_page_id = root_page;
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            let entries = read_records_from_page(&page_buf);
            for (off, len) in &entries {
                if let Some((record, deleted, _)) =
                    deserialize_node_record(&page_buf[*off..*off + *len])
                {
                    // Tombstoned records are skipped entirely.
                    if !deleted {
                        self.node_page_map.insert(record.node_id.0, current_page_id);
                        self.node_store.insert_loaded_record(record);
                    }
                }
            }

            if header.next_page == 0 {
                // Tail of the chain: cache it for subsequent appends.
                self.current_node_data_page = Some((current_page_id, page_buf));
                break;
            }
            current_page_id = header.next_page;
        }

        Ok(())
    }
994
    /// Walks the edge data-page chain from its root, loading every
    /// non-deleted record into the in-memory edge store and recording which
    /// page each edge lives on. The chain's tail page is cached for
    /// subsequent appends. Mirrors `load_nodes_from_pages`.
    fn load_edges_from_pages(&mut self) -> Result<()> {
        use page::record_serialization::{
            deserialize_edge_record, read_records_from_page, DataPageHeader,
        };

        let root_page = self.page_manager.header().edge_data_root_page;
        if root_page == 0 {
            // No edge data has ever been persisted.
            return Ok(()); }

        let mut current_page_id = root_page;
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            let entries = read_records_from_page(&page_buf);
            for (off, len) in &entries {
                if let Some((record, deleted, _)) =
                    deserialize_edge_record(&page_buf[*off..*off + *len])
                {
                    // Tombstoned records are skipped entirely.
                    if !deleted {
                        self.edge_page_map.insert(record.edge_id.0, current_page_id);
                        self.edge_store.insert_loaded_record(record);
                    }
                }
            }

            if header.next_page == 0 {
                // Tail of the chain: cache it for subsequent appends.
                self.current_edge_data_page = Some((current_page_id, page_buf));
                break;
            }
            current_page_id = header.next_page;
        }

        Ok(())
    }
1040
    /// Serializes the catalog into a freshly allocated chain of catalog
    /// pages, writes the chain through the WAL (committed as one batch),
    /// and points the header's `catalog_page_id` at the new root.
    ///
    /// NOTE(review): pages from any previous catalog chain are never freed
    /// (each save allocates anew), and an empty catalog returns early
    /// without clearing a stale `catalog_page_id` — confirm both are
    /// acceptable.
    fn save_catalog(&mut self) -> Result<()> {
        use page::record_serialization::DataPageHeader;
        use page::PageType;

        let catalog_bytes = self.catalog.save();
        if catalog_bytes.is_empty() {
            return Ok(());
        }

        let usable_per_page = PAGE_SIZE - DataPageHeader::SIZE;
        let mut first_page_id: Option<u32> = None;
        let mut prev_page: Option<(u32, [u8; PAGE_SIZE])> = None;

        for chunk in catalog_bytes.chunks(usable_per_page) {
            let new_page_id = self.page_manager.allocate_page()?;
            let mut page_buf = [0u8; PAGE_SIZE];
            let mut header = DataPageHeader::new(PageType::CatalogData as u8);
            header.free_offset = (DataPageHeader::SIZE + chunk.len()) as u16;
            // Each page stores its whole chunk as one logical record.
            header.record_count = 1; header.write_to(&mut page_buf);

            page_buf[DataPageHeader::SIZE..DataPageHeader::SIZE + chunk.len()]
                .copy_from_slice(chunk);

            if first_page_id.is_none() {
                first_page_id = Some(new_page_id.0);
            }

            // Link the previous page to this one, then flush it to the WAL.
            if let Some((prev_id, ref mut prev_buf)) = prev_page {
                let mut prev_header = DataPageHeader::read_from(prev_buf);
                prev_header.next_page = new_page_id.0;
                prev_header.write_to(prev_buf);
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(prev_id), db_size, prev_buf)?;
            }

            prev_page = Some((new_page_id.0, page_buf));
        }

        // Flush the final page and commit the whole chain in one batch.
        if let Some((last_id, ref last_buf)) = prev_page {
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(last_id), db_size, last_buf)?;
            self.wal_writer.commit()?;
        }

        if let Some(root_id) = first_page_id {
            self.page_manager.header_mut().catalog_page_id = root_id;
        }

        Ok(())
    }
1104
    /// Loads the catalog by concatenating the payload bytes of every page
    /// in the catalog page chain, then deserializing the whole buffer.
    fn load_catalog(&mut self) -> Result<()> {
        use page::record_serialization::DataPageHeader;

        let root_page = self.page_manager.header().catalog_page_id;
        if root_page == 0 {
            // No catalog has been persisted yet; keep the default.
            return Ok(()); }

        let mut catalog_bytes = Vec::new();
        let mut current_page_id = root_page;

        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            // Payload spans from just after the header up to free_offset;
            // the bounds check guards against a corrupt header.
            let data_start = DataPageHeader::SIZE;
            let data_end = header.free_offset as usize;
            if data_end > data_start && data_end <= PAGE_SIZE {
                catalog_bytes.extend_from_slice(&page_buf[data_start..data_end]);
            }

            if header.next_page == 0 {
                break;
            }
            current_page_id = header.next_page;
        }

        self.catalog = catalog::Catalog::load(&catalog_bytes)?;
        Ok(())
    }
1141
    /// Appends a node record (live or tombstone) to the cached tail node
    /// data page, or allocates and links a fresh page when the tail is
    /// full. Every page image goes through the WAL and is committed.
    fn persist_node(&mut self, node_id: NodeId, record: &NodeRecord, deleted: bool) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_node_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_node_record(record, deleted);

        // Fast path: the record fits on the current tail page.
        if let Some((page_id, ref mut page_buf)) = self.current_node_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                self.node_page_map.insert(node_id.0, page_id);
                return Ok(());
            }
        }

        // Slow path: start a new page and link it onto the chain.
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::NodeData as u8);
        header.write_to(&mut new_page);

        // NOTE(review): the old tail's link and the new page are committed
        // in separate WAL commits — a crash in between leaves next_page
        // pointing at an unwritten page; confirm recovery tolerates this.
        if let Some((old_page_id, ref mut old_buf)) = self.current_node_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First node page ever: record it as the chain root.
        if self.page_manager.header().node_data_root_page == 0 {
            self.page_manager.header_mut().node_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.node_page_map.insert(node_id.0, new_page_id.0);
        self.current_node_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1204
    /// Appends an edge record (live or tombstone) to the cached tail edge
    /// data page, or allocates and links a fresh page when the tail is
    /// full. Mirrors `persist_node`; every page image goes through the WAL
    /// and is committed.
    fn persist_edge(
        &mut self,
        edge_id: EdgeId,
        record: &RelationshipRecord,
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_edge_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_edge_record(record, deleted);

        // Fast path: the record fits on the current tail page.
        if let Some((page_id, ref mut page_buf)) = self.current_edge_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                self.edge_page_map.insert(edge_id.0, page_id);
                return Ok(());
            }
        }

        // Slow path: start a new page and link it onto the chain.
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::EdgeData as u8);
        header.write_to(&mut new_page);

        // NOTE(review): link update and new page are committed separately —
        // a crash in between leaves next_page pointing at an unwritten
        // page; confirm recovery tolerates this (same hazard as
        // persist_node).
        if let Some((old_page_id, ref mut old_buf)) = self.current_edge_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First edge page ever: record it as the chain root.
        if self.page_manager.header().edge_data_root_page == 0 {
            self.page_manager.header_mut().edge_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.edge_page_map.insert(edge_id.0, new_page_id.0);
        self.current_edge_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1266
    /// Rebuilds the page holding `node_id`, replacing that node's record
    /// with `record` (tombstoned when `deleted` is true) and carrying every
    /// other record over unchanged. Falls back to an append when the node
    /// has no known page yet.
    fn rewrite_node_on_page(
        &mut self,
        node_id: NodeId,
        record: &NodeRecord,
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            deserialize_node_record, pack_record_into_page, read_records_from_page,
            serialize_node_record, DataPageHeader,
        };

        let page_id = match self.node_page_map.get(&node_id.0) {
            Some(&pid) => pid,
            None => {
                // Never persisted before: append instead of rewriting.
                return self.persist_node(node_id, record, deleted);
            }
        };

        let old_page = self.read_data_page(page_id)?;
        let old_header = DataPageHeader::read_from(&old_page);

        // Rebuild the page from scratch, preserving the chain link.
        let mut new_page = [0u8; PAGE_SIZE];
        let mut new_header = DataPageHeader::new(old_header.page_type);
        new_header.next_page = old_header.next_page;
        new_header.write_to(&mut new_page);

        let entries = read_records_from_page(&old_page);
        for (off, len) in &entries {
            let slice = &old_page[*off..*off + *len];
            if let Some((rec, _del, _)) = deserialize_node_record(slice) {
                if rec.node_id == node_id {
                    let updated_bytes = serialize_node_record(record, deleted);
                    // NOTE(review): pack result is ignored — if the updated
                    // record grew past the page's free space it would be
                    // silently dropped; confirm records cannot outgrow a
                    // page rebuilt from the same contents.
                    pack_record_into_page(&mut new_page, &updated_bytes);
                } else {
                    pack_record_into_page(&mut new_page, slice);
                }
            }
        }

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(PageId(page_id), db_size, &new_page)?;
        self.wal_writer.commit()?;

        // Keep the cached tail coherent if we just rewrote it.
        if let Some((cached_pid, ref mut cached_buf)) = self.current_node_data_page {
            if cached_pid == page_id {
                *cached_buf = new_page;
            }
        }

        Ok(())
    }
1328
    /// Rewrite the record for `edge_id` in place on the data page that
    /// already holds it, replacing the old copy instead of appending a new
    /// one to the chain.
    ///
    /// Falls back to the append path (`persist_edge`) when the edge has no
    /// known page mapping. The whole page is rebuilt record-by-record and
    /// written through the WAL as a single frame.
    fn rewrite_edge_on_page(
        &mut self,
        edge_id: EdgeId,
        record: &RelationshipRecord,
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            deserialize_edge_record, pack_record_into_page, read_records_from_page,
            serialize_edge_record, DataPageHeader,
        };

        // No page mapping yet: this edge was never persisted, so append it.
        let page_id = match self.edge_page_map.get(&edge_id.0) {
            Some(&pid) => pid,
            None => {
                return self.persist_edge(edge_id, record, deleted);
            }
        };

        let old_page = self.read_data_page(page_id)?;
        let old_header = DataPageHeader::read_from(&old_page);

        // Start from a blank page that preserves the old page's type and
        // its position in the chain (the next_page link).
        let mut new_page = [0u8; PAGE_SIZE];
        let mut new_header = DataPageHeader::new(old_header.page_type);
        new_header.next_page = old_header.next_page;
        new_header.write_to(&mut new_page);

        // Copy every record across, substituting the updated bytes for the
        // target edge. Slices that fail to deserialize are silently dropped.
        let entries = read_records_from_page(&old_page);
        for (off, len) in &entries {
            let slice = &old_page[*off..*off + *len];
            if let Some((rec, _del, _)) = deserialize_edge_record(slice) {
                if rec.edge_id == edge_id {
                    let updated_bytes = serialize_edge_record(record, deleted);
                    // NOTE(review): the pack result is ignored; a grown record
                    // could overflow the rebuilt page and drop records — confirm
                    // record sizes are bounded, or handle a false return.
                    pack_record_into_page(&mut new_page, &updated_bytes);
                } else {
                    pack_record_into_page(&mut new_page, slice);
                }
            }
        }

        // Log the rebuilt page through the WAL and commit the frame.
        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(PageId(page_id), db_size, &new_page)?;
        self.wal_writer.commit()?;

        // Keep the cached append page coherent if it is the page we rewrote.
        if let Some((cached_pid, ref mut cached_buf)) = self.current_edge_data_page {
            if cached_pid == page_id {
                *cached_buf = new_page;
            }
        }

        Ok(())
    }
1383
    /// Rebuild the in-memory subgraph store and membership index by walking
    /// the on-disk chain of subgraph data pages (startup recovery path).
    #[cfg(feature = "subgraph")]
    fn load_subgraphs_from_pages(&mut self) -> Result<()> {
        use page::record_serialization::{
            deserialize_subgraph_record, read_records_from_page, DataPageHeader,
        };

        // No root page recorded: no subgraph was ever persisted.
        let root_page = self.page_manager.header().subgraph_data_root_page;
        if root_page == 0 {
            return Ok(());
        }

        // Follow next_page links from the root to the tail of the chain.
        let mut current_page_id = root_page;
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            let entries = read_records_from_page(&page_buf);
            for (off, len) in &entries {
                if let Some((record, members, deleted, _)) =
                    deserialize_subgraph_record(&page_buf[*off..*off + *len])
                {
                    // Tombstoned records are skipped; live ones are loaded
                    // and their members indexed for reverse lookup.
                    if !deleted {
                        let sg_id = record.subgraph_id;
                        self.subgraph_store.insert_loaded_record(record);
                        for node_id in members {
                            self.membership_index.add(sg_id, node_id);
                        }
                    }
                }
            }

            // The tail page becomes the current append target.
            if header.next_page == 0 {
                self.current_subgraph_data_page = Some((current_page_id, page_buf));
                break;
            }
            current_page_id = header.next_page;
        }

        Ok(())
    }
1431
    /// Append a subgraph record (or its tombstone, when `deleted`) to the
    /// subgraph data-page chain, writing every touched page through the WAL.
    ///
    /// Tries the cached tail page first; when it is full (or absent), a
    /// fresh page is allocated, linked behind the old tail, and recorded as
    /// the chain root if this is the first subgraph page ever written.
    /// `_id` is unused — the identifier is presumably carried inside
    /// `record` (see `record.subgraph_id` in the load path); confirm.
    #[cfg(feature = "subgraph")]
    fn persist_subgraph(
        &mut self,
        _id: SubgraphId,
        record: &SubgraphRecord,
        members: &[NodeId],
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_subgraph_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_subgraph_record(record, members, deleted);

        // Fast path: the record fits on the cached tail page.
        if let Some((page_id, ref mut page_buf)) = self.current_subgraph_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                return Ok(());
            }
        }

        // Slow path: start a fresh page of the subgraph-data type.
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::SubgraphData as u8);
        header.write_to(&mut new_page);

        // Link the old tail to the new page and persist the updated link.
        if let Some((old_page_id, ref mut old_buf)) = self.current_subgraph_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        // A freshly initialized page must always fit a single record.
        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First subgraph page ever: record it as the chain root.
        if self.page_manager.header().subgraph_data_root_page == 0 {
            self.page_manager.header_mut().subgraph_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.current_subgraph_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1493
    /// Rebuild the in-memory hyperedge store and its reverse index by
    /// walking the on-disk chain of hyperedge data pages (startup recovery
    /// path).
    #[cfg(feature = "hypergraph")]
    fn load_hyperedges_from_pages(&mut self) -> Result<()> {
        use page::record_serialization::{
            deserialize_hyperedge_record, read_records_from_page, DataPageHeader,
        };

        // No root page recorded: no hyperedge was ever persisted.
        let root_page = self.page_manager.header().hyperedge_data_root_page;
        if root_page == 0 {
            return Ok(());
        }

        // Follow next_page links from the root to the tail of the chain.
        let mut current_page_id = root_page;
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            let entries = read_records_from_page(&page_buf);
            for (off, len) in &entries {
                if let Some((record, deleted, _)) =
                    deserialize_hyperedge_record(&page_buf[*off..*off + *len])
                {
                    // Tombstones are skipped; live records are indexed and loaded.
                    if !deleted {
                        let he_id = record.id;
                        // Index every endpoint (source or target) by its raw
                        // entity id so reverse lookups can find this edge.
                        for entity in record.sources.iter().chain(record.targets.iter()) {
                            let raw_id = match entity {
                                cypherlite_core::GraphEntity::Node(nid) => nid.0,
                                cypherlite_core::GraphEntity::Subgraph(sid) => sid.0,
                                cypherlite_core::GraphEntity::HyperEdge(hid) => hid.0,
                                cypherlite_core::GraphEntity::TemporalRef(nid, _) => nid.0,
                            };
                            self.hyperedge_reverse_index.add(he_id.0, raw_id);
                        }
                        self.hyperedge_store.insert_loaded_record(record);
                    }
                }
            }

            // The tail page becomes the current append target.
            if header.next_page == 0 {
                self.current_hyperedge_data_page = Some((current_page_id, page_buf));
                break;
            }
            current_page_id = header.next_page;
        }

        Ok(())
    }
1547
    /// Append a hyperedge record (or its tombstone, when `deleted`) to the
    /// hyperedge data-page chain, writing every touched page through the WAL.
    ///
    /// Tries the cached tail page first; when it is full (or absent), a
    /// fresh page is allocated, linked behind the old tail, and recorded as
    /// the chain root if this is the first hyperedge page ever written.
    /// `_id` is unused — the identifier is presumably carried inside
    /// `record` (see `record.id` in the load path); confirm.
    #[cfg(feature = "hypergraph")]
    fn persist_hyperedge(
        &mut self,
        _id: HyperEdgeId,
        record: &HyperEdgeRecord,
        deleted: bool,
    ) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_hyperedge_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_hyperedge_record(record, deleted);

        // Fast path: the record fits on the cached tail page.
        if let Some((page_id, ref mut page_buf)) = self.current_hyperedge_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                return Ok(());
            }
        }

        // Slow path: start a fresh page of the hyperedge-data type.
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::HyperEdgeData as u8);
        header.write_to(&mut new_page);

        // Link the old tail to the new page and persist the updated link.
        if let Some((old_page_id, ref mut old_buf)) = self.current_hyperedge_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        // A freshly initialized page must always fit a single record.
        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First hyperedge page ever: record it as the chain root.
        if self.page_manager.header().hyperedge_data_root_page == 0 {
            self.page_manager.header_mut().hyperedge_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.current_hyperedge_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1608
    /// Rebuild the in-memory version store by walking the on-disk chain of
    /// version data pages (startup recovery path). Unlike the node/edge
    /// loaders there is no tombstone flag here — every record is loaded.
    fn load_versions_from_pages(&mut self) -> Result<()> {
        use page::record_serialization::{
            deserialize_version_record, read_records_from_page, DataPageHeader,
        };

        // No root page recorded: no version was ever persisted.
        let root_page = self.page_manager.header().version_data_root_page;
        if root_page == 0 {
            return Ok(());
        }

        // Follow next_page links from the root to the tail of the chain.
        let mut current_page_id = root_page;
        loop {
            let page_buf = self.page_manager.read_page(PageId(current_page_id))?;
            let header = DataPageHeader::read_from(&page_buf);

            let entries = read_records_from_page(&page_buf);
            for (off, len) in &entries {
                if let Some((entity_id, version_seq, record, _)) =
                    deserialize_version_record(&page_buf[*off..*off + *len])
                {
                    self.version_store
                        .insert_loaded_record(entity_id, version_seq, record);
                }
            }

            // The tail page becomes the current append target.
            if header.next_page == 0 {
                self.current_version_data_page = Some((current_page_id, page_buf));
                break;
            }
            current_page_id = header.next_page;
        }

        Ok(())
    }
1649
    /// Append a version record for `entity_id` at `version_seq` to the
    /// version data-page chain, writing every touched page through the WAL.
    ///
    /// Tries the cached tail page first; when it is full (or absent), a
    /// fresh page is allocated, linked behind the old tail, and recorded as
    /// the chain root if this is the first version page ever written.
    fn persist_version(
        &mut self,
        entity_id: u64,
        version_seq: u64,
        record: &version::VersionRecord,
    ) -> Result<()> {
        use page::record_serialization::{
            pack_record_into_page, serialize_version_record, DataPageHeader,
        };
        use page::PageType;

        let record_bytes = serialize_version_record(entity_id, version_seq, record);

        // Fast path: the record fits on the cached tail page.
        if let Some((page_id, ref mut page_buf)) = self.current_version_data_page {
            if pack_record_into_page(page_buf, &record_bytes) {
                let db_size = self.page_manager.header().page_count;
                self.wal_writer
                    .write_frame(PageId(page_id), db_size, page_buf)?;
                self.wal_writer.commit()?;
                return Ok(());
            }
        }

        // Slow path: start a fresh page of the version-data type.
        let new_page_id = self.page_manager.allocate_page()?;
        let mut new_page = [0u8; PAGE_SIZE];
        let header = DataPageHeader::new(PageType::VersionData as u8);
        header.write_to(&mut new_page);

        // Link the old tail to the new page and persist the updated link.
        if let Some((old_page_id, ref mut old_buf)) = self.current_version_data_page {
            let mut old_header = DataPageHeader::read_from(old_buf);
            old_header.next_page = new_page_id.0;
            old_header.write_to(old_buf);
            let db_size = self.page_manager.header().page_count;
            self.wal_writer
                .write_frame(PageId(old_page_id), db_size, old_buf)?;
            self.wal_writer.commit()?;
        }

        // A freshly initialized page must always fit a single record.
        let packed = pack_record_into_page(&mut new_page, &record_bytes);
        debug_assert!(packed, "fresh page should always have space for a record");

        let db_size = self.page_manager.header().page_count;
        self.wal_writer
            .write_frame(new_page_id, db_size, &new_page)?;
        self.wal_writer.commit()?;

        // First version page ever: record it as the chain root.
        if self.page_manager.header().version_data_root_page == 0 {
            self.page_manager.header_mut().version_data_root_page = new_page_id.0;
            self.page_manager.flush_header()?;
        }

        self.current_version_data_page = Some((new_page_id.0, new_page));

        Ok(())
    }
1709}
1710
/// [`LabelRegistry`] implementation that forwards every interning and
/// lookup call straight to the engine's catalog, which owns the actual
/// name ↔ id mappings for labels, relationship types, and property keys.
impl LabelRegistry for StorageEngine {
    fn get_or_create_label(&mut self, name: &str) -> u32 {
        self.catalog.get_or_create_label(name)
    }

    fn label_id(&self, name: &str) -> Option<u32> {
        self.catalog.label_id(name)
    }

    fn label_name(&self, id: u32) -> Option<&str> {
        self.catalog.label_name(id)
    }

    fn get_or_create_rel_type(&mut self, name: &str) -> u32 {
        self.catalog.get_or_create_rel_type(name)
    }

    fn rel_type_id(&self, name: &str) -> Option<u32> {
        self.catalog.rel_type_id(name)
    }

    fn rel_type_name(&self, id: u32) -> Option<&str> {
        self.catalog.rel_type_name(id)
    }

    fn get_or_create_prop_key(&mut self, name: &str) -> u32 {
        self.catalog.get_or_create_prop_key(name)
    }

    fn prop_key_id(&self, name: &str) -> Option<u32> {
        self.catalog.prop_key_id(name)
    }

    fn prop_key_name(&self, id: u32) -> Option<&str> {
        self.catalog.prop_key_name(id)
    }
}
1748
impl Drop for StorageEngine {
    /// Best-effort shutdown: persist the catalog, flush the header,
    /// checkpoint the WAL, and release the on-disk lock file.
    ///
    /// Errors are deliberately swallowed (`let _ =`) because `Drop` cannot
    /// propagate them; presumably the WAL recovery path repairs whatever an
    /// interrupted shutdown leaves behind on the next open — confirm.
    fn drop(&mut self) {
        let _ = self.save_catalog();
        let _ = self.page_manager.flush_header();
        // Only delete the WAL once a checkpoint has succeeded; otherwise
        // keep it so its frames can be replayed on the next open.
        if self.checkpoint().is_ok() {
            let _ = std::fs::remove_file(self.config.wal_path());
        }
        let _ = std::fs::remove_file(self.config.path.with_extension("cyl-lock"));
    }
}
1766
1767#[cfg(test)]
1768mod tests {
1769 use super::*;
1770 use cypherlite_core::{CypherLiteError, SyncMode};
1771 use tempfile::tempdir;
1772
1773 fn test_engine(dir: &std::path::Path) -> StorageEngine {
1774 let config = DatabaseConfig {
1775 path: dir.join("test.cyl"),
1776 wal_sync_mode: SyncMode::Normal,
1777 ..Default::default()
1778 };
1779 StorageEngine::open(config).expect("open")
1780 }
1781
1782 #[test]
1783 fn test_open_creates_database() {
1784 let dir = tempdir().expect("tempdir");
1785 let engine = test_engine(dir.path());
1786 assert_eq!(engine.node_count(), 0);
1787 assert_eq!(engine.edge_count(), 0);
1788 }
1789
1790 #[test]
1791 fn test_create_and_get_node() {
1792 let dir = tempdir().expect("tempdir");
1793 let mut engine = test_engine(dir.path());
1794 let id = engine.create_node(vec![1, 2], vec![(1, PropertyValue::String("Alice".into()))]);
1795 let node = engine.get_node(id).expect("found");
1796 assert_eq!(node.node_id, id);
1797 assert_eq!(node.labels, vec![1, 2]);
1798 assert_eq!(engine.node_count(), 1);
1799 }
1800
1801 #[test]
1802 fn test_update_node() {
1803 let dir = tempdir().expect("tempdir");
1804 let mut engine = test_engine(dir.path());
1805 let id = engine.create_node(vec![], vec![(1, PropertyValue::Int64(10))]);
1806 engine
1807 .update_node(id, vec![(1, PropertyValue::Int64(20))])
1808 .expect("update");
1809 let node = engine.get_node(id).expect("found");
1810 assert_eq!(node.properties[0].1, PropertyValue::Int64(20));
1811 }
1812
1813 #[test]
1814 fn test_delete_node() {
1815 let dir = tempdir().expect("tempdir");
1816 let mut engine = test_engine(dir.path());
1817 let id = engine.create_node(vec![], vec![]);
1818 engine.delete_node(id).expect("delete");
1819 assert!(engine.get_node(id).is_none());
1820 assert_eq!(engine.node_count(), 0);
1821 }
1822
1823 #[test]
1824 fn test_create_and_get_edge() {
1825 let dir = tempdir().expect("tempdir");
1826 let mut engine = test_engine(dir.path());
1827 let n1 = engine.create_node(vec![], vec![]);
1828 let n2 = engine.create_node(vec![], vec![]);
1829 let e = engine.create_edge(n1, n2, 1, vec![]).expect("edge");
1830 let edge = engine.get_edge(e).expect("found");
1831 assert_eq!(edge.start_node, n1);
1832 assert_eq!(edge.end_node, n2);
1833 }
1834
1835 #[test]
1836 fn test_get_edges_for_node() {
1837 let dir = tempdir().expect("tempdir");
1838 let mut engine = test_engine(dir.path());
1839 let n1 = engine.create_node(vec![], vec![]);
1840 let n2 = engine.create_node(vec![], vec![]);
1841 let n3 = engine.create_node(vec![], vec![]);
1842 engine.create_edge(n1, n2, 1, vec![]).expect("e1");
1843 engine.create_edge(n1, n3, 2, vec![]).expect("e2");
1844 let edges = engine.get_edges_for_node(n1);
1845 assert_eq!(edges.len(), 2);
1846 }
1847
1848 #[test]
1849 fn test_delete_edge() {
1850 let dir = tempdir().expect("tempdir");
1851 let mut engine = test_engine(dir.path());
1852 let n1 = engine.create_node(vec![], vec![]);
1853 let n2 = engine.create_node(vec![], vec![]);
1854 let e = engine.create_edge(n1, n2, 1, vec![]).expect("edge");
1855 engine.delete_edge(e).expect("delete");
1856 assert!(engine.get_edge(e).is_none());
1857 }
1858
1859 #[test]
1861 fn test_delete_node_cascades_edges() {
1862 let dir = tempdir().expect("tempdir");
1863 let mut engine = test_engine(dir.path());
1864 let n1 = engine.create_node(vec![], vec![]);
1865 let n2 = engine.create_node(vec![], vec![]);
1866 let e = engine.create_edge(n1, n2, 1, vec![]).expect("edge");
1867 engine.delete_node(n1).expect("delete");
1868 assert!(engine.get_edge(e).is_none());
1869 assert_eq!(engine.edge_count(), 0);
1870 }
1871
1872 #[test]
1873 fn test_begin_read_transaction() {
1874 let dir = tempdir().expect("tempdir");
1875 let engine = test_engine(dir.path());
1876 let tx = engine.begin_read();
1877 assert_eq!(tx.tx_id(), 1);
1878 }
1879
1880 #[test]
1881 fn test_begin_write_transaction() {
1882 let dir = tempdir().expect("tempdir");
1883 let engine = test_engine(dir.path());
1884 let tx = engine.begin_write().expect("write");
1885 assert_eq!(tx.tx_id(), 1);
1886 }
1887
1888 #[test]
1890 fn test_write_conflict() {
1891 let dir = tempdir().expect("tempdir");
1892 let engine = test_engine(dir.path());
1893 let _w1 = engine.begin_write().expect("w1");
1894 let result = engine.begin_write();
1895 assert!(matches!(result, Err(CypherLiteError::TransactionConflict)));
1896 }
1897
1898 #[test]
1899 fn test_wal_write_and_commit() {
1900 let dir = tempdir().expect("tempdir");
1901 let mut engine = test_engine(dir.path());
1902 let data = [0xAB; PAGE_SIZE];
1903 engine.wal_write_page(PageId(2), &data).expect("write");
1904 let frame = engine.wal_commit().expect("commit");
1905 assert!(frame > 0);
1906 }
1907
1908 #[test]
1909 fn test_checkpoint() {
1910 let dir = tempdir().expect("tempdir");
1911 let mut engine = test_engine(dir.path());
1912 let data = [0xAB; PAGE_SIZE];
1913 engine.wal_write_page(PageId(2), &data).expect("write");
1914 engine.wal_commit().expect("commit");
1915 let count = engine.checkpoint().expect("checkpoint");
1916 assert_eq!(count, 1);
1917 }
1918
1919 #[test]
1920 fn test_reopen_database() {
1921 let dir = tempdir().expect("tempdir");
1922 let config = DatabaseConfig {
1923 path: dir.path().join("test.cyl"),
1924 wal_sync_mode: SyncMode::Normal,
1925 ..Default::default()
1926 };
1927
1928 {
1930 let mut engine = StorageEngine::open(config.clone()).expect("open");
1931 engine.create_node(vec![1], vec![(1, PropertyValue::String("Alice".into()))]);
1932 }
1933
1934 {
1936 let engine = StorageEngine::open(config).expect("reopen");
1937 assert_eq!(engine.node_count(), 1);
1938 let node = engine.get_node(NodeId(1)).expect("node should be loaded");
1939 assert_eq!(node.labels, vec![1]);
1940 assert_eq!(
1941 node.properties[0],
1942 (1, PropertyValue::String("Alice".into()))
1943 );
1944 }
1945 }
1946
1947 #[test]
1949 fn test_scan_nodes_empty() {
1950 let dir = tempdir().expect("tempdir");
1951 let engine = test_engine(dir.path());
1952 let nodes = engine.scan_nodes();
1953 assert!(nodes.is_empty());
1954 }
1955
1956 #[test]
1957 fn test_scan_nodes_returns_all() {
1958 let dir = tempdir().expect("tempdir");
1959 let mut engine = test_engine(dir.path());
1960 engine.create_node(vec![1], vec![]);
1961 engine.create_node(vec![2], vec![]);
1962 engine.create_node(vec![3], vec![]);
1963 let nodes = engine.scan_nodes();
1964 assert_eq!(nodes.len(), 3);
1965 }
1966
1967 #[test]
1969 fn test_scan_nodes_by_label_returns_matching() {
1970 let dir = tempdir().expect("tempdir");
1971 let mut engine = test_engine(dir.path());
1972 engine.create_node(vec![1, 2], vec![]);
1973 engine.create_node(vec![2, 3], vec![]);
1974 engine.create_node(vec![3], vec![]);
1975 let nodes = engine.scan_nodes_by_label(2);
1976 assert_eq!(nodes.len(), 2);
1977 }
1978
1979 #[test]
1980 fn test_scan_nodes_by_label_nonexistent() {
1981 let dir = tempdir().expect("tempdir");
1982 let mut engine = test_engine(dir.path());
1983 engine.create_node(vec![1], vec![]);
1984 let nodes = engine.scan_nodes_by_label(999);
1985 assert!(nodes.is_empty());
1986 }
1987
1988 #[test]
1990 fn test_scan_edges_by_type_returns_matching() {
1991 let dir = tempdir().expect("tempdir");
1992 let mut engine = test_engine(dir.path());
1993 let n1 = engine.create_node(vec![], vec![]);
1994 let n2 = engine.create_node(vec![], vec![]);
1995 let n3 = engine.create_node(vec![], vec![]);
1996 engine.create_edge(n1, n2, 1, vec![]).expect("e1");
1997 engine.create_edge(n1, n3, 2, vec![]).expect("e2");
1998 engine.create_edge(n2, n3, 1, vec![]).expect("e3");
1999 let edges = engine.scan_edges_by_type(1);
2000 assert_eq!(edges.len(), 2);
2001 for edge in &edges {
2002 assert_eq!(edge.rel_type_id, 1);
2003 }
2004 }
2005
2006 #[test]
2007 fn test_scan_edges_by_type_empty() {
2008 let dir = tempdir().expect("tempdir");
2009 let engine = test_engine(dir.path());
2010 let edges = engine.scan_edges_by_type(1);
2011 assert!(edges.is_empty());
2012 }
2013
2014 #[test]
2016 fn test_find_node_returns_matching_node() {
2017 let dir = tempdir().expect("tempdir");
2018 let mut engine = test_engine(dir.path());
2019 let label_id = engine.get_or_create_label("Person");
2020 let name_key = engine.get_or_create_prop_key("name");
2021 let nid = engine.create_node(
2022 vec![label_id],
2023 vec![(name_key, PropertyValue::String("Alice".into()))],
2024 );
2025 let found = engine.find_node(
2026 &[label_id],
2027 &[(name_key, PropertyValue::String("Alice".into()))],
2028 );
2029 assert_eq!(found, Some(nid));
2030 }
2031
2032 #[test]
2033 fn test_find_node_returns_none_when_no_match() {
2034 let dir = tempdir().expect("tempdir");
2035 let mut engine = test_engine(dir.path());
2036 let label_id = engine.get_or_create_label("Person");
2037 let name_key = engine.get_or_create_prop_key("name");
2038 engine.create_node(
2039 vec![label_id],
2040 vec![(name_key, PropertyValue::String("Alice".into()))],
2041 );
2042 let found = engine.find_node(
2043 &[label_id],
2044 &[(name_key, PropertyValue::String("Bob".into()))],
2045 );
2046 assert_eq!(found, None);
2047 }
2048
2049 #[test]
2050 fn test_find_node_empty_db() {
2051 let dir = tempdir().expect("tempdir");
2052 let engine = test_engine(dir.path());
2053 let found = engine.find_node(&[0], &[]);
2054 assert_eq!(found, None);
2055 }
2056
2057 #[test]
2058 fn test_find_node_multiple_labels() {
2059 let dir = tempdir().expect("tempdir");
2060 let mut engine = test_engine(dir.path());
2061 let person = engine.get_or_create_label("Person");
2062 let employee = engine.get_or_create_label("Employee");
2063 let name_key = engine.get_or_create_prop_key("name");
2064 let nid = engine.create_node(
2065 vec![person, employee],
2066 vec![(name_key, PropertyValue::String("Alice".into()))],
2067 );
2068 let found = engine.find_node(
2070 &[person, employee],
2071 &[(name_key, PropertyValue::String("Alice".into()))],
2072 );
2073 assert_eq!(found, Some(nid));
2074 let found2 = engine.find_node(
2076 &[person],
2077 &[(name_key, PropertyValue::String("Alice".into()))],
2078 );
2079 assert_eq!(found2, Some(nid));
2080 }
2081
2082 #[test]
2083 fn test_find_node_no_properties() {
2084 let dir = tempdir().expect("tempdir");
2085 let mut engine = test_engine(dir.path());
2086 let label_id = engine.get_or_create_label("Person");
2087 let nid = engine.create_node(vec![label_id], vec![]);
2088 let found = engine.find_node(&[label_id], &[]);
2089 assert_eq!(found, Some(nid));
2090 }
2091
2092 #[test]
2094 fn test_find_edge_returns_matching_edge() {
2095 let dir = tempdir().expect("tempdir");
2096 let mut engine = test_engine(dir.path());
2097 let n1 = engine.create_node(vec![], vec![]);
2098 let n2 = engine.create_node(vec![], vec![]);
2099 let type_id = engine.get_or_create_rel_type("KNOWS");
2100 let eid = engine.create_edge(n1, n2, type_id, vec![]).expect("edge");
2101 let found = engine.find_edge(n1, n2, type_id);
2102 assert_eq!(found, Some(eid));
2103 }
2104
2105 #[test]
2106 fn test_find_edge_returns_none_wrong_type() {
2107 let dir = tempdir().expect("tempdir");
2108 let mut engine = test_engine(dir.path());
2109 let n1 = engine.create_node(vec![], vec![]);
2110 let n2 = engine.create_node(vec![], vec![]);
2111 let knows = engine.get_or_create_rel_type("KNOWS");
2112 let likes = engine.get_or_create_rel_type("LIKES");
2113 engine.create_edge(n1, n2, knows, vec![]).expect("edge");
2114 let found = engine.find_edge(n1, n2, likes);
2115 assert_eq!(found, None);
2116 }
2117
2118 #[test]
2119 fn test_find_edge_returns_none_wrong_endpoints() {
2120 let dir = tempdir().expect("tempdir");
2121 let mut engine = test_engine(dir.path());
2122 let n1 = engine.create_node(vec![], vec![]);
2123 let n2 = engine.create_node(vec![], vec![]);
2124 let n3 = engine.create_node(vec![], vec![]);
2125 let type_id = engine.get_or_create_rel_type("KNOWS");
2126 engine.create_edge(n1, n2, type_id, vec![]).expect("edge");
2127 let found = engine.find_edge(n1, n3, type_id);
2128 assert_eq!(found, None);
2129 }
2130
2131 #[test]
2132 fn test_find_edge_empty_db() {
2133 let dir = tempdir().expect("tempdir");
2134 let mut engine = test_engine(dir.path());
2135 let n1 = engine.create_node(vec![], vec![]);
2136 let n2 = engine.create_node(vec![], vec![]);
2137 let found = engine.find_edge(n1, n2, 0);
2138 assert_eq!(found, None);
2139 }
2140
2141 #[test]
2146 fn test_create_node_updates_index() {
2147 let dir = tempdir().expect("tempdir");
2148 let mut engine = test_engine(dir.path());
2149 let label_id = engine.get_or_create_label("Person");
2150 let name_key = engine.get_or_create_prop_key("name");
2151
2152 engine
2154 .index_manager_mut()
2155 .create_index("idx_person_name".to_string(), label_id, name_key)
2156 .expect("create index");
2157
2158 let nid = engine.create_node(
2159 vec![label_id],
2160 vec![(name_key, PropertyValue::String("Alice".into()))],
2161 );
2162
2163 let result = engine
2165 .index_manager()
2166 .find_index(label_id, name_key)
2167 .expect("index exists")
2168 .lookup(&PropertyValue::String("Alice".into()));
2169 assert_eq!(result, vec![nid]);
2170 }
2171
2172 #[test]
2173 fn test_update_node_updates_index() {
2174 let dir = tempdir().expect("tempdir");
2175 let mut engine = test_engine(dir.path());
2176 let label_id = engine.get_or_create_label("Person");
2177 let name_key = engine.get_or_create_prop_key("name");
2178
2179 engine
2180 .index_manager_mut()
2181 .create_index("idx_person_name".to_string(), label_id, name_key)
2182 .expect("create index");
2183
2184 let nid = engine.create_node(
2185 vec![label_id],
2186 vec![(name_key, PropertyValue::String("Alice".into()))],
2187 );
2188
2189 engine
2191 .update_node(nid, vec![(name_key, PropertyValue::String("Bob".into()))])
2192 .expect("update");
2193
2194 let idx = engine
2195 .index_manager()
2196 .find_index(label_id, name_key)
2197 .expect("idx");
2198 assert!(idx
2200 .lookup(&PropertyValue::String("Alice".into()))
2201 .is_empty());
2202 assert_eq!(idx.lookup(&PropertyValue::String("Bob".into())), vec![nid]);
2204 }
2205
2206 #[test]
2207 fn test_delete_node_removes_from_index() {
2208 let dir = tempdir().expect("tempdir");
2209 let mut engine = test_engine(dir.path());
2210 let label_id = engine.get_or_create_label("Person");
2211 let name_key = engine.get_or_create_prop_key("name");
2212
2213 engine
2214 .index_manager_mut()
2215 .create_index("idx_person_name".to_string(), label_id, name_key)
2216 .expect("create index");
2217
2218 let nid = engine.create_node(
2219 vec![label_id],
2220 vec![(name_key, PropertyValue::String("Alice".into()))],
2221 );
2222
2223 engine.delete_node(nid).expect("delete");
2224
2225 let idx = engine
2226 .index_manager()
2227 .find_index(label_id, name_key)
2228 .expect("idx");
2229 assert!(idx
2230 .lookup(&PropertyValue::String("Alice".into()))
2231 .is_empty());
2232 }
2233
2234 #[test]
2239 fn test_scan_nodes_by_property_with_index() {
2240 let dir = tempdir().expect("tempdir");
2241 let mut engine = test_engine(dir.path());
2242 let label_id = engine.get_or_create_label("Person");
2243 let name_key = engine.get_or_create_prop_key("name");
2244
2245 engine
2246 .index_manager_mut()
2247 .create_index("idx_person_name".to_string(), label_id, name_key)
2248 .expect("create index");
2249
2250 let n1 = engine.create_node(
2251 vec![label_id],
2252 vec![(name_key, PropertyValue::String("Alice".into()))],
2253 );
2254 engine.create_node(
2255 vec![label_id],
2256 vec![(name_key, PropertyValue::String("Bob".into()))],
2257 );
2258
2259 let result = engine.scan_nodes_by_property(
2260 label_id,
2261 name_key,
2262 &PropertyValue::String("Alice".into()),
2263 );
2264 assert_eq!(result, vec![n1]);
2265 }
2266
2267 #[test]
2268 fn test_scan_nodes_by_property_without_index() {
2269 let dir = tempdir().expect("tempdir");
2270 let mut engine = test_engine(dir.path());
2271 let label_id = engine.get_or_create_label("Person");
2272 let name_key = engine.get_or_create_prop_key("name");
2273
2274 let n1 = engine.create_node(
2276 vec![label_id],
2277 vec![(name_key, PropertyValue::String("Alice".into()))],
2278 );
2279 engine.create_node(
2280 vec![label_id],
2281 vec![(name_key, PropertyValue::String("Bob".into()))],
2282 );
2283
2284 let result = engine.scan_nodes_by_property(
2285 label_id,
2286 name_key,
2287 &PropertyValue::String("Alice".into()),
2288 );
2289 assert_eq!(result, vec![n1]);
2290 }
2291
2292 #[test]
2293 fn test_scan_nodes_by_property_both_paths_agree() {
2294 let dir = tempdir().expect("tempdir");
2295 let mut engine = test_engine(dir.path());
2296 let label_id = engine.get_or_create_label("Person");
2297 let name_key = engine.get_or_create_prop_key("name");
2298
2299 engine.create_node(
2301 vec![label_id],
2302 vec![(name_key, PropertyValue::String("Alice".into()))],
2303 );
2304 engine.create_node(
2305 vec![label_id],
2306 vec![(name_key, PropertyValue::String("Bob".into()))],
2307 );
2308 engine.create_node(
2309 vec![label_id],
2310 vec![(name_key, PropertyValue::String("Alice".into()))],
2311 );
2312
2313 let without_idx = engine.scan_nodes_by_property(
2315 label_id,
2316 name_key,
2317 &PropertyValue::String("Alice".into()),
2318 );
2319
2320 engine
2322 .index_manager_mut()
2323 .create_index("idx".to_string(), label_id, name_key)
2324 .expect("create");
2325 let nodes: Vec<_> = engine
2327 .scan_nodes_by_label(label_id)
2328 .iter()
2329 .map(|n| (n.node_id, n.properties.clone()))
2330 .collect();
2331 for (nid, props) in &nodes {
2332 for (pk, v) in props {
2333 if *pk == name_key {
2334 engine
2335 .index_manager_mut()
2336 .find_index_mut(label_id, name_key)
2337 .expect("idx")
2338 .insert(v, *nid);
2339 }
2340 }
2341 }
2342
2343 let with_idx = engine.scan_nodes_by_property(
2344 label_id,
2345 name_key,
2346 &PropertyValue::String("Alice".into()),
2347 );
2348
2349 let mut a: Vec<u64> = without_idx.iter().map(|n| n.0).collect();
2351 let mut b: Vec<u64> = with_idx.iter().map(|n| n.0).collect();
2352 a.sort();
2353 b.sort();
2354 assert_eq!(a, b);
2355 }
2356
2357 #[test]
2362 fn test_scan_nodes_by_range_with_index() {
2363 let dir = tempdir().expect("tempdir");
2364 let mut engine = test_engine(dir.path());
2365 let label_id = engine.get_or_create_label("Person");
2366 let age_key = engine.get_or_create_prop_key("age");
2367
2368 engine
2369 .index_manager_mut()
2370 .create_index("idx_person_age".to_string(), label_id, age_key)
2371 .expect("create index");
2372
2373 for age in [20, 25, 30, 35, 40] {
2374 engine.create_node(vec![label_id], vec![(age_key, PropertyValue::Int64(age))]);
2375 }
2376
2377 let result = engine.scan_nodes_by_range(
2378 label_id,
2379 age_key,
2380 &PropertyValue::Int64(25),
2381 &PropertyValue::Int64(35),
2382 );
2383 assert_eq!(result.len(), 3); }
2385
2386 #[test]
2387 fn test_scan_nodes_by_range_without_index() {
2388 let dir = tempdir().expect("tempdir");
2389 let mut engine = test_engine(dir.path());
2390 let label_id = engine.get_or_create_label("Person");
2391 let age_key = engine.get_or_create_prop_key("age");
2392
2393 for age in [20, 25, 30, 35, 40] {
2394 engine.create_node(vec![label_id], vec![(age_key, PropertyValue::Int64(age))]);
2395 }
2396
2397 let result = engine.scan_nodes_by_range(
2398 label_id,
2399 age_key,
2400 &PropertyValue::Int64(25),
2401 &PropertyValue::Int64(35),
2402 );
2403 assert_eq!(result.len(), 3);
2404 }
2405
2406 #[test]
2408 fn test_storage_engine_has_catalog() {
2409 let dir = tempdir().expect("tempdir");
2410 let engine = test_engine(dir.path());
2411 let cat = engine.catalog();
2412 assert_eq!(cat.label_id("Person"), None);
2414 }
2415
2416 #[test]
2418 fn test_storage_engine_catalog_mut() {
2419 let dir = tempdir().expect("tempdir");
2420 let mut engine = test_engine(dir.path());
2421 let cat = engine.catalog_mut();
2422 let id = cat.get_or_create_label("Person");
2423 assert_eq!(engine.catalog().label_id("Person"), Some(id));
2424 }
2425
2426 #[test]
2428 fn test_storage_engine_label_registry() {
2429 use cypherlite_core::LabelRegistry;
2430
2431 let dir = tempdir().expect("tempdir");
2432 let mut engine = test_engine(dir.path());
2433
2434 let label_id = engine.get_or_create_label("Person");
2435 assert_eq!(engine.label_id("Person"), Some(label_id));
2436 assert_eq!(engine.label_name(label_id), Some("Person"));
2437
2438 let rel_id = engine.get_or_create_rel_type("KNOWS");
2439 assert_eq!(engine.rel_type_id("KNOWS"), Some(rel_id));
2440 assert_eq!(engine.rel_type_name(rel_id), Some("KNOWS"));
2441
2442 let prop_id = engine.get_or_create_prop_key("name");
2443 assert_eq!(engine.prop_key_id("name"), Some(prop_id));
2444 assert_eq!(engine.prop_key_name(prop_id), Some("name"));
2445 }
2446
2447 #[cfg(feature = "subgraph")]
2452 mod subgraph_engine_tests {
2453 use super::*;
2454 use cypherlite_core::SubgraphId;
2455
2456 fn test_engine_sg(dir: &std::path::Path) -> StorageEngine {
2457 let config = DatabaseConfig {
2458 path: dir.join("test.cyl"),
2459 wal_sync_mode: SyncMode::Normal,
2460 ..Default::default()
2461 };
2462 StorageEngine::open(config).expect("open")
2463 }
2464
2465 #[test]
2467 fn test_engine_create_subgraph() {
2468 let dir = tempdir().expect("tempdir");
2469 let mut engine = test_engine_sg(dir.path());
2470 let id = engine.create_subgraph(vec![], None);
2471 assert_eq!(id, SubgraphId(1));
2472 }
2473
2474 #[test]
2476 fn test_engine_get_subgraph() {
2477 let dir = tempdir().expect("tempdir");
2478 let mut engine = test_engine_sg(dir.path());
2479 let id = engine
2480 .create_subgraph(vec![(1, PropertyValue::String("test".into()))], Some(1_000));
2481 let record = engine.get_subgraph(id).expect("found");
2482 assert_eq!(record.subgraph_id, id);
2483 assert_eq!(record.temporal_anchor, Some(1_000));
2484 }
2485
2486 #[test]
2488 fn test_engine_get_nonexistent_subgraph() {
2489 let dir = tempdir().expect("tempdir");
2490 let engine = test_engine_sg(dir.path());
2491 assert!(engine.get_subgraph(SubgraphId(999)).is_none());
2492 }
2493
2494 #[test]
2496 fn test_engine_delete_subgraph() {
2497 let dir = tempdir().expect("tempdir");
2498 let mut engine = test_engine_sg(dir.path());
2499 let id = engine.create_subgraph(vec![], None);
2500 engine.delete_subgraph(id).expect("delete");
2501 assert!(engine.get_subgraph(id).is_none());
2502 }
2503
2504 #[test]
2506 fn test_engine_delete_nonexistent_subgraph() {
2507 let dir = tempdir().expect("tempdir");
2508 let mut engine = test_engine_sg(dir.path());
2509 let result = engine.delete_subgraph(SubgraphId(999));
2510 assert!(result.is_err());
2511 }
2512
2513 #[test]
2515 fn test_engine_add_member() {
2516 let dir = tempdir().expect("tempdir");
2517 let mut engine = test_engine_sg(dir.path());
2518 let sg = engine.create_subgraph(vec![], None);
2519 let n1 = engine.create_node(vec![], vec![]);
2520 engine.add_member(sg, n1).expect("add member");
2521 let members = engine.list_members(sg);
2522 assert_eq!(members.len(), 1);
2523 assert_eq!(members[0], n1);
2524 }
2525
2526 #[test]
2528 fn test_engine_add_member_subgraph_not_found() {
2529 let dir = tempdir().expect("tempdir");
2530 let mut engine = test_engine_sg(dir.path());
2531 let n1 = engine.create_node(vec![], vec![]);
2532 let result = engine.add_member(SubgraphId(999), n1);
2533 assert!(result.is_err());
2534 }
2535
2536 #[test]
2538 fn test_engine_add_member_node_not_found() {
2539 let dir = tempdir().expect("tempdir");
2540 let mut engine = test_engine_sg(dir.path());
2541 let sg = engine.create_subgraph(vec![], None);
2542 let result = engine.add_member(sg, NodeId(999));
2543 assert!(result.is_err());
2544 }
2545
2546 #[test]
2548 fn test_engine_remove_member() {
2549 let dir = tempdir().expect("tempdir");
2550 let mut engine = test_engine_sg(dir.path());
2551 let sg = engine.create_subgraph(vec![], None);
2552 let n1 = engine.create_node(vec![], vec![]);
2553 engine.add_member(sg, n1).expect("add");
2554 engine.remove_member(sg, n1).expect("remove");
2555 assert!(engine.list_members(sg).is_empty());
2556 }
2557
2558 #[test]
2560 fn test_engine_remove_member_subgraph_not_found() {
2561 let dir = tempdir().expect("tempdir");
2562 let mut engine = test_engine_sg(dir.path());
2563 let n1 = engine.create_node(vec![], vec![]);
2564 let result = engine.remove_member(SubgraphId(999), n1);
2565 assert!(result.is_err());
2566 }
2567
2568 #[test]
2570 fn test_engine_list_members_empty() {
2571 let dir = tempdir().expect("tempdir");
2572 let mut engine = test_engine_sg(dir.path());
2573 let sg = engine.create_subgraph(vec![], None);
2574 assert!(engine.list_members(sg).is_empty());
2575 }
2576
2577 #[test]
2579 fn test_engine_get_subgraph_memberships() {
2580 let dir = tempdir().expect("tempdir");
2581 let mut engine = test_engine_sg(dir.path());
2582 let sg1 = engine.create_subgraph(vec![], None);
2583 let sg2 = engine.create_subgraph(vec![], None);
2584 let n1 = engine.create_node(vec![], vec![]);
2585 engine.add_member(sg1, n1).expect("add1");
2586 engine.add_member(sg2, n1).expect("add2");
2587 let memberships = engine.get_subgraph_memberships(n1);
2588 assert_eq!(memberships.len(), 2);
2589 assert!(memberships.contains(&sg1));
2590 assert!(memberships.contains(&sg2));
2591 }
2592
2593 #[test]
2595 fn test_engine_delete_subgraph_cascades_memberships() {
2596 let dir = tempdir().expect("tempdir");
2597 let mut engine = test_engine_sg(dir.path());
2598 let sg = engine.create_subgraph(vec![], None);
2599 let n1 = engine.create_node(vec![], vec![]);
2600 let n2 = engine.create_node(vec![], vec![]);
2601 engine.add_member(sg, n1).expect("add1");
2602 engine.add_member(sg, n2).expect("add2");
2603 engine.delete_subgraph(sg).expect("delete");
2604 assert!(engine.get_subgraph_memberships(n1).is_empty());
2606 assert!(engine.get_subgraph_memberships(n2).is_empty());
2607 }
2608 }
2609
2610 #[cfg(feature = "hypergraph")]
2615 mod hypergraph_engine_tests {
2616 use super::*;
2617 use cypherlite_core::{GraphEntity, HyperEdgeId};
2618
2619 fn test_engine_hg(dir: &std::path::Path) -> StorageEngine {
2620 let config = DatabaseConfig {
2621 path: dir.join("test.cyl"),
2622 wal_sync_mode: SyncMode::Normal,
2623 ..Default::default()
2624 };
2625 StorageEngine::open(config).expect("open")
2626 }
2627
2628 #[test]
2630 fn test_storage_engine_create_hyperedge() {
2631 let dir = tempdir().expect("tempdir");
2632 let mut engine = test_engine_hg(dir.path());
2633 let n1 = engine.create_node(vec![], vec![]);
2634 let n2 = engine.create_node(vec![], vec![]);
2635 let he = engine.create_hyperedge(
2636 1,
2637 vec![GraphEntity::Node(n1)],
2638 vec![GraphEntity::Node(n2)],
2639 vec![],
2640 );
2641 assert_eq!(he, HyperEdgeId(1));
2642 }
2643
2644 #[test]
2646 fn test_storage_engine_get_hyperedge() {
2647 let dir = tempdir().expect("tempdir");
2648 let mut engine = test_engine_hg(dir.path());
2649 let n1 = engine.create_node(vec![], vec![]);
2650 let n2 = engine.create_node(vec![], vec![]);
2651 let he = engine.create_hyperedge(
2652 5,
2653 vec![GraphEntity::Node(n1)],
2654 vec![GraphEntity::Node(n2)],
2655 vec![(1, PropertyValue::Int64(42))],
2656 );
2657 let record = engine.get_hyperedge(he).expect("found");
2658 assert_eq!(record.id, he);
2659 assert_eq!(record.rel_type_id, 5);
2660 assert_eq!(record.sources.len(), 1);
2661 assert_eq!(record.targets.len(), 1);
2662 assert_eq!(record.properties.len(), 1);
2663 }
2664
2665 #[test]
2667 fn test_storage_engine_get_nonexistent_hyperedge() {
2668 let dir = tempdir().expect("tempdir");
2669 let engine = test_engine_hg(dir.path());
2670 assert!(engine.get_hyperedge(HyperEdgeId(999)).is_none());
2671 }
2672
2673 #[test]
2675 fn test_storage_engine_delete_hyperedge() {
2676 let dir = tempdir().expect("tempdir");
2677 let mut engine = test_engine_hg(dir.path());
2678 let he = engine.create_hyperedge(1, vec![], vec![], vec![]);
2679 engine.delete_hyperedge(he).expect("delete");
2680 assert!(engine.get_hyperedge(he).is_none());
2681 }
2682
2683 #[test]
2685 fn test_storage_engine_delete_nonexistent_hyperedge() {
2686 let dir = tempdir().expect("tempdir");
2687 let mut engine = test_engine_hg(dir.path());
2688 let result = engine.delete_hyperedge(HyperEdgeId(999));
2689 assert!(result.is_err());
2690 }
2691
2692 #[test]
2694 fn test_storage_engine_reverse_index_update() {
2695 let dir = tempdir().expect("tempdir");
2696 let mut engine = test_engine_hg(dir.path());
2697 let n1 = engine.create_node(vec![], vec![]);
2698 let n2 = engine.create_node(vec![], vec![]);
2699 let he = engine.create_hyperedge(
2700 1,
2701 vec![GraphEntity::Node(n1)],
2702 vec![GraphEntity::Node(n2)],
2703 vec![],
2704 );
2705 let hes = engine.hyperedges_for_entity(n1.0);
2707 assert_eq!(hes.len(), 1);
2708 assert_eq!(hes[0], he.0);
2709 let hes = engine.hyperedges_for_entity(n2.0);
2711 assert_eq!(hes.len(), 1);
2712 engine.delete_hyperedge(he).expect("delete");
2714 assert!(engine.hyperedges_for_entity(n1.0).is_empty());
2715 assert!(engine.hyperedges_for_entity(n2.0).is_empty());
2716 }
2717
2718 #[test]
2720 fn test_storage_engine_scan_hyperedges() {
2721 let dir = tempdir().expect("tempdir");
2722 let mut engine = test_engine_hg(dir.path());
2723 engine.create_hyperedge(1, vec![], vec![], vec![]);
2724 engine.create_hyperedge(2, vec![], vec![], vec![]);
2725 let all = engine.scan_hyperedges();
2726 assert_eq!(all.len(), 2);
2727 }
2728
2729 #[test]
2731 fn test_storage_engine_header_sync() {
2732 let dir = tempdir().expect("tempdir");
2733 let mut engine = test_engine_hg(dir.path());
2734 engine.create_hyperedge(1, vec![], vec![], vec![]);
2735 engine.create_hyperedge(2, vec![], vec![], vec![]);
2736 assert_eq!(engine.page_manager.header().next_hyperedge_id, 3);
2738 }
2739 }
2740
2741 #[test]
2747 fn test_second_open_returns_database_locked() {
2748 let dir = tempdir().expect("tempdir");
2749 let db_path = dir.path().join("lock_test.cyl");
2750 let config1 = DatabaseConfig {
2751 path: db_path.clone(),
2752 wal_sync_mode: SyncMode::Normal,
2753 ..Default::default()
2754 };
2755 let _engine1 = StorageEngine::open(config1).expect("first open should succeed");
2756
2757 let config2 = DatabaseConfig {
2758 path: db_path.clone(),
2759 wal_sync_mode: SyncMode::Normal,
2760 ..Default::default()
2761 };
2762 let result = StorageEngine::open(config2);
2763 match result {
2764 Err(CypherLiteError::DatabaseLocked(ref msg)) => {
2765 assert!(
2766 msg.contains("lock_test.cyl"),
2767 "error should contain file path: {msg}"
2768 );
2769 }
2770 Err(other) => panic!("expected DatabaseLocked, got: {other}"),
2771 Ok(_) => panic!("expected DatabaseLocked error, but open succeeded"),
2772 }
2773 }
2774
2775 #[test]
2778 fn test_drop_releases_lock_then_reopen_succeeds() {
2779 let dir = tempdir().expect("tempdir");
2780 let db_path = dir.path().join("drop_test.cyl");
2781 let config = DatabaseConfig {
2782 path: db_path.clone(),
2783 wal_sync_mode: SyncMode::Normal,
2784 ..Default::default()
2785 };
2786
2787 {
2789 let _engine = StorageEngine::open(config.clone()).expect("first open");
2790 }
2791 let _engine2 = StorageEngine::open(config).expect("reopen after drop should succeed");
2793 }
2794
2795 #[test]
2800 fn test_create_node_sets_node_data_root_page() {
2801 let dir = tempdir().expect("tempdir");
2802 let mut engine = test_engine(dir.path());
2803 assert_eq!(engine.node_data_root_page(), 0);
2805 engine.create_node(vec![1], vec![(1, PropertyValue::String("Alice".into()))]);
2806 assert_ne!(engine.node_data_root_page(), 0);
2808 }
2809
2810 #[test]
2811 fn test_create_node_data_page_contains_record() {
2812 let dir = tempdir().expect("tempdir");
2813 let mut engine = test_engine(dir.path());
2814 let id = engine.create_node(vec![1, 2], vec![(1, PropertyValue::String("Alice".into()))]);
2815 let page_id = engine.node_data_root_page();
2817 assert_ne!(page_id, 0);
2818 let page = engine.read_data_page(page_id).expect("read page");
2819 let entries = page::record_serialization::read_records_from_page(&page);
2820 assert_eq!(entries.len(), 1);
2821 let (off, len) = entries[0];
2823 let (record, deleted, _) =
2824 page::record_serialization::deserialize_node_record(&page[off..off + len])
2825 .expect("deserialize");
2826 assert_eq!(record.node_id, id);
2827 assert_eq!(record.labels, vec![1, 2]);
2828 assert!(!deleted);
2829 }
2830
2831 #[test]
2832 fn test_create_multiple_nodes_all_persisted() {
2833 let dir = tempdir().expect("tempdir");
2834 let mut engine = test_engine(dir.path());
2835 let mut ids = vec![];
2836 for i in 0..5u64 {
2837 let id = engine.create_node(vec![1], vec![(1, PropertyValue::Int64(i as i64))]);
2838 ids.push(id);
2839 }
2840 let page_id = engine.node_data_root_page();
2842 let page = engine.read_data_page(page_id).expect("read page");
2843 let entries = page::record_serialization::read_records_from_page(&page);
2844 assert_eq!(entries.len(), 5);
2845 }
2846
2847 #[test]
2852 fn test_create_edge_sets_edge_data_root_page() {
2853 let dir = tempdir().expect("tempdir");
2854 let mut engine = test_engine(dir.path());
2855 assert_eq!(engine.edge_data_root_page(), 0);
2856 let n1 = engine.create_node(vec![], vec![]);
2857 let n2 = engine.create_node(vec![], vec![]);
2858 engine.create_edge(n1, n2, 1, vec![]).expect("edge");
2859 assert_ne!(engine.edge_data_root_page(), 0);
2860 }
2861
2862 #[test]
2863 fn test_create_edge_data_page_contains_record() {
2864 let dir = tempdir().expect("tempdir");
2865 let mut engine = test_engine(dir.path());
2866 let n1 = engine.create_node(vec![], vec![]);
2867 let n2 = engine.create_node(vec![], vec![]);
2868 let eid = engine
2869 .create_edge(n1, n2, 5, vec![(1, PropertyValue::Int64(42))])
2870 .expect("edge");
2871 let page_id = engine.edge_data_root_page();
2872 let page = engine.read_data_page(page_id).expect("read page");
2873 let entries = page::record_serialization::read_records_from_page(&page);
2874 assert_eq!(entries.len(), 1);
2875 let (off, len) = entries[0];
2876 let (record, deleted, _) =
2877 page::record_serialization::deserialize_edge_record(&page[off..off + len])
2878 .expect("deserialize");
2879 assert_eq!(record.edge_id, eid);
2880 assert_eq!(record.start_node, n1);
2881 assert_eq!(record.end_node, n2);
2882 assert_eq!(record.rel_type_id, 5);
2883 assert!(!deleted);
2884 }
2885
2886 #[test]
2891 fn test_update_node_rewrites_data_page() {
2892 let dir = tempdir().expect("tempdir");
2893 let mut engine = test_engine(dir.path());
2894 let id = engine.create_node(vec![1], vec![(1, PropertyValue::Int64(10))]);
2895 engine
2896 .update_node(id, vec![(1, PropertyValue::Int64(20))])
2897 .expect("update");
2898 let page_id = engine.node_data_root_page();
2900 let page = engine.read_data_page(page_id).expect("read page");
2901 let entries = page::record_serialization::read_records_from_page(&page);
2902 let mut found = false;
2904 for (off, len) in &entries {
2905 let (record, deleted, _) =
2906 page::record_serialization::deserialize_node_record(&page[*off..*off + *len])
2907 .expect("deserialize");
2908 if record.node_id == id && !deleted {
2909 assert_eq!(record.properties[0].1, PropertyValue::Int64(20));
2910 found = true;
2911 }
2912 }
2913 assert!(found, "updated node record should be in data page");
2914 }
2915
2916 #[test]
2921 fn test_delete_node_writes_tombstone() {
2922 let dir = tempdir().expect("tempdir");
2923 let mut engine = test_engine(dir.path());
2924 let id = engine.create_node(vec![1], vec![]);
2925 engine.delete_node(id).expect("delete");
2926 let page_id = engine.node_data_root_page();
2928 let page = engine.read_data_page(page_id).expect("read page");
2929 let entries = page::record_serialization::read_records_from_page(&page);
2930 let mut tombstone_found = false;
2931 for (off, len) in &entries {
2932 let (record, deleted, _) =
2933 page::record_serialization::deserialize_node_record(&page[*off..*off + *len])
2934 .expect("deserialize");
2935 if record.node_id == id && deleted {
2936 tombstone_found = true;
2937 }
2938 }
2939 assert!(
2940 tombstone_found,
2941 "deleted node should have a tombstone record"
2942 );
2943 }
2944
2945 #[test]
2946 fn test_delete_edge_writes_tombstone() {
2947 let dir = tempdir().expect("tempdir");
2948 let mut engine = test_engine(dir.path());
2949 let n1 = engine.create_node(vec![], vec![]);
2950 let n2 = engine.create_node(vec![], vec![]);
2951 let eid = engine.create_edge(n1, n2, 1, vec![]).expect("edge");
2952 engine.delete_edge(eid).expect("delete");
2953 let page_id = engine.edge_data_root_page();
2954 let page = engine.read_data_page(page_id).expect("read page");
2955 let entries = page::record_serialization::read_records_from_page(&page);
2956 let mut tombstone_found = false;
2957 for (off, len) in &entries {
2958 let (record, deleted, _) =
2959 page::record_serialization::deserialize_edge_record(&page[*off..*off + *len])
2960 .expect("deserialize");
2961 if record.edge_id == eid && deleted {
2962 tombstone_found = true;
2963 }
2964 }
2965 assert!(
2966 tombstone_found,
2967 "deleted edge should have a tombstone record"
2968 );
2969 }
2970
2971 #[test]
2976 fn test_create_node_wal_has_committed_frames() {
2977 let dir = tempdir().expect("tempdir");
2978 let mut engine = test_engine(dir.path());
2979 engine.create_node(vec![1], vec![(1, PropertyValue::Int64(42))]);
2980 assert!(
2982 engine.wal_frame_count() > 0,
2983 "WAL should have committed frames"
2984 );
2985 }
2986
2987 #[test]
2988 fn test_page_overflow_allocates_new_page() {
2989 let dir = tempdir().expect("tempdir");
2990 let mut engine = test_engine(dir.path());
2991 for i in 0..200u64 {
2993 let big_str = "x".repeat(100);
2994 engine.create_node(
2995 vec![1, 2, 3],
2996 vec![
2997 (1, PropertyValue::String(big_str)),
2998 (2, PropertyValue::Int64(i as i64)),
2999 ],
3000 );
3001 }
3002 assert!(
3004 engine.node_data_page_count() > 1,
3005 "should use multiple data pages"
3006 );
3007 }
3008
3009 #[test]
3014 fn test_close_reopen_preserves_nodes() {
3015 let dir = tempdir().expect("tempdir");
3016 let db_path = dir.path().join("persist_nodes.cyl");
3017 let config = DatabaseConfig {
3018 path: db_path.clone(),
3019 wal_sync_mode: SyncMode::Normal,
3020 ..Default::default()
3021 };
3022
3023 {
3025 let mut engine = StorageEngine::open(config.clone()).expect("open");
3026 engine.create_node(vec![1, 2], vec![(1, PropertyValue::String("Alice".into()))]);
3027 engine.create_node(vec![3], vec![(2, PropertyValue::Int64(42))]);
3028 engine.create_node(
3029 vec![1],
3030 vec![
3031 (1, PropertyValue::String("Charlie".into())),
3032 (3, PropertyValue::Bool(true)),
3033 ],
3034 );
3035 assert_eq!(engine.node_count(), 3);
3036 }
3038
3039 {
3041 let engine = StorageEngine::open(config).expect("reopen");
3042 assert_eq!(
3043 engine.node_count(),
3044 3,
3045 "all nodes should be loaded from disk"
3046 );
3047
3048 let n1 = engine.get_node(NodeId(1)).expect("node 1 should exist");
3050 assert_eq!(n1.labels, vec![1, 2]);
3051 assert_eq!(n1.properties.len(), 1);
3052 assert_eq!(n1.properties[0], (1, PropertyValue::String("Alice".into())));
3053
3054 let n2 = engine.get_node(NodeId(2)).expect("node 2 should exist");
3056 assert_eq!(n2.labels, vec![3]);
3057 assert_eq!(n2.properties[0], (2, PropertyValue::Int64(42)));
3058
3059 let n3 = engine.get_node(NodeId(3)).expect("node 3 should exist");
3061 assert_eq!(n3.labels, vec![1]);
3062 assert_eq!(n3.properties.len(), 2);
3063 }
3064 }
3065
3066 #[test]
3071 fn test_close_reopen_preserves_edges() {
3072 let dir = tempdir().expect("tempdir");
3073 let db_path = dir.path().join("persist_edges.cyl");
3074 let config = DatabaseConfig {
3075 path: db_path.clone(),
3076 wal_sync_mode: SyncMode::Normal,
3077 ..Default::default()
3078 };
3079
3080 {
3082 let mut engine = StorageEngine::open(config.clone()).expect("open");
3083 let n1 = engine.create_node(vec![1], vec![]);
3084 let n2 = engine.create_node(vec![2], vec![]);
3085 let n3 = engine.create_node(vec![3], vec![]);
3086 engine
3087 .create_edge(n1, n2, 10, vec![(1, PropertyValue::String("since".into()))])
3088 .expect("edge1");
3089 engine.create_edge(n2, n3, 20, vec![]).expect("edge2");
3090 assert_eq!(engine.node_count(), 3);
3091 assert_eq!(engine.edge_count(), 2);
3092 }
3093
3094 {
3096 let engine = StorageEngine::open(config).expect("reopen");
3097 assert_eq!(engine.node_count(), 3, "nodes should persist");
3098 assert_eq!(engine.edge_count(), 2, "edges should persist");
3099
3100 let e1 = engine.get_edge(EdgeId(1)).expect("edge 1 should exist");
3102 assert_eq!(e1.start_node, NodeId(1));
3103 assert_eq!(e1.end_node, NodeId(2));
3104 assert_eq!(e1.rel_type_id, 10);
3105 assert_eq!(e1.properties.len(), 1);
3106 assert_eq!(e1.properties[0], (1, PropertyValue::String("since".into())));
3107
3108 let e2 = engine.get_edge(EdgeId(2)).expect("edge 2 should exist");
3110 assert_eq!(e2.start_node, NodeId(2));
3111 assert_eq!(e2.end_node, NodeId(3));
3112 assert_eq!(e2.rel_type_id, 20);
3113 assert!(e2.properties.is_empty());
3114 }
3115 }
3116
3117 #[test]
3122 fn test_close_reopen_large_dataset() {
3123 let dir = tempdir().expect("tempdir");
3124 let db_path = dir.path().join("persist_large.cyl");
3125 let config = DatabaseConfig {
3126 path: db_path.clone(),
3127 wal_sync_mode: SyncMode::Normal,
3128 ..Default::default()
3129 };
3130
3131 let node_count = 1000;
3132 let edge_count = 500;
3133
3134 {
3136 let mut engine = StorageEngine::open(config.clone()).expect("open");
3137 for i in 0..node_count {
3138 engine.create_node(
3139 vec![(i % 5) as u32],
3140 vec![(1, PropertyValue::Int64(i as i64))],
3141 );
3142 }
3143 for i in 0..edge_count {
3145 let src = NodeId((i + 1) as u64);
3146 let dst = NodeId((i + 2) as u64);
3147 engine
3148 .create_edge(src, dst, 1, vec![(1, PropertyValue::Int64(i as i64))])
3149 .expect("edge");
3150 }
3151 assert_eq!(engine.node_count(), node_count);
3152 assert_eq!(engine.edge_count(), edge_count);
3153 }
3154
3155 {
3157 let engine = StorageEngine::open(config).expect("reopen");
3158 assert_eq!(
3159 engine.node_count(),
3160 node_count,
3161 "all {} nodes should be loaded",
3162 node_count
3163 );
3164 assert_eq!(
3165 engine.edge_count(),
3166 edge_count,
3167 "all {} edges should be loaded",
3168 edge_count
3169 );
3170
3171 let first = engine.get_node(NodeId(1)).expect("first node");
3173 assert_eq!(first.properties[0], (1, PropertyValue::Int64(0)));
3174 let last = engine
3175 .get_node(NodeId(node_count as u64))
3176 .expect("last node");
3177 assert_eq!(
3178 last.properties[0],
3179 (1, PropertyValue::Int64((node_count - 1) as i64))
3180 );
3181
3182 let first_edge = engine.get_edge(EdgeId(1)).expect("first edge");
3184 assert_eq!(first_edge.start_node, NodeId(1));
3185 assert_eq!(first_edge.end_node, NodeId(2));
3186 }
3187 }
3188
3189 #[test]
3194 fn test_close_reopen_empty_database() {
3195 let dir = tempdir().expect("tempdir");
3196 let db_path = dir.path().join("persist_empty.cyl");
3197 let config = DatabaseConfig {
3198 path: db_path.clone(),
3199 wal_sync_mode: SyncMode::Normal,
3200 ..Default::default()
3201 };
3202
3203 {
3205 let _engine = StorageEngine::open(config.clone()).expect("open");
3206 }
3207
3208 {
3210 let engine = StorageEngine::open(config).expect("reopen");
3211 assert_eq!(engine.node_count(), 0);
3212 assert_eq!(engine.edge_count(), 0);
3213 }
3214 }
3215
3216 #[test]
3221 fn test_close_reopen_id_continuity() {
3222 let dir = tempdir().expect("tempdir");
3223 let db_path = dir.path().join("persist_ids.cyl");
3224 let config = DatabaseConfig {
3225 path: db_path.clone(),
3226 wal_sync_mode: SyncMode::Normal,
3227 ..Default::default()
3228 };
3229
3230 {
3232 let mut engine = StorageEngine::open(config.clone()).expect("open");
3233 engine.create_node(vec![], vec![]);
3234 engine.create_node(vec![], vec![]);
3235 engine.create_node(vec![], vec![]);
3236 }
3237
3238 {
3240 let mut engine = StorageEngine::open(config).expect("reopen");
3241 assert_eq!(engine.node_count(), 3);
3242 let new_id = engine.create_node(vec![99], vec![]);
3243 assert_eq!(new_id, NodeId(4), "new node should get next sequential ID");
3244 assert_eq!(engine.node_count(), 4);
3245 }
3246 }
3247
3248 #[test]
3253 fn test_close_reopen_preserves_catalog_labels() {
3254 let dir = tempdir().expect("tempdir");
3255 let db_path = dir.path().join("persist_catalog.cyl");
3256 let config = DatabaseConfig {
3257 path: db_path.clone(),
3258 wal_sync_mode: SyncMode::Normal,
3259 ..Default::default()
3260 };
3261
3262 let person_id;
3264 let company_id;
3265 {
3266 let mut engine = StorageEngine::open(config.clone()).expect("open");
3267 person_id = engine.get_or_create_label("Person");
3268 company_id = engine.get_or_create_label("Company");
3269 engine.create_node(vec![person_id], vec![]);
3271 }
3272
3273 {
3275 let engine = StorageEngine::open(config).expect("reopen");
3276 assert_eq!(
3277 engine.label_id("Person"),
3278 Some(person_id),
3279 "Person label should persist across close/reopen"
3280 );
3281 assert_eq!(
3282 engine.label_id("Company"),
3283 Some(company_id),
3284 "Company label should persist across close/reopen"
3285 );
3286 assert_eq!(
3287 engine.label_name(person_id),
3288 Some("Person"),
3289 "Reverse lookup should work after reopen"
3290 );
3291 }
3292 }
3293
3294 #[test]
3299 fn test_close_reopen_preserves_all_catalog_entries() {
3300 let dir = tempdir().expect("tempdir");
3301 let db_path = dir.path().join("persist_catalog_all.cyl");
3302 let config = DatabaseConfig {
3303 path: db_path.clone(),
3304 wal_sync_mode: SyncMode::Normal,
3305 ..Default::default()
3306 };
3307
3308 let label_person;
3310 let label_company;
3311 let prop_name;
3312 let prop_age;
3313 let rel_knows;
3314 let rel_works_at;
3315 {
3316 let mut engine = StorageEngine::open(config.clone()).expect("open");
3317 label_person = engine.get_or_create_label("Person");
3318 label_company = engine.get_or_create_label("Company");
3319 prop_name = engine.get_or_create_prop_key("name");
3320 prop_age = engine.get_or_create_prop_key("age");
3321 rel_knows = engine.get_or_create_rel_type("KNOWS");
3322 rel_works_at = engine.get_or_create_rel_type("WORKS_AT");
3323 let n1 = engine.create_node(
3325 vec![label_person],
3326 vec![(prop_name, PropertyValue::String("Alice".into()))],
3327 );
3328 let n2 = engine.create_node(
3329 vec![label_company],
3330 vec![(prop_name, PropertyValue::String("Acme".into()))],
3331 );
3332 engine
3333 .create_edge(n1, n2, rel_works_at, vec![])
3334 .expect("edge");
3335 }
3336
3337 {
3339 let engine = StorageEngine::open(config).expect("reopen");
3340
3341 assert_eq!(engine.label_id("Person"), Some(label_person));
3343 assert_eq!(engine.label_id("Company"), Some(label_company));
3344 assert_eq!(engine.label_name(label_person), Some("Person"));
3345 assert_eq!(engine.label_name(label_company), Some("Company"));
3346
3347 assert_eq!(engine.prop_key_id("name"), Some(prop_name));
3349 assert_eq!(engine.prop_key_id("age"), Some(prop_age));
3350 assert_eq!(engine.prop_key_name(prop_name), Some("name"));
3351 assert_eq!(engine.prop_key_name(prop_age), Some("age"));
3352
3353 assert_eq!(engine.rel_type_id("KNOWS"), Some(rel_knows));
3355 assert_eq!(engine.rel_type_id("WORKS_AT"), Some(rel_works_at));
3356 assert_eq!(engine.rel_type_name(rel_knows), Some("KNOWS"));
3357 assert_eq!(engine.rel_type_name(rel_works_at), Some("WORKS_AT"));
3358 }
3359 }
3360
3361 #[test]
3366 fn test_close_reopen_catalog_id_continuity() {
3367 let dir = tempdir().expect("tempdir");
3368 let db_path = dir.path().join("persist_catalog_ids.cyl");
3369 let config = DatabaseConfig {
3370 path: db_path.clone(),
3371 wal_sync_mode: SyncMode::Normal,
3372 ..Default::default()
3373 };
3374
3375 {
3377 let mut engine = StorageEngine::open(config.clone()).expect("open");
3378 engine.get_or_create_label("Person"); engine.get_or_create_label("Company"); }
3381
3382 {
3384 let mut engine = StorageEngine::open(config).expect("reopen");
3385 let new_id = engine.get_or_create_label("City");
3386 assert_eq!(
3387 new_id, 2,
3388 "new label after reopen should continue ID sequence"
3389 );
3390 assert_eq!(engine.label_id("Person"), Some(0));
3392 assert_eq!(engine.label_id("Company"), Some(1));
3393 assert_eq!(engine.label_id("City"), Some(2));
3394 }
3395 }
3396
3397 #[test]
3402 fn test_close_reopen_empty_catalog() {
3403 let dir = tempdir().expect("tempdir");
3404 let db_path = dir.path().join("persist_catalog_empty.cyl");
3405 let config = DatabaseConfig {
3406 path: db_path.clone(),
3407 wal_sync_mode: SyncMode::Normal,
3408 ..Default::default()
3409 };
3410
3411 {
3413 let _engine = StorageEngine::open(config.clone()).expect("open");
3414 }
3415
3416 {
3418 let engine = StorageEngine::open(config).expect("reopen");
3419 assert_eq!(engine.label_id("anything"), None);
3420 assert_eq!(engine.node_count(), 0);
3421 }
3422 }
3423
3424 #[cfg(feature = "subgraph")]
3429 #[test]
3430 fn test_close_reopen_preserves_subgraphs() {
3431 use cypherlite_core::SubgraphId;
3432
3433 let dir = tempdir().expect("tempdir");
3434 let db_path = dir.path().join("persist_subgraphs.cyl");
3435 let config = DatabaseConfig {
3436 path: db_path.clone(),
3437 wal_sync_mode: SyncMode::Normal,
3438 ..Default::default()
3439 };
3440
3441 {
3443 let mut engine = StorageEngine::open(config.clone()).expect("open");
3444 let sg1 = engine.create_subgraph(
3446 vec![(1, PropertyValue::String("graph-A".into()))],
3447 Some(1_700_000_000_000),
3448 );
3449 let sg2 = engine.create_subgraph(vec![], None);
3451 let sg3 = engine.create_subgraph(
3453 vec![
3454 (2, PropertyValue::Int64(42)),
3455 (3, PropertyValue::Bool(true)),
3456 ],
3457 Some(1_700_000_001_000),
3458 );
3459 assert_eq!(sg1, SubgraphId(1));
3460 assert_eq!(sg2, SubgraphId(2));
3461 assert_eq!(sg3, SubgraphId(3));
3462 }
3463
3464 {
3466 let engine = StorageEngine::open(config).expect("reopen");
3467 let s1 = engine.get_subgraph(SubgraphId(1)).expect("subgraph 1");
3469 assert_eq!(s1.subgraph_id, SubgraphId(1));
3470 assert_eq!(
3471 s1.properties,
3472 vec![(1, PropertyValue::String("graph-A".into()))]
3473 );
3474 assert_eq!(s1.temporal_anchor, Some(1_700_000_000_000));
3475
3476 let s2 = engine.get_subgraph(SubgraphId(2)).expect("subgraph 2");
3478 assert!(s2.properties.is_empty());
3479 assert_eq!(s2.temporal_anchor, None);
3480
3481 let s3 = engine.get_subgraph(SubgraphId(3)).expect("subgraph 3");
3483 assert_eq!(s3.properties.len(), 2);
3484 assert_eq!(s3.temporal_anchor, Some(1_700_000_001_000));
3485
3486 let sg4 = engine.get_subgraph(SubgraphId(4));
3488 assert!(sg4.is_none());
3489 }
3490 }
3491
3492 #[cfg(feature = "subgraph")]
3493 #[test]
3494 fn test_close_reopen_preserves_memberships() {
3495 use cypherlite_core::SubgraphId;
3496
3497 let dir = tempdir().expect("tempdir");
3498 let db_path = dir.path().join("persist_memberships.cyl");
3499 let config = DatabaseConfig {
3500 path: db_path.clone(),
3501 wal_sync_mode: SyncMode::Normal,
3502 ..Default::default()
3503 };
3504
3505 {
3507 let mut engine = StorageEngine::open(config.clone()).expect("open");
3508 let sg = engine.create_subgraph(vec![], None);
3509 let n1 = engine.create_node(vec![1], vec![]);
3510 let n2 = engine.create_node(vec![2], vec![]);
3511 engine.add_member(sg, n1).expect("add n1");
3512 engine.add_member(sg, n2).expect("add n2");
3513 assert_eq!(engine.list_members(sg).len(), 2);
3514 }
3515
3516 {
3518 let engine = StorageEngine::open(config).expect("reopen");
3519 let members = engine.list_members(SubgraphId(1));
3520 assert_eq!(members.len(), 2);
3521 assert!(members.contains(&NodeId(1)));
3522 assert!(members.contains(&NodeId(2)));
3523 }
3524 }
3525
3526 #[cfg(feature = "hypergraph")]
3531 #[test]
3532 fn test_close_reopen_preserves_hyperedges() {
3533 use cypherlite_core::{GraphEntity, HyperEdgeId};
3534
3535 let dir = tempdir().expect("tempdir");
3536 let db_path = dir.path().join("persist_hyperedges.cyl");
3537 let config = DatabaseConfig {
3538 path: db_path.clone(),
3539 wal_sync_mode: SyncMode::Normal,
3540 ..Default::default()
3541 };
3542
3543 {
3545 let mut engine = StorageEngine::open(config.clone()).expect("open");
3546 let n1 = engine.create_node(vec![1], vec![]);
3547 let n2 = engine.create_node(vec![2], vec![]);
3548 let n3 = engine.create_node(vec![3], vec![]);
3549
3550 let he1 = engine.create_hyperedge(
3552 10,
3553 vec![GraphEntity::Node(n1)],
3554 vec![GraphEntity::Node(n2), GraphEntity::Node(n3)],
3555 vec![(1, PropertyValue::String("rel-A".into()))],
3556 );
3557 let he2 = engine.create_hyperedge(20, vec![], vec![], vec![]);
3559 assert_eq!(he1, HyperEdgeId(1));
3560 assert_eq!(he2, HyperEdgeId(2));
3561 }
3562
3563 {
3565 let engine = StorageEngine::open(config).expect("reopen");
3566 let h1 = engine.get_hyperedge(HyperEdgeId(1)).expect("hyperedge 1");
3568 assert_eq!(h1.id, HyperEdgeId(1));
3569 assert_eq!(h1.rel_type_id, 10);
3570 assert_eq!(h1.sources.len(), 1);
3571 assert_eq!(h1.targets.len(), 2);
3572 assert_eq!(
3573 h1.properties,
3574 vec![(1, PropertyValue::String("rel-A".into()))]
3575 );
3576
3577 let h2 = engine.get_hyperedge(HyperEdgeId(2)).expect("hyperedge 2");
3579 assert_eq!(h2.id, HyperEdgeId(2));
3580 assert_eq!(h2.rel_type_id, 20);
3581 assert!(h2.sources.is_empty());
3582 assert!(h2.targets.is_empty());
3583 assert!(h2.properties.is_empty());
3584
3585 assert_eq!(engine.node_count(), 3);
3587 }
3588 }
3589
3590 #[test]
3595 fn test_close_reopen_preserves_version_store() {
3596 let dir = tempdir().expect("tempdir");
3597 let db_path = dir.path().join("persist_versions.cyl");
3598 let config = DatabaseConfig {
3599 path: db_path.clone(),
3600 wal_sync_mode: SyncMode::Normal,
3601 version_storage_enabled: true,
3602 ..Default::default()
3603 };
3604
3605 {
3607 let mut engine = StorageEngine::open(config.clone()).expect("open");
3608 let n1 =
3609 engine.create_node(vec![1], vec![(1, PropertyValue::String("Alice-v1".into()))]);
3610 engine
3612 .update_node(n1, vec![(1, PropertyValue::String("Alice-v2".into()))])
3613 .expect("update n1");
3614 assert_eq!(engine.version_count(n1.0), 1);
3616 }
3617
3618 {
3620 let engine = StorageEngine::open(config).expect("reopen");
3621 let n = engine.get_node(NodeId(1)).expect("node 1");
3623 assert_eq!(
3624 n.properties[0],
3625 (1, PropertyValue::String("Alice-v2".into()))
3626 );
3627 assert_eq!(engine.version_count(1), 1);
3629 let chain = engine.version_chain(1);
3630 assert_eq!(chain.len(), 1);
3631 }
3632 }
3633}