Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use std::path::Path;
5
6use grafeo_common::types::{EdgeId, NodeId, Value};
7use grafeo_common::utils::error::{Error, Result};
8use hashbrown::HashSet;
9
10use crate::config::Config;
11
12#[cfg(feature = "wal")]
13use grafeo_adapters::storage::wal::WalRecord;
14
15use crate::catalog::{
16    EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
17};
18
/// Current snapshot format version.
///
/// Stamped into [`Snapshot::version`] on export; import and restore reject
/// any payload whose leading byte differs from this value.
const SNAPSHOT_VERSION: u8 = 3;
21
22/// Binary snapshot format (v3: graph data, named graphs, RDF, and schema).
23#[derive(serde::Serialize, serde::Deserialize)]
24struct Snapshot {
25    version: u8,
26    nodes: Vec<SnapshotNode>,
27    edges: Vec<SnapshotEdge>,
28    named_graphs: Vec<NamedGraphSnapshot>,
29    rdf_triples: Vec<SnapshotTriple>,
30    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
31    schema: SnapshotSchema,
32}
33
34/// Schema metadata within a snapshot.
35#[derive(serde::Serialize, serde::Deserialize, Default)]
36struct SnapshotSchema {
37    node_types: Vec<NodeTypeDefinition>,
38    edge_types: Vec<EdgeTypeDefinition>,
39    graph_types: Vec<GraphTypeDefinition>,
40    procedures: Vec<ProcedureDefinition>,
41    schemas: Vec<String>,
42    graph_type_bindings: Vec<(String, String)>,
43}
44
45/// A named graph partition within a v2 snapshot.
46#[derive(serde::Serialize, serde::Deserialize)]
47struct NamedGraphSnapshot {
48    name: String,
49    nodes: Vec<SnapshotNode>,
50    edges: Vec<SnapshotEdge>,
51}
52
53/// An RDF triple in snapshot format (N-Triples encoded terms).
54#[derive(serde::Serialize, serde::Deserialize)]
55struct SnapshotTriple {
56    subject: String,
57    predicate: String,
58    object: String,
59}
60
61/// An RDF named graph in snapshot format.
62#[derive(serde::Serialize, serde::Deserialize)]
63struct RdfNamedGraphSnapshot {
64    name: String,
65    triples: Vec<SnapshotTriple>,
66}
67
68#[derive(serde::Serialize, serde::Deserialize)]
69struct SnapshotNode {
70    id: NodeId,
71    labels: Vec<String>,
72    properties: Vec<(String, Value)>,
73}
74
75#[derive(serde::Serialize, serde::Deserialize)]
76struct SnapshotEdge {
77    id: EdgeId,
78    src: NodeId,
79    dst: NodeId,
80    edge_type: String,
81    properties: Vec<(String, Value)>,
82}
83
84/// Collects all nodes from a store into snapshot format.
85fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
86    store
87        .all_nodes()
88        .map(|n| SnapshotNode {
89            id: n.id,
90            labels: n.labels.iter().map(|l| l.to_string()).collect(),
91            properties: n
92                .properties
93                .into_iter()
94                .map(|(k, v)| (k.to_string(), v))
95                .collect(),
96        })
97        .collect()
98}
99
100/// Collects all edges from a store into snapshot format.
101fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
102    store
103        .all_edges()
104        .map(|e| SnapshotEdge {
105            id: e.id,
106            src: e.src,
107            dst: e.dst,
108            edge_type: e.edge_type.to_string(),
109            properties: e
110                .properties
111                .into_iter()
112                .map(|(k, v)| (k.to_string(), v))
113                .collect(),
114        })
115        .collect()
116}
117
118/// Populates a store from snapshot node/edge data.
119fn populate_store_from_snapshot(
120    store: &grafeo_core::graph::lpg::LpgStore,
121    nodes: Vec<SnapshotNode>,
122    edges: Vec<SnapshotEdge>,
123) -> Result<()> {
124    for node in nodes {
125        let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
126        store.create_node_with_id(node.id, &label_refs)?;
127        for (key, value) in node.properties {
128            store.set_node_property(node.id, &key, value);
129        }
130    }
131    for edge in edges {
132        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
133        for (key, value) in edge.properties {
134            store.set_edge_property(edge.id, &key, value);
135        }
136    }
137    Ok(())
138}
139
140/// Validates snapshot nodes/edges for duplicates and dangling references.
141fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
142    let mut node_ids = HashSet::with_capacity(nodes.len());
143    for node in nodes {
144        if !node_ids.insert(node.id) {
145            return Err(Error::Internal(format!(
146                "snapshot contains duplicate node ID {}",
147                node.id
148            )));
149        }
150    }
151    let mut edge_ids = HashSet::with_capacity(edges.len());
152    for edge in edges {
153        if !edge_ids.insert(edge.id) {
154            return Err(Error::Internal(format!(
155                "snapshot contains duplicate edge ID {}",
156                edge.id
157            )));
158        }
159        if !node_ids.contains(&edge.src) {
160            return Err(Error::Internal(format!(
161                "snapshot edge {} references non-existent source node {}",
162                edge.id, edge.src
163            )));
164        }
165        if !node_ids.contains(&edge.dst) {
166            return Err(Error::Internal(format!(
167                "snapshot edge {} references non-existent destination node {}",
168                edge.id, edge.dst
169            )));
170        }
171    }
172    Ok(())
173}
174
/// Converts every triple of an RDF store into snapshot form by stringifying
/// its subject, predicate, and object terms.
#[cfg(feature = "rdf")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    store
        .triples()
        .into_iter()
        .map(|triple| {
            let subject = triple.subject().to_string();
            let predicate = triple.predicate().to_string();
            let object = triple.object().to_string();
            SnapshotTriple {
                subject,
                predicate,
                object,
            }
        })
        .collect()
}
188
/// Inserts snapshot triples into an RDF store.
///
/// Each term is parsed with `Term::from_ntriples`. A triple for which any
/// of the three terms fails to parse is skipped without reporting an error.
#[cfg(feature = "rdf")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    for SnapshotTriple {
        subject,
        predicate,
        object,
    } in triples
    {
        let parsed = (
            Term::from_ntriples(subject),
            Term::from_ntriples(predicate),
            Term::from_ntriples(object),
        );
        let (Some(s), Some(p), Some(o)) = parsed else {
            // Best effort: unparsable triples are dropped.
            continue;
        };
        store.insert(Triple::new(s, p, o));
    }
}
203
204// =========================================================================
205// Snapshot deserialization helpers (used by single-file format)
206// =========================================================================
207
/// Decodes a snapshot payload read from a `.grafeo` file and loads it into
/// the given store, catalog, and (with the `rdf` feature) RDF store.
///
/// Unlike the snapshot import/restore entry points, this function does not
/// inspect the version byte and does not run `validate_snapshot_data`.
///
/// # Errors
///
/// Returns `Error::Serialization` if the bytes cannot be decoded,
/// `Error::Internal` if a named graph cannot be created, and propagates any
/// error raised while inserting nodes or edges.
#[cfg(feature = "grafeo-file")]
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    use grafeo_common::utils::error::Error;

    let decoded =
        bincode::serde::decode_from_slice::<Snapshot, _>(data, bincode::config::standard());
    let (snapshot, _bytes_read) = decoded.map_err(|err| {
        Error::Serialization(format!("failed to decode snapshot from .grafeo file: {err}"))
    })?;

    // Default graph first, then each named graph.
    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;
    for NamedGraphSnapshot { name, nodes, edges } in &snapshot.named_graphs {
        store
            .create_graph(name)
            .map_err(|err| Error::Internal(err.to_string()))?;
        let Some(graph_store) = store.graph(name) else {
            continue;
        };
        populate_store_from_snapshot_ref(&graph_store, nodes, edges)?;
    }

    restore_schema_from_snapshot(catalog, &snapshot.schema);

    // RDF data: default store, then each named RDF graph.
    #[cfg(feature = "rdf")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for RdfNamedGraphSnapshot { name, triples } in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(name);
            let Some(graph_store) = rdf_store.graph(name) else {
                continue;
            };
            populate_rdf_store(&graph_store, triples);
        }
    }

    Ok(())
}
249
/// Borrowing variant of the snapshot population routine, used when loading
/// a single-file database: recreates `nodes` and then `edges` inside `store`,
/// cloning each property value because the snapshot is only borrowed.
///
/// # Errors
///
/// Propagates the first error returned by `create_node_with_id` or
/// `create_edge_with_id`.
#[cfg(feature = "grafeo-file")]
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    for SnapshotNode {
        id,
        labels,
        properties,
    } in nodes
    {
        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
        store.create_node_with_id(*id, &label_refs)?;
        for (key, value) in properties {
            store.set_node_property(*id, key, value.clone());
        }
    }

    for SnapshotEdge {
        id,
        src,
        dst,
        edge_type,
        properties,
    } in edges
    {
        store.create_edge_with_id(*id, *src, *dst, edge_type)?;
        for (key, value) in properties {
            store.set_edge_property(*id, key, value.clone());
        }
    }

    Ok(())
}
272
273/// Restores schema definitions from a snapshot into the catalog.
274fn restore_schema_from_snapshot(
275    catalog: &std::sync::Arc<crate::catalog::Catalog>,
276    schema: &SnapshotSchema,
277) {
278    for def in &schema.node_types {
279        catalog.register_or_replace_node_type(def.clone());
280    }
281    for def in &schema.edge_types {
282        catalog.register_or_replace_edge_type_def(def.clone());
283    }
284    for def in &schema.graph_types {
285        let _ = catalog.register_graph_type(def.clone());
286    }
287    for def in &schema.procedures {
288        catalog.replace_procedure(def.clone()).ok();
289    }
290    for name in &schema.schemas {
291        let _ = catalog.register_schema_namespace(name.clone());
292    }
293    for (graph_name, type_name) in &schema.graph_type_bindings {
294        let _ = catalog.bind_graph_type(graph_name, type_name.clone());
295    }
296}
297
298/// Collects schema definitions from the catalog into snapshot format.
299fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
300    SnapshotSchema {
301        node_types: catalog.all_node_type_defs(),
302        edge_types: catalog.all_edge_type_defs(),
303        graph_types: catalog.all_graph_type_defs(),
304        procedures: catalog.all_procedure_defs(),
305        schemas: catalog.schema_names(),
306        graph_type_bindings: catalog.all_graph_type_bindings(),
307    }
308}
309
310impl super::GrafeoDB {
311    // =========================================================================
312    // ADMIN API: Persistence Control
313    // =========================================================================
314
315    /// Saves the database to a file path.
316    ///
317    /// - If the path ends in `.grafeo`: creates a single-file database
318    /// - Otherwise: creates a WAL directory-backed database at the path
319    /// - If in-memory: creates a new persistent database at path
320    /// - If file-backed: creates a copy at the new path
321    ///
322    /// The original database remains unchanged.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the save operation fails.
327    ///
328    /// Requires the `wal` feature for persistence support.
329    #[cfg(feature = "wal")]
330    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
331        let path = path.as_ref();
332
333        // Single-file format: export snapshot directly to a .grafeo file
334        #[cfg(feature = "grafeo-file")]
335        if path.extension().is_some_and(|ext| ext == "grafeo") {
336            return self.save_as_grafeo_file(path);
337        }
338
339        // Create target database with WAL enabled
340        let target_config = Config::persistent(path);
341        let target = Self::with_config(target_config)?;
342
343        // Copy all nodes using WAL-enabled methods
344        for node in self.store.all_nodes() {
345            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
346            target.store.create_node_with_id(node.id, &label_refs)?;
347
348            // Log to WAL
349            target.log_wal(&WalRecord::CreateNode {
350                id: node.id,
351                labels: node.labels.iter().map(|s| s.to_string()).collect(),
352            })?;
353
354            // Copy properties
355            for (key, value) in node.properties {
356                target
357                    .store
358                    .set_node_property(node.id, key.as_str(), value.clone());
359                target.log_wal(&WalRecord::SetNodeProperty {
360                    id: node.id,
361                    key: key.to_string(),
362                    value,
363                })?;
364            }
365        }
366
367        // Copy all edges using WAL-enabled methods
368        for edge in self.store.all_edges() {
369            target
370                .store
371                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
372
373            // Log to WAL
374            target.log_wal(&WalRecord::CreateEdge {
375                id: edge.id,
376                src: edge.src,
377                dst: edge.dst,
378                edge_type: edge.edge_type.to_string(),
379            })?;
380
381            // Copy properties
382            for (key, value) in edge.properties {
383                target
384                    .store
385                    .set_edge_property(edge.id, key.as_str(), value.clone());
386                target.log_wal(&WalRecord::SetEdgeProperty {
387                    id: edge.id,
388                    key: key.to_string(),
389                    value,
390                })?;
391            }
392        }
393
394        // Copy named graphs
395        for graph_name in self.store.graph_names() {
396            if let Some(src_graph) = self.store.graph(&graph_name) {
397                target.log_wal(&WalRecord::CreateNamedGraph {
398                    name: graph_name.clone(),
399                })?;
400                target
401                    .store
402                    .create_graph(&graph_name)
403                    .map_err(|e| Error::Internal(e.to_string()))?;
404
405                if let Some(dst_graph) = target.store.graph(&graph_name) {
406                    // Switch WAL context to this named graph
407                    target.log_wal(&WalRecord::SwitchGraph {
408                        name: Some(graph_name.clone()),
409                    })?;
410
411                    for node in src_graph.all_nodes() {
412                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
413                        dst_graph.create_node_with_id(node.id, &label_refs)?;
414                        target.log_wal(&WalRecord::CreateNode {
415                            id: node.id,
416                            labels: node.labels.iter().map(|s| s.to_string()).collect(),
417                        })?;
418                        for (key, value) in node.properties {
419                            dst_graph.set_node_property(node.id, key.as_str(), value.clone());
420                            target.log_wal(&WalRecord::SetNodeProperty {
421                                id: node.id,
422                                key: key.to_string(),
423                                value,
424                            })?;
425                        }
426                    }
427                    for edge in src_graph.all_edges() {
428                        dst_graph.create_edge_with_id(
429                            edge.id,
430                            edge.src,
431                            edge.dst,
432                            &edge.edge_type,
433                        )?;
434                        target.log_wal(&WalRecord::CreateEdge {
435                            id: edge.id,
436                            src: edge.src,
437                            dst: edge.dst,
438                            edge_type: edge.edge_type.to_string(),
439                        })?;
440                        for (key, value) in edge.properties {
441                            dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
442                            target.log_wal(&WalRecord::SetEdgeProperty {
443                                id: edge.id,
444                                key: key.to_string(),
445                                value,
446                            })?;
447                        }
448                    }
449                }
450            }
451        }
452
453        // Switch WAL context back to default graph
454        if !self.store.graph_names().is_empty() {
455            target.log_wal(&WalRecord::SwitchGraph { name: None })?;
456        }
457
458        // Copy RDF data with WAL logging
459        #[cfg(feature = "rdf")]
460        {
461            for triple in self.rdf_store.triples() {
462                let record = WalRecord::InsertRdfTriple {
463                    subject: triple.subject().to_string(),
464                    predicate: triple.predicate().to_string(),
465                    object: triple.object().to_string(),
466                    graph: None,
467                };
468                target.rdf_store.insert((*triple).clone());
469                target.log_wal(&record)?;
470            }
471            for name in self.rdf_store.graph_names() {
472                target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
473                if let Some(src_graph) = self.rdf_store.graph(&name) {
474                    let dst_graph = target.rdf_store.graph_or_create(&name);
475                    for triple in src_graph.triples() {
476                        let record = WalRecord::InsertRdfTriple {
477                            subject: triple.subject().to_string(),
478                            predicate: triple.predicate().to_string(),
479                            object: triple.object().to_string(),
480                            graph: Some(name.clone()),
481                        };
482                        dst_graph.insert((*triple).clone());
483                        target.log_wal(&record)?;
484                    }
485                }
486            }
487        }
488
489        // Checkpoint and close the target database
490        target.close()?;
491
492        Ok(())
493    }
494
495    /// Creates an in-memory copy of this database.
496    ///
497    /// Returns a new database that is completely independent, including
498    /// all named graph data.
499    /// Useful for:
500    /// Saves the database to a single `.grafeo` file.
501    #[cfg(feature = "grafeo-file")]
502    fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
503        use grafeo_adapters::storage::file::GrafeoFileManager;
504
505        let snapshot_data = self.export_snapshot()?;
506        let epoch = self.store.current_epoch();
507        let transaction_id = self
508            .transaction_manager
509            .last_assigned_transaction_id()
510            .map_or(0, |t| t.0);
511        let node_count = self.store.node_count() as u64;
512        let edge_count = self.store.edge_count() as u64;
513
514        let fm = GrafeoFileManager::create(path)?;
515        fm.write_snapshot(
516            &snapshot_data,
517            epoch.0,
518            transaction_id,
519            node_count,
520            edge_count,
521        )?;
522        Ok(())
523    }
524
525    /// - Testing modifications without affecting the original
526    /// - Faster operations when persistence isn't needed
527    ///
528    /// # Errors
529    ///
530    /// Returns an error if the copy operation fails.
531    pub fn to_memory(&self) -> Result<Self> {
532        let config = Config::in_memory();
533        let target = Self::with_config(config)?;
534
535        // Copy default graph nodes
536        for node in self.store.all_nodes() {
537            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
538            target.store.create_node_with_id(node.id, &label_refs)?;
539            for (key, value) in node.properties {
540                target.store.set_node_property(node.id, key.as_str(), value);
541            }
542        }
543
544        // Copy default graph edges
545        for edge in self.store.all_edges() {
546            target
547                .store
548                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
549            for (key, value) in edge.properties {
550                target.store.set_edge_property(edge.id, key.as_str(), value);
551            }
552        }
553
554        // Copy named graphs
555        for graph_name in self.store.graph_names() {
556            if let Some(src_graph) = self.store.graph(&graph_name) {
557                target
558                    .store
559                    .create_graph(&graph_name)
560                    .map_err(|e| Error::Internal(e.to_string()))?;
561                if let Some(dst_graph) = target.store.graph(&graph_name) {
562                    for node in src_graph.all_nodes() {
563                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
564                        dst_graph.create_node_with_id(node.id, &label_refs)?;
565                        for (key, value) in node.properties {
566                            dst_graph.set_node_property(node.id, key.as_str(), value);
567                        }
568                    }
569                    for edge in src_graph.all_edges() {
570                        dst_graph.create_edge_with_id(
571                            edge.id,
572                            edge.src,
573                            edge.dst,
574                            &edge.edge_type,
575                        )?;
576                        for (key, value) in edge.properties {
577                            dst_graph.set_edge_property(edge.id, key.as_str(), value);
578                        }
579                    }
580                }
581            }
582        }
583
584        // Copy RDF data
585        #[cfg(feature = "rdf")]
586        {
587            for triple in self.rdf_store.triples() {
588                target.rdf_store.insert((*triple).clone());
589            }
590            for name in self.rdf_store.graph_names() {
591                if let Some(src_graph) = self.rdf_store.graph(&name) {
592                    let dst_graph = target.rdf_store.graph_or_create(&name);
593                    for triple in src_graph.triples() {
594                        dst_graph.insert((*triple).clone());
595                    }
596                }
597            }
598        }
599
600        Ok(target)
601    }
602
603    /// Opens a database file and loads it entirely into memory.
604    ///
605    /// The returned database has no connection to the original file.
606    /// Changes will NOT be written back to the file.
607    ///
608    /// # Errors
609    ///
610    /// Returns an error if the file can't be opened or loaded.
611    #[cfg(feature = "wal")]
612    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
613        // Open the source database (triggers WAL recovery)
614        let source = Self::open(path)?;
615
616        // Create in-memory copy
617        let target = source.to_memory()?;
618
619        // Close the source (releases file handles)
620        source.close()?;
621
622        Ok(target)
623    }
624
625    // =========================================================================
626    // ADMIN API: Snapshot Export/Import
627    // =========================================================================
628
629    /// Exports the entire database to a binary snapshot (v2 format).
630    ///
631    /// The returned bytes can be stored (e.g. in IndexedDB) and later
632    /// restored with [`import_snapshot()`](Self::import_snapshot).
633    /// Includes all named graph data.
634    ///
635    /// # Errors
636    ///
637    /// Returns an error if serialization fails.
638    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
639        let nodes = collect_snapshot_nodes(&self.store);
640        let edges = collect_snapshot_edges(&self.store);
641
642        // Collect named graphs
643        let named_graphs: Vec<NamedGraphSnapshot> = self
644            .store
645            .graph_names()
646            .into_iter()
647            .filter_map(|name| {
648                self.store
649                    .graph(&name)
650                    .map(|graph_store| NamedGraphSnapshot {
651                        name,
652                        nodes: collect_snapshot_nodes(&graph_store),
653                        edges: collect_snapshot_edges(&graph_store),
654                    })
655            })
656            .collect();
657
658        // Collect RDF triples
659        #[cfg(feature = "rdf")]
660        let rdf_triples = collect_rdf_triples(&self.rdf_store);
661        #[cfg(not(feature = "rdf"))]
662        let rdf_triples = Vec::new();
663
664        #[cfg(feature = "rdf")]
665        let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
666            .rdf_store
667            .graph_names()
668            .into_iter()
669            .filter_map(|name| {
670                self.rdf_store
671                    .graph(&name)
672                    .map(|graph| RdfNamedGraphSnapshot {
673                        name,
674                        triples: collect_rdf_triples(&graph),
675                    })
676            })
677            .collect();
678        #[cfg(not(feature = "rdf"))]
679        let rdf_named_graphs = Vec::new();
680
681        let schema = collect_schema(&self.catalog);
682
683        let snapshot = Snapshot {
684            version: SNAPSHOT_VERSION,
685            nodes,
686            edges,
687            named_graphs,
688            rdf_triples,
689            rdf_named_graphs,
690            schema,
691        };
692
693        let config = bincode::config::standard();
694        bincode::serde::encode_to_vec(&snapshot, config)
695            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
696    }
697
698    /// Creates a new in-memory database from a binary snapshot.
699    ///
700    /// Accepts both v1 (no named graphs) and v2 (with named graphs) formats.
701    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
702    ///
703    /// All edge references are validated before any data is inserted: every
704    /// edge's source and destination must reference a node present in the
705    /// snapshot, and duplicate node/edge IDs are rejected. If validation
706    /// fails, no database is created.
707    ///
708    /// # Errors
709    ///
710    /// Returns an error if the snapshot is invalid, contains dangling edge
711    /// references, has duplicate IDs, or deserialization fails.
712    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
713        if data.is_empty() {
714            return Err(Error::Internal("empty snapshot data".to_string()));
715        }
716
717        // Peek at version byte (bincode standard encodes u8 as raw byte)
718        if data[0] != SNAPSHOT_VERSION {
719            return Err(Error::Internal(format!(
720                "unsupported snapshot version: {} (expected {SNAPSHOT_VERSION})",
721                data[0]
722            )));
723        }
724
725        let config = bincode::config::standard();
726        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
727            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
728
729        // Validate default graph data
730        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
731
732        // Validate each named graph
733        for ng in &snapshot.named_graphs {
734            validate_snapshot_data(&ng.nodes, &ng.edges)?;
735        }
736
737        let db = Self::new_in_memory();
738        populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
739
740        // Restore named graphs
741        for ng in snapshot.named_graphs {
742            db.store
743                .create_graph(&ng.name)
744                .map_err(|e| Error::Internal(e.to_string()))?;
745            if let Some(graph_store) = db.store.graph(&ng.name) {
746                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
747            }
748        }
749
750        // Restore RDF triples
751        #[cfg(feature = "rdf")]
752        {
753            populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
754            for rng in &snapshot.rdf_named_graphs {
755                let graph = db.rdf_store.graph_or_create(&rng.name);
756                populate_rdf_store(&graph, &rng.triples);
757            }
758        }
759
760        // Restore schema
761        restore_schema_from_snapshot(&db.catalog, &snapshot.schema);
762
763        Ok(db)
764    }
765
766    /// Replaces the current database contents with data from a binary snapshot.
767    ///
768    /// Accepts both v1 and v2 snapshot formats. The `data` must have been
769    /// produced by [`export_snapshot()`](Self::export_snapshot).
770    ///
771    /// All validation (duplicate IDs, dangling edge references) is performed
772    /// before any data is modified. If validation fails, the current database
773    /// is left unchanged. If validation passes, the store is cleared and
774    /// rebuilt from the snapshot atomically (from the perspective of
775    /// subsequent queries).
776    ///
777    /// # Errors
778    ///
779    /// Returns an error if the snapshot is invalid, contains dangling edge
780    /// references, has duplicate IDs, or deserialization fails.
781    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
782        if data.is_empty() {
783            return Err(Error::Internal("empty snapshot data".to_string()));
784        }
785
786        if data[0] != SNAPSHOT_VERSION {
787            return Err(Error::Internal(format!(
788                "unsupported snapshot version: {} (expected {SNAPSHOT_VERSION})",
789                data[0]
790            )));
791        }
792
793        let config = bincode::config::standard();
794        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
795            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
796
797        // Validate all data before making any changes
798        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
799        for ng in &snapshot.named_graphs {
800            validate_snapshot_data(&ng.nodes, &ng.edges)?;
801        }
802
803        // Drop all existing named graphs, then clear default store
804        for name in self.store.graph_names() {
805            self.store.drop_graph(&name);
806        }
807        self.store.clear();
808
809        populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)?;
810
811        // Restore named graphs
812        for ng in snapshot.named_graphs {
813            self.store
814                .create_graph(&ng.name)
815                .map_err(|e| Error::Internal(e.to_string()))?;
816            if let Some(graph_store) = self.store.graph(&ng.name) {
817                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
818            }
819        }
820
821        // Restore RDF data
822        #[cfg(feature = "rdf")]
823        {
824            // Clear existing RDF data
825            self.rdf_store.clear();
826            for name in self.rdf_store.graph_names() {
827                self.rdf_store.drop_graph(&name);
828            }
829            populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
830            for rng in &snapshot.rdf_named_graphs {
831                let graph = self.rdf_store.graph_or_create(&rng.name);
832                populate_rdf_store(&graph, &rng.triples);
833            }
834        }
835
836        // Restore schema
837        restore_schema_from_snapshot(&self.catalog, &snapshot.schema);
838
839        Ok(())
840    }
841
842    // =========================================================================
843    // ADMIN API: Iteration
844    // =========================================================================
845
846    /// Returns an iterator over all nodes in the database.
847    ///
848    /// Useful for dump/export operations.
849    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
850        self.store.all_nodes()
851    }
852
853    /// Returns an iterator over all edges in the database.
854    ///
855    /// Useful for dump/export operations.
856    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
857        self.store.all_edges()
858    }
859}
860
861#[cfg(test)]
862mod tests {
863    use grafeo_common::types::{EdgeId, NodeId, Value};
864
865    use super::super::GrafeoDB;
866    use super::{SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotNode, SnapshotSchema};
867
868    #[test]
869    fn test_restore_snapshot_basic() {
870        let db = GrafeoDB::new_in_memory();
871        let session = db.session();
872
873        // Populate
874        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
875        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
876
877        let snapshot = db.export_snapshot().unwrap();
878
879        // Modify
880        session
881            .execute("INSERT (:Person {name: 'Vincent'})")
882            .unwrap();
883        assert_eq!(db.store.node_count(), 3);
884
885        // Restore original
886        db.restore_snapshot(&snapshot).unwrap();
887
888        assert_eq!(db.store.node_count(), 2);
889        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
890        assert_eq!(result.rows.len(), 2);
891    }
892
893    #[test]
894    fn test_restore_snapshot_validation_failure() {
895        let db = GrafeoDB::new_in_memory();
896        let session = db.session();
897
898        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
899
900        // Corrupt snapshot: just garbage bytes
901        let result = db.restore_snapshot(b"garbage");
902        assert!(result.is_err());
903
904        // DB should be unchanged
905        assert_eq!(db.store.node_count(), 1);
906    }
907
908    #[test]
909    fn test_restore_snapshot_empty_db() {
910        let db = GrafeoDB::new_in_memory();
911
912        // Export empty snapshot, then populate, then restore to empty
913        let empty_snapshot = db.export_snapshot().unwrap();
914
915        let session = db.session();
916        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
917        assert_eq!(db.store.node_count(), 1);
918
919        db.restore_snapshot(&empty_snapshot).unwrap();
920        assert_eq!(db.store.node_count(), 0);
921    }
922
923    #[test]
924    fn test_restore_snapshot_with_edges() {
925        let db = GrafeoDB::new_in_memory();
926        let session = db.session();
927
928        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
929        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
930        session
931            .execute(
932                "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
933            )
934            .unwrap();
935
936        let snapshot = db.export_snapshot().unwrap();
937        assert_eq!(db.store.edge_count(), 1);
938
939        // Modify: add more data
940        session
941            .execute("INSERT (:Person {name: 'Vincent'})")
942            .unwrap();
943
944        // Restore
945        db.restore_snapshot(&snapshot).unwrap();
946        assert_eq!(db.store.node_count(), 2);
947        assert_eq!(db.store.edge_count(), 1);
948    }
949
950    #[test]
951    fn test_restore_snapshot_preserves_sessions() {
952        let db = GrafeoDB::new_in_memory();
953        let session = db.session();
954
955        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
956        let snapshot = db.export_snapshot().unwrap();
957
958        // Modify
959        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
960
961        // Restore
962        db.restore_snapshot(&snapshot).unwrap();
963
964        // Session should still work and see restored data
965        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
966        assert_eq!(result.rows.len(), 1);
967    }
968
969    #[test]
970    fn test_export_import_roundtrip() {
971        let db = GrafeoDB::new_in_memory();
972        let session = db.session();
973
974        session
975            .execute("INSERT (:Person {name: 'Alix', age: 30})")
976            .unwrap();
977
978        let snapshot = db.export_snapshot().unwrap();
979        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
980        let session2 = db2.session();
981
982        let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
983        assert_eq!(result.rows.len(), 1);
984    }
985
986    // --- to_memory() ---
987
988    #[test]
989    fn test_to_memory_empty() {
990        let db = GrafeoDB::new_in_memory();
991        let copy = db.to_memory().unwrap();
992        assert_eq!(copy.store.node_count(), 0);
993        assert_eq!(copy.store.edge_count(), 0);
994    }
995
996    #[test]
997    fn test_to_memory_copies_nodes_and_properties() {
998        let db = GrafeoDB::new_in_memory();
999        let session = db.session();
1000        session
1001            .execute("INSERT (:Person {name: 'Alix', age: 30})")
1002            .unwrap();
1003        session
1004            .execute("INSERT (:Person {name: 'Gus', age: 25})")
1005            .unwrap();
1006
1007        let copy = db.to_memory().unwrap();
1008        assert_eq!(copy.store.node_count(), 2);
1009
1010        let s2 = copy.session();
1011        let result = s2
1012            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
1013            .unwrap();
1014        assert_eq!(result.rows.len(), 2);
1015        assert_eq!(result.rows[0][0], Value::String("Alix".into()));
1016        assert_eq!(result.rows[1][0], Value::String("Gus".into()));
1017    }
1018
1019    #[test]
1020    fn test_to_memory_copies_edges_and_properties() {
1021        let db = GrafeoDB::new_in_memory();
1022        let a = db.create_node(&["Person"]);
1023        db.set_node_property(a, "name", "Alix".into());
1024        let b = db.create_node(&["Person"]);
1025        db.set_node_property(b, "name", "Gus".into());
1026        let edge = db.create_edge(a, b, "KNOWS");
1027        db.set_edge_property(edge, "since", Value::Int64(2020));
1028
1029        let copy = db.to_memory().unwrap();
1030        assert_eq!(copy.store.node_count(), 2);
1031        assert_eq!(copy.store.edge_count(), 1);
1032
1033        let s2 = copy.session();
1034        let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
1035        assert_eq!(result.rows[0][0], Value::Int64(2020));
1036    }
1037
1038    #[test]
1039    fn test_to_memory_is_independent() {
1040        let db = GrafeoDB::new_in_memory();
1041        let session = db.session();
1042        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1043
1044        let copy = db.to_memory().unwrap();
1045
1046        // Mutating original should not affect copy
1047        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1048        assert_eq!(db.store.node_count(), 2);
1049        assert_eq!(copy.store.node_count(), 1);
1050    }
1051
1052    // --- iter_nodes() / iter_edges() ---
1053
1054    #[test]
1055    fn test_iter_nodes_empty() {
1056        let db = GrafeoDB::new_in_memory();
1057        assert_eq!(db.iter_nodes().count(), 0);
1058    }
1059
1060    #[test]
1061    fn test_iter_nodes_returns_all() {
1062        let db = GrafeoDB::new_in_memory();
1063        let id1 = db.create_node(&["Person"]);
1064        db.set_node_property(id1, "name", "Alix".into());
1065        let id2 = db.create_node(&["Animal"]);
1066        db.set_node_property(id2, "name", "Fido".into());
1067
1068        let nodes: Vec<_> = db.iter_nodes().collect();
1069        assert_eq!(nodes.len(), 2);
1070
1071        let names: Vec<_> = nodes
1072            .iter()
1073            .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
1074            .map(|(_, v)| v.clone())
1075            .collect();
1076        assert!(names.contains(&Value::String("Alix".into())));
1077        assert!(names.contains(&Value::String("Fido".into())));
1078    }
1079
1080    #[test]
1081    fn test_iter_edges_empty() {
1082        let db = GrafeoDB::new_in_memory();
1083        assert_eq!(db.iter_edges().count(), 0);
1084    }
1085
1086    #[test]
1087    fn test_iter_edges_returns_all() {
1088        let db = GrafeoDB::new_in_memory();
1089        let a = db.create_node(&["A"]);
1090        let b = db.create_node(&["B"]);
1091        let c = db.create_node(&["C"]);
1092        db.create_edge(a, b, "R1");
1093        db.create_edge(b, c, "R2");
1094
1095        let edges: Vec<_> = db.iter_edges().collect();
1096        assert_eq!(edges.len(), 2);
1097
1098        let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
1099        assert!(types.contains(&"R1"));
1100        assert!(types.contains(&"R2"));
1101    }
1102
1103    // --- restore_snapshot() validation ---
1104
1105    fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
1106        let snap = Snapshot {
1107            version,
1108            nodes,
1109            edges,
1110            named_graphs: vec![],
1111            rdf_triples: vec![],
1112            rdf_named_graphs: vec![],
1113            schema: SnapshotSchema::default(),
1114        };
1115        bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
1116    }
1117
1118    #[test]
1119    fn test_restore_rejects_unsupported_version() {
1120        let db = GrafeoDB::new_in_memory();
1121        let session = db.session();
1122        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1123
1124        let bytes = make_snapshot(99, vec![], vec![]);
1125
1126        let result = db.restore_snapshot(&bytes);
1127        assert!(result.is_err());
1128        let err = result.unwrap_err().to_string();
1129        assert!(err.contains("unsupported snapshot version"), "got: {err}");
1130
1131        // DB unchanged
1132        assert_eq!(db.store.node_count(), 1);
1133    }
1134
1135    #[test]
1136    fn test_restore_rejects_duplicate_node_ids() {
1137        let db = GrafeoDB::new_in_memory();
1138        let session = db.session();
1139        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1140
1141        let bytes = make_snapshot(
1142            SNAPSHOT_VERSION,
1143            vec![
1144                SnapshotNode {
1145                    id: NodeId::new(0),
1146                    labels: vec!["A".into()],
1147                    properties: vec![],
1148                },
1149                SnapshotNode {
1150                    id: NodeId::new(0),
1151                    labels: vec!["B".into()],
1152                    properties: vec![],
1153                },
1154            ],
1155            vec![],
1156        );
1157
1158        let result = db.restore_snapshot(&bytes);
1159        assert!(result.is_err());
1160        let err = result.unwrap_err().to_string();
1161        assert!(err.contains("duplicate node ID"), "got: {err}");
1162        assert_eq!(db.store.node_count(), 1);
1163    }
1164
1165    #[test]
1166    fn test_restore_rejects_duplicate_edge_ids() {
1167        let db = GrafeoDB::new_in_memory();
1168
1169        let bytes = make_snapshot(
1170            SNAPSHOT_VERSION,
1171            vec![
1172                SnapshotNode {
1173                    id: NodeId::new(0),
1174                    labels: vec![],
1175                    properties: vec![],
1176                },
1177                SnapshotNode {
1178                    id: NodeId::new(1),
1179                    labels: vec![],
1180                    properties: vec![],
1181                },
1182            ],
1183            vec![
1184                SnapshotEdge {
1185                    id: EdgeId::new(0),
1186                    src: NodeId::new(0),
1187                    dst: NodeId::new(1),
1188                    edge_type: "REL".into(),
1189                    properties: vec![],
1190                },
1191                SnapshotEdge {
1192                    id: EdgeId::new(0),
1193                    src: NodeId::new(0),
1194                    dst: NodeId::new(1),
1195                    edge_type: "REL".into(),
1196                    properties: vec![],
1197                },
1198            ],
1199        );
1200
1201        let result = db.restore_snapshot(&bytes);
1202        assert!(result.is_err());
1203        let err = result.unwrap_err().to_string();
1204        assert!(err.contains("duplicate edge ID"), "got: {err}");
1205    }
1206
1207    #[test]
1208    fn test_restore_rejects_dangling_source() {
1209        let db = GrafeoDB::new_in_memory();
1210
1211        let bytes = make_snapshot(
1212            SNAPSHOT_VERSION,
1213            vec![SnapshotNode {
1214                id: NodeId::new(0),
1215                labels: vec![],
1216                properties: vec![],
1217            }],
1218            vec![SnapshotEdge {
1219                id: EdgeId::new(0),
1220                src: NodeId::new(999),
1221                dst: NodeId::new(0),
1222                edge_type: "REL".into(),
1223                properties: vec![],
1224            }],
1225        );
1226
1227        let result = db.restore_snapshot(&bytes);
1228        assert!(result.is_err());
1229        let err = result.unwrap_err().to_string();
1230        assert!(err.contains("non-existent source node"), "got: {err}");
1231    }
1232
1233    #[test]
1234    fn test_restore_rejects_dangling_destination() {
1235        let db = GrafeoDB::new_in_memory();
1236
1237        let bytes = make_snapshot(
1238            SNAPSHOT_VERSION,
1239            vec![SnapshotNode {
1240                id: NodeId::new(0),
1241                labels: vec![],
1242                properties: vec![],
1243            }],
1244            vec![SnapshotEdge {
1245                id: EdgeId::new(0),
1246                src: NodeId::new(0),
1247                dst: NodeId::new(999),
1248                edge_type: "REL".into(),
1249                properties: vec![],
1250            }],
1251        );
1252
1253        let result = db.restore_snapshot(&bytes);
1254        assert!(result.is_err());
1255        let err = result.unwrap_err().to_string();
1256        assert!(err.contains("non-existent destination node"), "got: {err}");
1257    }
1258}