1#[cfg(feature = "wal")]
4use std::path::Path;
5
6#[cfg(any(feature = "vector-index", feature = "text-index"))]
7use grafeo_common::grafeo_warn;
8use grafeo_common::types::{EdgeId, EpochId, NodeId, Value};
9use grafeo_common::utils::error::{Error, Result};
10use hashbrown::HashSet;
11
12use crate::config::Config;
13
14#[cfg(feature = "wal")]
15use grafeo_adapters::storage::wal::WalRecord;
16
17use crate::catalog::{
18 EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
19};
20
/// Format version stamped into the `version` field of every snapshot built by
/// `export_snapshot`.
const SNAPSHOT_VERSION: u8 = 4;
23
/// Serialized image of a whole database, as built by `export_snapshot` and consumed by
/// `import_snapshot`, `restore_snapshot` and `load_snapshot_into_store`.
///
/// The struct is encoded with bincode, so the field order below is the wire layout.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Snapshot format version. Kept as the first field because the import paths
    /// inspect the first encoded byte before decoding the rest.
    version: u8,
    /// Nodes of the default graph.
    nodes: Vec<SnapshotNode>,
    /// Edges of the default graph.
    edges: Vec<SnapshotEdge>,
    /// Contents of every named LPG graph.
    named_graphs: Vec<NamedGraphSnapshot>,
    /// Triples of the default RDF graph (empty when the `rdf` feature is off).
    rdf_triples: Vec<SnapshotTriple>,
    /// Contents of every named RDF graph (empty when the `rdf` feature is off).
    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
    /// Catalog definitions captured by `collect_schema`.
    schema: SnapshotSchema,
    /// Index definitions captured by `collect_index_metadata`.
    indexes: SnapshotIndexes,
    /// Transaction-manager epoch at export time with the `temporal` feature; `0` otherwise.
    epoch: u64,
}
39
/// Catalog contents stored inside a snapshot.
///
/// Filled by `collect_schema` and replayed into a catalog by
/// `restore_schema_from_snapshot`.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct SnapshotSchema {
    /// Node type definitions (`Catalog::all_node_type_defs`).
    node_types: Vec<NodeTypeDefinition>,
    /// Edge type definitions (`Catalog::all_edge_type_defs`).
    edge_types: Vec<EdgeTypeDefinition>,
    /// Graph type definitions (`Catalog::all_graph_type_defs`).
    graph_types: Vec<GraphTypeDefinition>,
    /// Procedure definitions (`Catalog::all_procedure_defs`).
    procedures: Vec<ProcedureDefinition>,
    /// Schema namespace names (`Catalog::schema_names`).
    schemas: Vec<String>,
    /// `(graph name, graph type name)` pairs (`Catalog::all_graph_type_bindings`).
    graph_type_bindings: Vec<(String, String)>,
}
50
/// Index definitions stored inside a snapshot.
///
/// Only the definitions are recorded; `restore_indexes_from_snapshot` re-creates the
/// indexes from them.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct SnapshotIndexes {
    /// Keys returned by `LpgStore::property_index_keys`.
    property_indexes: Vec<String>,
    /// Vector index definitions (empty when the `vector-index` feature is off).
    vector_indexes: Vec<SnapshotVectorIndex>,
    /// Text index definitions (empty when the `text-index` feature is off).
    text_indexes: Vec<SnapshotTextIndex>,
}
58
/// Definition of one vector index, sufficient to call `create_vector_index` again.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotVectorIndex {
    /// Label part of the store's `label:property` index key.
    label: String,
    /// Property part of the store's `label:property` index key.
    property: String,
    /// Vector dimensionality taken from the index configuration.
    dimensions: usize,
    /// Distance metric taken from the index configuration.
    metric: grafeo_core::index::vector::DistanceMetric,
    /// `m` value of the index configuration.
    /// NOTE(review): presumably the HNSW connectivity parameter; confirm.
    m: usize,
    /// `ef_construction` value of the index configuration.
    /// NOTE(review): presumably the HNSW build-time search width; confirm.
    ef_construction: usize,
}
69
/// Definition of one text index, sufficient to call `create_text_index` again.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTextIndex {
    /// Label part of the store's `label:property` index key.
    label: String,
    /// Property part of the store's `label:property` index key.
    property: String,
}
76
/// Contents of one named LPG graph inside a snapshot.
#[derive(serde::Serialize, serde::Deserialize)]
struct NamedGraphSnapshot {
    /// Graph name, as returned by `LpgStore::graph_names`.
    name: String,
    /// Nodes of this graph.
    nodes: Vec<SnapshotNode>,
    /// Edges of this graph.
    edges: Vec<SnapshotEdge>,
}
84
/// One RDF triple with each term stored as text.
///
/// The strings are the `to_string()` rendering of the terms and are parsed back with
/// `Term::from_ntriples` when a snapshot is loaded.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTriple {
    /// Subject term.
    subject: String,
    /// Predicate term.
    predicate: String,
    /// Object term.
    object: String,
}
92
/// Contents of one named RDF graph inside a snapshot.
#[derive(serde::Serialize, serde::Deserialize)]
struct RdfNamedGraphSnapshot {
    /// Graph name, as returned by `RdfStore::graph_names`.
    name: String,
    /// Triples of this graph.
    triples: Vec<SnapshotTriple>,
}
99
/// One node as stored in a snapshot.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Node identifier; preserved on restore via `create_node_with_id`.
    id: NodeId,
    /// Label names.
    labels: Vec<String>,
    /// Per property key, a list of `(epoch, value)` entries. With the `temporal`
    /// feature this is the property history; without it there is exactly one entry,
    /// recorded at epoch 0.
    properties: Vec<(String, Vec<(EpochId, Value)>)>,
}
107
/// One edge as stored in a snapshot.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Edge identifier; preserved on restore via `create_edge_with_id`.
    id: EdgeId,
    /// Source node identifier.
    src: NodeId,
    /// Destination node identifier.
    dst: NodeId,
    /// Edge type name.
    edge_type: String,
    /// Per property key, a list of `(epoch, value)` entries. With the `temporal`
    /// feature this is the property history; without it there is exactly one entry,
    /// recorded at epoch 0.
    properties: Vec<(String, Vec<(EpochId, Value)>)>,
}
117
118fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
123 store
124 .all_nodes()
125 .map(|n| {
126 #[cfg(feature = "temporal")]
127 let properties = store
128 .node_property_history(n.id)
129 .into_iter()
130 .map(|(k, entries)| (k.to_string(), entries))
131 .collect();
132
133 #[cfg(not(feature = "temporal"))]
134 let properties = n
135 .properties
136 .into_iter()
137 .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
138 .collect();
139
140 SnapshotNode {
141 id: n.id,
142 labels: n.labels.iter().map(|l| l.to_string()).collect(),
143 properties,
144 }
145 })
146 .collect()
147}
148
149fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
154 store
155 .all_edges()
156 .map(|e| {
157 #[cfg(feature = "temporal")]
158 let properties = store
159 .edge_property_history(e.id)
160 .into_iter()
161 .map(|(k, entries)| (k.to_string(), entries))
162 .collect();
163
164 #[cfg(not(feature = "temporal"))]
165 let properties = e
166 .properties
167 .into_iter()
168 .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
169 .collect();
170
171 SnapshotEdge {
172 id: e.id,
173 src: e.src,
174 dst: e.dst,
175 edge_type: e.edge_type.to_string(),
176 properties,
177 }
178 })
179 .collect()
180}
181
182fn populate_store_from_snapshot(
187 store: &grafeo_core::graph::lpg::LpgStore,
188 nodes: Vec<SnapshotNode>,
189 edges: Vec<SnapshotEdge>,
190) -> Result<()> {
191 for node in nodes {
192 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
193 store.create_node_with_id(node.id, &label_refs)?;
194 for (key, entries) in node.properties {
195 #[cfg(feature = "temporal")]
196 for (epoch, value) in entries {
197 store.set_node_property_at_epoch(node.id, &key, value, epoch);
198 }
199 #[cfg(not(feature = "temporal"))]
200 if let Some((_, value)) = entries.into_iter().last() {
201 store.set_node_property(node.id, &key, value);
202 }
203 }
204 }
205 for edge in edges {
206 store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
207 for (key, entries) in edge.properties {
208 #[cfg(feature = "temporal")]
209 for (epoch, value) in entries {
210 store.set_edge_property_at_epoch(edge.id, &key, value, epoch);
211 }
212 #[cfg(not(feature = "temporal"))]
213 if let Some((_, value)) = entries.into_iter().last() {
214 store.set_edge_property(edge.id, &key, value);
215 }
216 }
217 }
218 Ok(())
219}
220
221fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
223 let mut node_ids = HashSet::with_capacity(nodes.len());
224 for node in nodes {
225 if !node_ids.insert(node.id) {
226 return Err(Error::Internal(format!(
227 "snapshot contains duplicate node ID {}",
228 node.id
229 )));
230 }
231 }
232 let mut edge_ids = HashSet::with_capacity(edges.len());
233 for edge in edges {
234 if !edge_ids.insert(edge.id) {
235 return Err(Error::Internal(format!(
236 "snapshot contains duplicate edge ID {}",
237 edge.id
238 )));
239 }
240 if !node_ids.contains(&edge.src) {
241 return Err(Error::Internal(format!(
242 "snapshot edge {} references non-existent source node {}",
243 edge.id, edge.src
244 )));
245 }
246 if !node_ids.contains(&edge.dst) {
247 return Err(Error::Internal(format!(
248 "snapshot edge {} references non-existent destination node {}",
249 edge.id, edge.dst
250 )));
251 }
252 }
253 Ok(())
254}
255
/// Converts every triple of `store` into its textual snapshot form.
///
/// Each term is rendered with `to_string()`.
#[cfg(feature = "rdf")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut snapshot_triples = Vec::new();
    for triple in store.triples() {
        snapshot_triples.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    snapshot_triples
}
269
/// Parses `triples` back into RDF terms and inserts them into `store`.
///
/// A triple is skipped without any report when `Term::from_ntriples` rejects one of
/// its three terms.
#[cfg(feature = "rdf")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    for stored in triples {
        let subject = Term::from_ntriples(&stored.subject);
        let predicate = Term::from_ntriples(&stored.predicate);
        let object = Term::from_ntriples(&stored.object);

        let (Some(s), Some(p), Some(o)) = (subject, predicate, object) else {
            continue;
        };
        store.insert(Triple::new(s, p, o));
    }
}
284
/// Decodes a snapshot read from a `.grafeo` file and loads it into existing stores.
///
/// Order of work: default LPG graph, named LPG graphs, catalog schema, then (with the
/// `rdf` feature) the default and named RDF graphs. With the `temporal` feature every
/// LPG store is synced to the snapshot epoch.
///
/// NOTE(review): unlike `import_snapshot`, this path does not look at the snapshot
/// version, does not call `validate_snapshot_data`, and does not restore index
/// definitions. Confirm that this is intended.
///
/// # Errors
///
/// Returns `Error::Serialization` when `data` cannot be decoded, `Error::Internal`
/// when a named graph cannot be created, and propagates node/edge creation failures.
#[cfg(feature = "grafeo-file")]
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    use grafeo_common::utils::error::Error;

    let decoded =
        bincode::serde::decode_from_slice::<Snapshot, _>(data, bincode::config::standard());
    let snapshot = match decoded {
        Ok((snapshot, _bytes_read)) => snapshot,
        Err(e) => {
            return Err(Error::Serialization(format!(
                "failed to decode snapshot from .grafeo file: {e}"
            )));
        }
    };

    // Default graph first.
    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;

    #[cfg(feature = "temporal")]
    store.sync_epoch(EpochId::new(snapshot.epoch));

    // Named graphs: create each one, then fill it if the store hands it back.
    for named in &snapshot.named_graphs {
        store
            .create_graph(&named.name)
            .map_err(|e| Error::Internal(e.to_string()))?;

        let Some(graph_store) = store.graph(&named.name) else {
            continue;
        };
        populate_store_from_snapshot_ref(&graph_store, &named.nodes, &named.edges)?;
        #[cfg(feature = "temporal")]
        graph_store.sync_epoch(EpochId::new(snapshot.epoch));
    }

    restore_schema_from_snapshot(catalog, &snapshot.schema);

    #[cfg(feature = "rdf")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for named in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(&named.name);
            if let Some(graph_store) = rdf_store.graph(&named.name) {
                populate_rdf_store(&graph_store, &named.triples);
            }
        }
    }

    Ok(())
}
338
/// Borrowing counterpart of `populate_store_from_snapshot`: recreates `nodes` and then
/// `edges` in `store` from slices, cloning each property value.
///
/// With the `temporal` feature every `(epoch, value)` entry is replayed at its epoch;
/// without it only the last entry of each property is applied.
///
/// # Errors
///
/// Propagates the first failure of `create_node_with_id` or `create_edge_with_id`.
#[cfg(feature = "grafeo-file")]
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    for snapshot_node in nodes {
        let label_refs: Vec<&str> = snapshot_node.labels.iter().map(String::as_str).collect();
        store.create_node_with_id(snapshot_node.id, &label_refs)?;

        for (key, history) in &snapshot_node.properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in history {
                store.set_node_property_at_epoch(snapshot_node.id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, value)) = history.last() {
                store.set_node_property(snapshot_node.id, key, value.clone());
            }
        }
    }

    for snapshot_edge in edges {
        store.create_edge_with_id(
            snapshot_edge.id,
            snapshot_edge.src,
            snapshot_edge.dst,
            &snapshot_edge.edge_type,
        )?;

        for (key, history) in &snapshot_edge.properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in history {
                store.set_edge_property_at_epoch(snapshot_edge.id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, value)) = history.last() {
                store.set_edge_property(snapshot_edge.id, key, value.clone());
            }
        }
    }

    Ok(())
}
375
376fn restore_schema_from_snapshot(
378 catalog: &std::sync::Arc<crate::catalog::Catalog>,
379 schema: &SnapshotSchema,
380) {
381 for def in &schema.node_types {
382 catalog.register_or_replace_node_type(def.clone());
383 }
384 for def in &schema.edge_types {
385 catalog.register_or_replace_edge_type_def(def.clone());
386 }
387 for def in &schema.graph_types {
388 let _ = catalog.register_graph_type(def.clone());
389 }
390 for def in &schema.procedures {
391 catalog.replace_procedure(def.clone()).ok();
392 }
393 for name in &schema.schemas {
394 let _ = catalog.register_schema_namespace(name.clone());
395 }
396 for (graph_name, type_name) in &schema.graph_type_bindings {
397 let _ = catalog.bind_graph_type(graph_name, type_name.clone());
398 }
399}
400
401fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
403 SnapshotSchema {
404 node_types: catalog.all_node_type_defs(),
405 edge_types: catalog.all_edge_type_defs(),
406 graph_types: catalog.all_graph_type_defs(),
407 procedures: catalog.all_procedure_defs(),
408 schemas: catalog.schema_names(),
409 graph_type_bindings: catalog.all_graph_type_bindings(),
410 }
411}
412
413fn restore_indexes_from_snapshot(db: &super::GrafeoDB, indexes: &SnapshotIndexes) {
418 for name in &indexes.property_indexes {
419 db.store.create_property_index(name);
420 }
421
422 #[cfg(feature = "vector-index")]
423 for vi in &indexes.vector_indexes {
424 if let Err(err) = db.create_vector_index(
425 &vi.label,
426 &vi.property,
427 Some(vi.dimensions),
428 Some(vi.metric.name()),
429 Some(vi.m),
430 Some(vi.ef_construction),
431 ) {
432 grafeo_warn!(
433 "Failed to restore vector index :{label}({property}): {err}",
434 label = vi.label,
435 property = vi.property,
436 );
437 }
438 }
439
440 #[cfg(feature = "text-index")]
441 for ti in &indexes.text_indexes {
442 if let Err(err) = db.create_text_index(&ti.label, &ti.property) {
443 grafeo_warn!(
444 "Failed to restore text index :{label}({property}): {err}",
445 label = ti.label,
446 property = ti.property,
447 );
448 }
449 }
450}
451
452fn collect_index_metadata(store: &grafeo_core::graph::lpg::LpgStore) -> SnapshotIndexes {
454 let property_indexes = store.property_index_keys();
455
456 #[cfg(feature = "vector-index")]
457 let vector_indexes: Vec<SnapshotVectorIndex> = store
458 .vector_index_entries()
459 .into_iter()
460 .filter_map(|(key, index)| {
461 let (label, property) = key.split_once(':')?;
462 let config = index.config();
463 Some(SnapshotVectorIndex {
464 label: label.to_string(),
465 property: property.to_string(),
466 dimensions: config.dimensions,
467 metric: config.metric,
468 m: config.m,
469 ef_construction: config.ef_construction,
470 })
471 })
472 .collect();
473 #[cfg(not(feature = "vector-index"))]
474 let vector_indexes = Vec::new();
475
476 #[cfg(feature = "text-index")]
477 let text_indexes: Vec<SnapshotTextIndex> = store
478 .text_index_entries()
479 .into_iter()
480 .filter_map(|(key, _)| {
481 let (label, property) = key.split_once(':')?;
482 Some(SnapshotTextIndex {
483 label: label.to_string(),
484 property: property.to_string(),
485 })
486 })
487 .collect();
488 #[cfg(not(feature = "text-index"))]
489 let text_indexes = Vec::new();
490
491 SnapshotIndexes {
492 property_indexes,
493 vector_indexes,
494 text_indexes,
495 }
496}
497
498impl super::GrafeoDB {
    /// Writes a copy of this database to `path`.
    ///
    /// With the `grafeo-file` feature, a path whose extension is `grafeo` is written as
    /// a single snapshot file via `save_as_grafeo_file`. Otherwise a new persistent
    /// database is opened at `path`, every node, edge, named graph and (with the `rdf`
    /// feature) RDF triple is copied into it with a matching WAL record, and the
    /// target is closed.
    ///
    /// NOTE(review): the WAL path copies graph data only; nothing here touches the
    /// catalog or index definitions. Confirm whether that is intended.
    ///
    /// # Errors
    ///
    /// Propagates failures from opening the target, creating nodes, edges or graphs,
    /// writing WAL records, and closing the target.
    #[cfg(feature = "wal")]
    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
        let path = path.as_ref();

        // Single-file format is selected purely by the `.grafeo` extension.
        #[cfg(feature = "grafeo-file")]
        if path.extension().is_some_and(|ext| ext == "grafeo") {
            return self.save_as_grafeo_file(path);
        }

        let target_config = Config::persistent(path);
        let target = Self::with_config(target_config)?;

        // Default graph: nodes first so that edges can refer to them.
        for node in self.store.all_nodes() {
            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
            target.store.create_node_with_id(node.id, &label_refs)?;

            target.log_wal(&WalRecord::CreateNode {
                id: node.id,
                labels: node.labels.iter().map(|s| s.to_string()).collect(),
            })?;

            for (key, value) in node.properties {
                // The store takes a clone; the original value moves into the WAL record.
                target
                    .store
                    .set_node_property(node.id, key.as_str(), value.clone());
                target.log_wal(&WalRecord::SetNodeProperty {
                    id: node.id,
                    key: key.to_string(),
                    value,
                })?;
            }
        }

        for edge in self.store.all_edges() {
            target
                .store
                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;

            target.log_wal(&WalRecord::CreateEdge {
                id: edge.id,
                src: edge.src,
                dst: edge.dst,
                edge_type: edge.edge_type.to_string(),
            })?;

            for (key, value) in edge.properties {
                target
                    .store
                    .set_edge_property(edge.id, key.as_str(), value.clone());
                target.log_wal(&WalRecord::SetEdgeProperty {
                    id: edge.id,
                    key: key.to_string(),
                    value,
                })?;
            }
        }

        // Named graphs: each one is created, then a `SwitchGraph` record is logged
        // before the records for that graph's contents.
        for graph_name in self.store.graph_names() {
            if let Some(src_graph) = self.store.graph(&graph_name) {
                target.log_wal(&WalRecord::CreateNamedGraph {
                    name: graph_name.clone(),
                })?;
                target
                    .store
                    .create_graph(&graph_name)
                    .map_err(|e| Error::Internal(e.to_string()))?;

                if let Some(dst_graph) = target.store.graph(&graph_name) {
                    target.log_wal(&WalRecord::SwitchGraph {
                        name: Some(graph_name.clone()),
                    })?;

                    for node in src_graph.all_nodes() {
                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
                        dst_graph.create_node_with_id(node.id, &label_refs)?;
                        target.log_wal(&WalRecord::CreateNode {
                            id: node.id,
                            labels: node.labels.iter().map(|s| s.to_string()).collect(),
                        })?;
                        for (key, value) in node.properties {
                            dst_graph.set_node_property(node.id, key.as_str(), value.clone());
                            target.log_wal(&WalRecord::SetNodeProperty {
                                id: node.id,
                                key: key.to_string(),
                                value,
                            })?;
                        }
                    }
                    for edge in src_graph.all_edges() {
                        dst_graph.create_edge_with_id(
                            edge.id,
                            edge.src,
                            edge.dst,
                            &edge.edge_type,
                        )?;
                        target.log_wal(&WalRecord::CreateEdge {
                            id: edge.id,
                            src: edge.src,
                            dst: edge.dst,
                            edge_type: edge.edge_type.to_string(),
                        })?;
                        for (key, value) in edge.properties {
                            dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
                            target.log_wal(&WalRecord::SetEdgeProperty {
                                id: edge.id,
                                key: key.to_string(),
                                value,
                            })?;
                        }
                    }
                }
            }
        }

        // If any named graph exists, finish with a `SwitchGraph { name: None }` record.
        if !self.store.graph_names().is_empty() {
            target.log_wal(&WalRecord::SwitchGraph { name: None })?;
        }

        #[cfg(feature = "rdf")]
        {
            // Default RDF graph.
            for triple in self.rdf_store.triples() {
                let record = WalRecord::InsertRdfTriple {
                    subject: triple.subject().to_string(),
                    predicate: triple.predicate().to_string(),
                    object: triple.object().to_string(),
                    graph: None,
                };
                target.rdf_store.insert((*triple).clone());
                target.log_wal(&record)?;
            }
            // Named RDF graphs; the creation record is logged even if the source
            // graph lookup below returns `None`.
            for name in self.rdf_store.graph_names() {
                target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
                if let Some(src_graph) = self.rdf_store.graph(&name) {
                    let dst_graph = target.rdf_store.graph_or_create(&name);
                    for triple in src_graph.triples() {
                        let record = WalRecord::InsertRdfTriple {
                            subject: triple.subject().to_string(),
                            predicate: triple.predicate().to_string(),
                            object: triple.object().to_string(),
                            graph: Some(name.clone()),
                        };
                        dst_graph.insert((*triple).clone());
                        target.log_wal(&record)?;
                    }
                }
            }
        }

        target.close()?;

        Ok(())
    }
682
683 #[cfg(feature = "grafeo-file")]
690 fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
691 use grafeo_adapters::storage::file::GrafeoFileManager;
692
693 let snapshot_data = self.export_snapshot()?;
694 let epoch = self.store.current_epoch();
695 let transaction_id = self
696 .transaction_manager
697 .last_assigned_transaction_id()
698 .map_or(0, |t| t.0);
699 let node_count = self.store.node_count() as u64;
700 let edge_count = self.store.edge_count() as u64;
701
702 let fm = GrafeoFileManager::create(path)?;
703 fm.write_snapshot(
704 &snapshot_data,
705 epoch.0,
706 transaction_id,
707 node_count,
708 edge_count,
709 )?;
710 Ok(())
711 }
712
713 pub fn to_memory(&self) -> Result<Self> {
720 let config = Config::in_memory();
721 let target = Self::with_config(config)?;
722
723 for node in self.store.all_nodes() {
725 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
726 target.store.create_node_with_id(node.id, &label_refs)?;
727 for (key, value) in node.properties {
728 target.store.set_node_property(node.id, key.as_str(), value);
729 }
730 }
731
732 for edge in self.store.all_edges() {
734 target
735 .store
736 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
737 for (key, value) in edge.properties {
738 target.store.set_edge_property(edge.id, key.as_str(), value);
739 }
740 }
741
742 for graph_name in self.store.graph_names() {
744 if let Some(src_graph) = self.store.graph(&graph_name) {
745 target
746 .store
747 .create_graph(&graph_name)
748 .map_err(|e| Error::Internal(e.to_string()))?;
749 if let Some(dst_graph) = target.store.graph(&graph_name) {
750 for node in src_graph.all_nodes() {
751 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
752 dst_graph.create_node_with_id(node.id, &label_refs)?;
753 for (key, value) in node.properties {
754 dst_graph.set_node_property(node.id, key.as_str(), value);
755 }
756 }
757 for edge in src_graph.all_edges() {
758 dst_graph.create_edge_with_id(
759 edge.id,
760 edge.src,
761 edge.dst,
762 &edge.edge_type,
763 )?;
764 for (key, value) in edge.properties {
765 dst_graph.set_edge_property(edge.id, key.as_str(), value);
766 }
767 }
768 }
769 }
770 }
771
772 #[cfg(feature = "rdf")]
774 {
775 for triple in self.rdf_store.triples() {
776 target.rdf_store.insert((*triple).clone());
777 }
778 for name in self.rdf_store.graph_names() {
779 if let Some(src_graph) = self.rdf_store.graph(&name) {
780 let dst_graph = target.rdf_store.graph_or_create(&name);
781 for triple in src_graph.triples() {
782 dst_graph.insert((*triple).clone());
783 }
784 }
785 }
786 }
787
788 Ok(target)
789 }
790
791 #[cfg(feature = "wal")]
800 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
801 let source = Self::open(path)?;
803
804 let target = source.to_memory()?;
806
807 source.close()?;
809
810 Ok(target)
811 }
812
813 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
831 let nodes = collect_snapshot_nodes(&self.store);
832 let edges = collect_snapshot_edges(&self.store);
833
834 let named_graphs: Vec<NamedGraphSnapshot> = self
836 .store
837 .graph_names()
838 .into_iter()
839 .filter_map(|name| {
840 self.store
841 .graph(&name)
842 .map(|graph_store| NamedGraphSnapshot {
843 name,
844 nodes: collect_snapshot_nodes(&graph_store),
845 edges: collect_snapshot_edges(&graph_store),
846 })
847 })
848 .collect();
849
850 #[cfg(feature = "rdf")]
852 let rdf_triples = collect_rdf_triples(&self.rdf_store);
853 #[cfg(not(feature = "rdf"))]
854 let rdf_triples = Vec::new();
855
856 #[cfg(feature = "rdf")]
857 let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
858 .rdf_store
859 .graph_names()
860 .into_iter()
861 .filter_map(|name| {
862 self.rdf_store
863 .graph(&name)
864 .map(|graph| RdfNamedGraphSnapshot {
865 name,
866 triples: collect_rdf_triples(&graph),
867 })
868 })
869 .collect();
870 #[cfg(not(feature = "rdf"))]
871 let rdf_named_graphs = Vec::new();
872
873 let schema = collect_schema(&self.catalog);
874 let indexes = collect_index_metadata(&self.store);
875
876 let snapshot = Snapshot {
877 version: SNAPSHOT_VERSION,
878 nodes,
879 edges,
880 named_graphs,
881 rdf_triples,
882 rdf_named_graphs,
883 schema,
884 indexes,
885 #[cfg(feature = "temporal")]
886 epoch: self.transaction_manager.current_epoch().as_u64(),
887 #[cfg(not(feature = "temporal"))]
888 epoch: 0,
889 };
890
891 let config = bincode::config::standard();
892 bincode::serde::encode_to_vec(&snapshot, config)
893 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
894 }
895
896 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
910 if data.is_empty() {
911 return Err(Error::Internal("empty snapshot data".to_string()));
912 }
913
914 let version = data[0];
915 if version != 4 {
916 return Err(Error::Internal(format!(
917 "unsupported snapshot version: {version} (expected 4)"
918 )));
919 }
920
921 let config = bincode::config::standard();
922 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
923 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
924
925 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
927
928 for ng in &snapshot.named_graphs {
930 validate_snapshot_data(&ng.nodes, &ng.edges)?;
931 }
932
933 let db = Self::new_in_memory();
934 populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
935
936 #[cfg(feature = "temporal")]
938 {
939 let epoch = EpochId::new(snapshot.epoch);
940 db.store.sync_epoch(epoch);
941 db.transaction_manager.sync_epoch(epoch);
942 }
943
944 #[cfg(feature = "temporal")]
946 let snapshot_epoch = EpochId::new(snapshot.epoch);
947
948 for ng in snapshot.named_graphs {
950 db.store
951 .create_graph(&ng.name)
952 .map_err(|e| Error::Internal(e.to_string()))?;
953 if let Some(graph_store) = db.store.graph(&ng.name) {
954 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
955 #[cfg(feature = "temporal")]
958 graph_store.sync_epoch(snapshot_epoch);
959 }
960 }
961
962 #[cfg(feature = "rdf")]
964 {
965 populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
966 for rng in &snapshot.rdf_named_graphs {
967 let graph = db.rdf_store.graph_or_create(&rng.name);
968 populate_rdf_store(&graph, &rng.triples);
969 }
970 }
971
972 restore_schema_from_snapshot(&db.catalog, &snapshot.schema);
974
975 restore_indexes_from_snapshot(&db, &snapshot.indexes);
977
978 Ok(db)
979 }
980
981 pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
997 if data.is_empty() {
998 return Err(Error::Internal("empty snapshot data".to_string()));
999 }
1000
1001 let version = data[0];
1002 if version != 4 {
1003 return Err(Error::Internal(format!(
1004 "unsupported snapshot version: {version} (expected 4)"
1005 )));
1006 }
1007
1008 let config = bincode::config::standard();
1009 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1010 .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
1011
1012 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
1014 for ng in &snapshot.named_graphs {
1015 validate_snapshot_data(&ng.nodes, &ng.edges)?;
1016 }
1017
1018 for name in self.store.graph_names() {
1020 self.store.drop_graph(&name);
1021 }
1022 self.store.clear();
1023
1024 populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)?;
1025
1026 #[cfg(feature = "temporal")]
1028 let snapshot_epoch = {
1029 let epoch = EpochId::new(snapshot.epoch);
1030 self.store.sync_epoch(epoch);
1031 self.transaction_manager.sync_epoch(epoch);
1032 epoch
1033 };
1034
1035 for ng in snapshot.named_graphs {
1037 self.store
1038 .create_graph(&ng.name)
1039 .map_err(|e| Error::Internal(e.to_string()))?;
1040 if let Some(graph_store) = self.store.graph(&ng.name) {
1041 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
1042 #[cfg(feature = "temporal")]
1043 graph_store.sync_epoch(snapshot_epoch);
1044 }
1045 }
1046
1047 #[cfg(feature = "rdf")]
1049 {
1050 self.rdf_store.clear();
1052 for name in self.rdf_store.graph_names() {
1053 self.rdf_store.drop_graph(&name);
1054 }
1055 populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
1056 for rng in &snapshot.rdf_named_graphs {
1057 let graph = self.rdf_store.graph_or_create(&rng.name);
1058 populate_rdf_store(&graph, &rng.triples);
1059 }
1060 }
1061
1062 restore_schema_from_snapshot(&self.catalog, &snapshot.schema);
1064
1065 restore_indexes_from_snapshot(self, &snapshot.indexes);
1067
1068 Ok(())
1069 }
1070
    /// Returns an iterator over the nodes of the default graph store.
    ///
    /// The iterator borrows `self` and yields owned `Node` values.
    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
        self.store.all_nodes()
    }
1081
    /// Returns an iterator over the edges of the default graph store.
    ///
    /// The iterator borrows `self` and yields owned `Edge` values.
    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
        self.store.all_edges()
    }
1088}
1089
1090#[cfg(test)]
1091mod tests {
1092 use grafeo_common::types::{EdgeId, NodeId, Value};
1093
1094 use super::super::GrafeoDB;
1095 use super::{
1096 SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotIndexes, SnapshotNode, SnapshotSchema,
1097 };
1098
1099 #[test]
1100 fn test_restore_snapshot_basic() {
1101 let db = GrafeoDB::new_in_memory();
1102 let session = db.session();
1103
1104 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1106 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1107
1108 let snapshot = db.export_snapshot().unwrap();
1109
1110 session
1112 .execute("INSERT (:Person {name: 'Vincent'})")
1113 .unwrap();
1114 assert_eq!(db.store.node_count(), 3);
1115
1116 db.restore_snapshot(&snapshot).unwrap();
1118
1119 assert_eq!(db.store.node_count(), 2);
1120 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1121 assert_eq!(result.rows.len(), 2);
1122 }
1123
1124 #[test]
1125 fn test_restore_snapshot_validation_failure() {
1126 let db = GrafeoDB::new_in_memory();
1127 let session = db.session();
1128
1129 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1130
1131 let result = db.restore_snapshot(b"garbage");
1133 assert!(result.is_err());
1134
1135 assert_eq!(db.store.node_count(), 1);
1137 }
1138
1139 #[test]
1140 fn test_restore_snapshot_empty_db() {
1141 let db = GrafeoDB::new_in_memory();
1142
1143 let empty_snapshot = db.export_snapshot().unwrap();
1145
1146 let session = db.session();
1147 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1148 assert_eq!(db.store.node_count(), 1);
1149
1150 db.restore_snapshot(&empty_snapshot).unwrap();
1151 assert_eq!(db.store.node_count(), 0);
1152 }
1153
1154 #[test]
1155 fn test_restore_snapshot_with_edges() {
1156 let db = GrafeoDB::new_in_memory();
1157 let session = db.session();
1158
1159 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1160 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1161 session
1162 .execute(
1163 "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
1164 )
1165 .unwrap();
1166
1167 let snapshot = db.export_snapshot().unwrap();
1168 assert_eq!(db.store.edge_count(), 1);
1169
1170 session
1172 .execute("INSERT (:Person {name: 'Vincent'})")
1173 .unwrap();
1174
1175 db.restore_snapshot(&snapshot).unwrap();
1177 assert_eq!(db.store.node_count(), 2);
1178 assert_eq!(db.store.edge_count(), 1);
1179 }
1180
1181 #[test]
1182 fn test_restore_snapshot_preserves_sessions() {
1183 let db = GrafeoDB::new_in_memory();
1184 let session = db.session();
1185
1186 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1187 let snapshot = db.export_snapshot().unwrap();
1188
1189 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1191
1192 db.restore_snapshot(&snapshot).unwrap();
1194
1195 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1197 assert_eq!(result.rows.len(), 1);
1198 }
1199
1200 #[test]
1201 fn test_export_import_roundtrip() {
1202 let db = GrafeoDB::new_in_memory();
1203 let session = db.session();
1204
1205 session
1206 .execute("INSERT (:Person {name: 'Alix', age: 30})")
1207 .unwrap();
1208
1209 let snapshot = db.export_snapshot().unwrap();
1210 let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1211 let session2 = db2.session();
1212
1213 let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
1214 assert_eq!(result.rows.len(), 1);
1215 }
1216
1217 #[test]
1220 fn test_to_memory_empty() {
1221 let db = GrafeoDB::new_in_memory();
1222 let copy = db.to_memory().unwrap();
1223 assert_eq!(copy.store.node_count(), 0);
1224 assert_eq!(copy.store.edge_count(), 0);
1225 }
1226
1227 #[test]
1228 fn test_to_memory_copies_nodes_and_properties() {
1229 let db = GrafeoDB::new_in_memory();
1230 let session = db.session();
1231 session
1232 .execute("INSERT (:Person {name: 'Alix', age: 30})")
1233 .unwrap();
1234 session
1235 .execute("INSERT (:Person {name: 'Gus', age: 25})")
1236 .unwrap();
1237
1238 let copy = db.to_memory().unwrap();
1239 assert_eq!(copy.store.node_count(), 2);
1240
1241 let s2 = copy.session();
1242 let result = s2
1243 .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
1244 .unwrap();
1245 assert_eq!(result.rows.len(), 2);
1246 assert_eq!(result.rows[0][0], Value::String("Alix".into()));
1247 assert_eq!(result.rows[1][0], Value::String("Gus".into()));
1248 }
1249
1250 #[test]
1251 fn test_to_memory_copies_edges_and_properties() {
1252 let db = GrafeoDB::new_in_memory();
1253 let a = db.create_node(&["Person"]);
1254 db.set_node_property(a, "name", "Alix".into());
1255 let b = db.create_node(&["Person"]);
1256 db.set_node_property(b, "name", "Gus".into());
1257 let edge = db.create_edge(a, b, "KNOWS");
1258 db.set_edge_property(edge, "since", Value::Int64(2020));
1259
1260 let copy = db.to_memory().unwrap();
1261 assert_eq!(copy.store.node_count(), 2);
1262 assert_eq!(copy.store.edge_count(), 1);
1263
1264 let s2 = copy.session();
1265 let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
1266 assert_eq!(result.rows[0][0], Value::Int64(2020));
1267 }
1268
1269 #[test]
1270 fn test_to_memory_is_independent() {
1271 let db = GrafeoDB::new_in_memory();
1272 let session = db.session();
1273 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1274
1275 let copy = db.to_memory().unwrap();
1276
1277 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1279 assert_eq!(db.store.node_count(), 2);
1280 assert_eq!(copy.store.node_count(), 1);
1281 }
1282
1283 #[test]
1286 fn test_iter_nodes_empty() {
1287 let db = GrafeoDB::new_in_memory();
1288 assert_eq!(db.iter_nodes().count(), 0);
1289 }
1290
/// `iter_nodes` returns every node, regardless of label, with its properties attached.
#[test]
fn test_iter_nodes_returns_all() {
    let db = GrafeoDB::new_in_memory();
    let person = db.create_node(&["Person"]);
    db.set_node_property(person, "name", "Alix".into());
    let animal = db.create_node(&["Animal"]);
    db.set_node_property(animal, "name", "Fido".into());

    let all_nodes: Vec<_> = db.iter_nodes().collect();
    assert_eq!(all_nodes.len(), 2);

    // Gather the `name` property of each node that has one.
    let mut found_names = Vec::new();
    for node in &all_nodes {
        let name_entry = node
            .properties
            .iter()
            .find(|(key, _)| key.as_str() == "name");
        if let Some((_, value)) = name_entry {
            found_names.push(value.clone());
        }
    }

    assert!(found_names.contains(&Value::String("Alix".into())));
    assert!(found_names.contains(&Value::String("Fido".into())));
}
1310
/// A freshly created in-memory database yields nothing from `iter_edges`.
#[test]
fn test_iter_edges_empty() {
    let empty_db = GrafeoDB::new_in_memory();
    let edge_total = empty_db.iter_edges().count();
    assert_eq!(edge_total, 0);
}
1316
/// `iter_edges` returns every edge, each exposing its edge type.
#[test]
fn test_iter_edges_returns_all() {
    let db = GrafeoDB::new_in_memory();

    // Chain of three nodes: a -[R1]-> b -[R2]-> c.
    let a = db.create_node(&["A"]);
    let b = db.create_node(&["B"]);
    let c = db.create_node(&["C"]);
    db.create_edge(a, b, "R1");
    db.create_edge(b, c, "R2");

    let all_edges: Vec<_> = db.iter_edges().collect();
    assert_eq!(all_edges.len(), 2);

    let seen_types: Vec<_> = all_edges
        .iter()
        .map(|edge| edge.edge_type.as_ref())
        .collect();
    for expected in ["R1", "R2"] {
        assert!(seen_types.contains(&expected));
    }
}
1333
1334 fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
1337 let snap = Snapshot {
1338 version,
1339 nodes,
1340 edges,
1341 named_graphs: vec![],
1342 rdf_triples: vec![],
1343 rdf_named_graphs: vec![],
1344 schema: SnapshotSchema::default(),
1345 indexes: SnapshotIndexes::default(),
1346 epoch: 0,
1347 };
1348 bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
1349 }
1350
/// Restoring a snapshot with an unknown version fails and leaves existing data in place.
#[test]
fn test_restore_rejects_unsupported_version() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Version 99 with otherwise empty content.
    let encoded = make_snapshot(99, Vec::new(), Vec::new());

    let outcome = db.restore_snapshot(&encoded);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("unsupported snapshot version"), "got: {err}");

    // The node inserted before the failed restore is still there.
    let surviving_nodes = db.store.node_count();
    assert_eq!(surviving_nodes, 1);
}
1367
/// Restoring a snapshot with two nodes sharing an ID fails and leaves existing data in place.
#[test]
fn test_restore_rejects_duplicate_node_ids() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Both snapshot nodes claim ID 0; only their labels differ.
    let first = SnapshotNode {
        id: NodeId::new(0),
        labels: vec!["A".into()],
        properties: Vec::new(),
    };
    let clashing = SnapshotNode {
        id: NodeId::new(0),
        labels: vec!["B".into()],
        properties: Vec::new(),
    };
    let encoded = make_snapshot(SNAPSHOT_VERSION, vec![first, clashing], Vec::new());

    let outcome = db.restore_snapshot(&encoded);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("duplicate node ID"), "got: {err}");

    // The node inserted before the failed restore is still there.
    let surviving_nodes = db.store.node_count();
    assert_eq!(surviving_nodes, 1);
}
1397
/// Restoring a snapshot with two edges sharing an ID is rejected.
#[test]
fn test_restore_rejects_duplicate_edge_ids() {
    let db = GrafeoDB::new_in_memory();

    // Two distinct, valid endpoint nodes.
    let endpoints = vec![
        SnapshotNode {
            id: NodeId::new(0),
            labels: Vec::new(),
            properties: Vec::new(),
        },
        SnapshotNode {
            id: NodeId::new(1),
            labels: Vec::new(),
            properties: Vec::new(),
        },
    ];

    // Both edges claim ID 0 and are otherwise identical.
    let first = SnapshotEdge {
        id: EdgeId::new(0),
        src: NodeId::new(0),
        dst: NodeId::new(1),
        edge_type: "REL".into(),
        properties: Vec::new(),
    };
    let clashing = SnapshotEdge {
        id: EdgeId::new(0),
        src: NodeId::new(0),
        dst: NodeId::new(1),
        edge_type: "REL".into(),
        properties: Vec::new(),
    };

    let encoded = make_snapshot(SNAPSHOT_VERSION, endpoints, vec![first, clashing]);

    let outcome = db.restore_snapshot(&encoded);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("duplicate edge ID"), "got: {err}");
}
1439
/// Restoring a snapshot whose edge starts at a node absent from the snapshot is rejected.
#[test]
fn test_restore_rejects_dangling_source() {
    let db = GrafeoDB::new_in_memory();

    let only_node = SnapshotNode {
        id: NodeId::new(0),
        labels: Vec::new(),
        properties: Vec::new(),
    };
    // Source 999 is not among the snapshot's nodes; the destination is valid.
    let dangling_edge = SnapshotEdge {
        id: EdgeId::new(0),
        src: NodeId::new(999),
        dst: NodeId::new(0),
        edge_type: "REL".into(),
        properties: Vec::new(),
    };
    let encoded = make_snapshot(SNAPSHOT_VERSION, vec![only_node], vec![dangling_edge]);

    let outcome = db.restore_snapshot(&encoded);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("non-existent source node"), "got: {err}");
}
1465
/// Restoring a snapshot whose edge ends at a node absent from the snapshot is rejected.
#[test]
fn test_restore_rejects_dangling_destination() {
    let db = GrafeoDB::new_in_memory();

    let only_node = SnapshotNode {
        id: NodeId::new(0),
        labels: Vec::new(),
        properties: Vec::new(),
    };
    // Destination 999 is not among the snapshot's nodes; the source is valid.
    let dangling_edge = SnapshotEdge {
        id: EdgeId::new(0),
        src: NodeId::new(0),
        dst: NodeId::new(999),
        edge_type: "REL".into(),
        properties: Vec::new(),
    };
    let encoded = make_snapshot(SNAPSHOT_VERSION, vec![only_node], vec![dangling_edge]);

    let outcome = db.restore_snapshot(&encoded);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("non-existent destination node"), "got: {err}");
}
1491
/// A property index survives `export_snapshot` / `import_snapshot` and is usable afterwards.
#[test]
fn test_snapshot_roundtrip_property_index() {
    let original = GrafeoDB::new_in_memory();
    let session = original.session();

    session
        .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
        .unwrap();
    original.create_property_index("email");
    assert!(original.has_property_index("email"));

    let exported = original.export_snapshot().unwrap();
    let imported = GrafeoDB::import_snapshot(&exported).unwrap();

    // The index definition is present in the imported database...
    assert!(imported.has_property_index("email"));

    // ...and a lookup by the indexed property finds the single matching node.
    let wanted = Value::String("alix@example.com".into());
    let matches = imported.find_nodes_by_property("email", &wanted);
    assert_eq!(matches.len(), 1);
}
1514
/// A vector index survives `export_snapshot` / `import_snapshot` and answers searches.
#[cfg(feature = "vector-index")]
#[test]
fn test_snapshot_roundtrip_vector_index() {
    use std::sync::Arc;

    // Wraps three components into a vector property value.
    let embedding = |x: f32, y: f32, z: f32| Value::Vector(Arc::from([x, y, z]));

    let original = GrafeoDB::new_in_memory();

    let n1 = original.create_node(&["Doc"]);
    original.set_node_property(n1, "embedding", embedding(1.0, 0.0, 0.0));
    let n2 = original.create_node(&["Doc"]);
    original.set_node_property(n2, "embedding", embedding(0.0, 1.0, 0.0));

    original
        .create_vector_index("Doc", "embedding", None, Some("cosine"), Some(4), Some(32))
        .unwrap();

    let exported = original.export_snapshot().unwrap();
    let imported = GrafeoDB::import_snapshot(&exported).unwrap();

    // The query equals n1's embedding, so n1 is expected as the top hit.
    let hits = imported
        .vector_search("Doc", "embedding", &[1.0, 0.0, 0.0], 2, None, None)
        .unwrap();
    assert_eq!(hits.len(), 2);
    assert_eq!(hits[0].0, n1);
}
1549
/// A text index survives `export_snapshot` / `import_snapshot` and answers searches.
#[cfg(feature = "text-index")]
#[test]
fn test_snapshot_roundtrip_text_index() {
    let original = GrafeoDB::new_in_memory();

    // Only the first article's body contains the words used in the query below.
    let n1 = original.create_node(&["Article"]);
    let first_body = Value::String("rust graph database".into());
    original.set_node_property(n1, "body", first_body);
    let n2 = original.create_node(&["Article"]);
    let second_body = Value::String("python web framework".into());
    original.set_node_property(n2, "body", second_body);

    original.create_text_index("Article", "body").unwrap();

    let exported = original.export_snapshot().unwrap();
    let imported = GrafeoDB::import_snapshot(&exported).unwrap();

    let hits = imported
        .text_search("Article", "body", "graph database", 10)
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].0, n1);
}
1572
/// `restore_snapshot` brings back a property index that was dropped after the export.
#[test]
fn test_snapshot_roundtrip_property_index_via_restore() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session
        .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
        .unwrap();
    db.create_property_index("email");

    // Capture the state while the index exists.
    let exported = db.export_snapshot().unwrap();

    // Diverge from the captured state: add a node and drop the index.
    session
        .execute("INSERT (:Person {name: 'Gus', email: 'gus@example.com'})")
        .unwrap();
    db.drop_property_index("email");
    let index_present_after_drop = db.has_property_index("email");
    assert!(!index_present_after_drop);

    // Restoring in place makes the index available again.
    db.restore_snapshot(&exported).unwrap();
    let index_present_after_restore = db.has_property_index("email");
    assert!(index_present_after_restore);
}
1596}