Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use std::path::Path;
5
6use grafeo_common::types::{EdgeId, NodeId, Value};
7use grafeo_common::utils::error::{Error, Result};
8use hashbrown::HashSet;
9
10use crate::config::Config;
11
12#[cfg(feature = "wal")]
13use grafeo_adapters::storage::wal::WalRecord;
14
/// Binary snapshot format v1 (no named graphs).
///
/// Serialized with bincode's `standard()` configuration, which encodes struct
/// fields by position: the field order below is part of the on-wire format and
/// must not change. `version` is first so its byte can be inspected before a
/// full decode.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version; v1 payloads are dispatched on the value `1`.
    version: u8,
    /// Nodes of the default graph.
    nodes: Vec<SnapshotNode>,
    /// Edges of the default graph.
    edges: Vec<SnapshotEdge>,
}
22
/// Binary snapshot format v2 (with named graphs and optional RDF data).
///
/// Serialized with bincode's `standard()` configuration, which encodes struct
/// fields by position: the field order below is part of the on-wire format and
/// must not change. `version` is first so its byte can be inspected before a
/// full decode.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotV2 {
    /// Format version; v2 payloads are dispatched on the value `2`.
    version: u8,
    /// Nodes of the default graph.
    nodes: Vec<SnapshotNode>,
    /// Edges of the default graph.
    edges: Vec<SnapshotEdge>,
    /// Named LPG graphs, each with its own nodes and edges.
    named_graphs: Vec<NamedGraphSnapshot>,
    /// RDF triples in the default graph (written empty when the `rdf`
    /// feature is disabled at export time).
    // NOTE(review): bincode is not self-describing, so `#[serde(default)]`
    // probably does not make payloads that omit these trailing fields
    // decodable; confirm before relying on it for compatibility.
    #[serde(default)]
    rdf_triples: Vec<SnapshotTriple>,
    /// RDF named graph data (written empty when the `rdf` feature is
    /// disabled at export time).
    #[serde(default)]
    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
}
32    rdf_triples: Vec<SnapshotTriple>,
33    /// RDF named graph data.
34    #[serde(default)]
35    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
36}
37
/// A named graph partition within a v2 snapshot.
///
/// Each partition is validated on its own during import/restore, so its edges
/// may only reference nodes stored in the same partition.
#[derive(serde::Serialize, serde::Deserialize)]
struct NamedGraphSnapshot {
    /// Name the graph is recreated under.
    name: String,
    /// Nodes belonging to this graph.
    nodes: Vec<SnapshotNode>,
    /// Edges belonging to this graph.
    edges: Vec<SnapshotEdge>,
}
45
/// An RDF triple in snapshot format (N-Triples encoded terms).
///
/// Terms are produced with their `to_string()` form on export and parsed back
/// with `Term::from_ntriples` on import.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTriple {
    /// Subject term, N-Triples encoded.
    subject: String,
    /// Predicate term, N-Triples encoded.
    predicate: String,
    /// Object term, N-Triples encoded.
    object: String,
}
53
/// An RDF named graph in snapshot format.
#[derive(serde::Serialize, serde::Deserialize)]
struct RdfNamedGraphSnapshot {
    /// Name of the RDF graph; recreated via `graph_or_create` on import.
    name: String,
    /// Triples contained in this graph.
    triples: Vec<SnapshotTriple>,
}
60
/// A single LPG node in snapshot format.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Node ID; reused verbatim (`create_node_with_id`) when restoring.
    id: NodeId,
    /// Label names attached to the node.
    labels: Vec<String>,
    /// Property key/value pairs.
    properties: Vec<(String, Value)>,
}
67
/// A single LPG edge in snapshot format.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Edge ID; reused verbatim (`create_edge_with_id`) when restoring.
    id: EdgeId,
    /// Source node ID; must refer to a node in the same snapshot partition.
    src: NodeId,
    /// Destination node ID; must refer to a node in the same snapshot partition.
    dst: NodeId,
    /// Relationship type name.
    edge_type: String,
    /// Property key/value pairs.
    properties: Vec<(String, Value)>,
}
76
77/// Collects all nodes from a store into snapshot format.
78fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
79    store
80        .all_nodes()
81        .map(|n| SnapshotNode {
82            id: n.id,
83            labels: n.labels.iter().map(|l| l.to_string()).collect(),
84            properties: n
85                .properties
86                .into_iter()
87                .map(|(k, v)| (k.to_string(), v))
88                .collect(),
89        })
90        .collect()
91}
92
93/// Collects all edges from a store into snapshot format.
94fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
95    store
96        .all_edges()
97        .map(|e| SnapshotEdge {
98            id: e.id,
99            src: e.src,
100            dst: e.dst,
101            edge_type: e.edge_type.to_string(),
102            properties: e
103                .properties
104                .into_iter()
105                .map(|(k, v)| (k.to_string(), v))
106                .collect(),
107        })
108        .collect()
109}
110
111/// Populates a store from snapshot node/edge data.
112fn populate_store_from_snapshot(
113    store: &grafeo_core::graph::lpg::LpgStore,
114    nodes: Vec<SnapshotNode>,
115    edges: Vec<SnapshotEdge>,
116) -> Result<()> {
117    for node in nodes {
118        let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
119        store.create_node_with_id(node.id, &label_refs)?;
120        for (key, value) in node.properties {
121            store.set_node_property(node.id, &key, value);
122        }
123    }
124    for edge in edges {
125        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
126        for (key, value) in edge.properties {
127            store.set_edge_property(edge.id, &key, value);
128        }
129    }
130    Ok(())
131}
132
133/// Validates snapshot nodes/edges for duplicates and dangling references.
134fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
135    let mut node_ids = HashSet::with_capacity(nodes.len());
136    for node in nodes {
137        if !node_ids.insert(node.id) {
138            return Err(Error::Internal(format!(
139                "snapshot contains duplicate node ID {}",
140                node.id
141            )));
142        }
143    }
144    let mut edge_ids = HashSet::with_capacity(edges.len());
145    for edge in edges {
146        if !edge_ids.insert(edge.id) {
147            return Err(Error::Internal(format!(
148                "snapshot contains duplicate edge ID {}",
149                edge.id
150            )));
151        }
152        if !node_ids.contains(&edge.src) {
153            return Err(Error::Internal(format!(
154                "snapshot edge {} references non-existent source node {}",
155                edge.id, edge.src
156            )));
157        }
158        if !node_ids.contains(&edge.dst) {
159            return Err(Error::Internal(format!(
160                "snapshot edge {} references non-existent destination node {}",
161                edge.id, edge.dst
162            )));
163        }
164    }
165    Ok(())
166}
167
/// Encodes every triple of an RDF store as N-Triples term strings.
#[cfg(feature = "rdf")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut encoded = Vec::new();
    for triple in store.triples() {
        let subject = triple.subject().to_string();
        let predicate = triple.predicate().to_string();
        let object = triple.object().to_string();
        encoded.push(SnapshotTriple {
            subject,
            predicate,
            object,
        });
    }
    encoded
}
181
/// Inserts snapshot triples into an RDF store, decoding each term from
/// N-Triples form.
///
/// A triple is silently skipped when any of its three terms fails to parse
/// (`Term::from_ntriples` yields `None`); no error is reported.
#[cfg(feature = "rdf")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    let decoded = triples.iter().filter_map(|encoded| {
        let subject = Term::from_ntriples(&encoded.subject)?;
        let predicate = Term::from_ntriples(&encoded.predicate)?;
        let object = Term::from_ntriples(&encoded.object)?;
        Some(Triple::new(subject, predicate, object))
    });

    for triple in decoded {
        store.insert(triple);
    }
}
196
197impl super::GrafeoDB {
198    // =========================================================================
199    // ADMIN API: Persistence Control
200    // =========================================================================
201
202    /// Saves the database to a file path.
203    ///
204    /// - If in-memory: creates a new persistent database at path
205    /// - If file-backed: creates a copy at the new path
206    ///
207    /// The original database remains unchanged.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if the save operation fails.
212    ///
213    /// Requires the `wal` feature for persistence support.
214    #[cfg(feature = "wal")]
215    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
216        let path = path.as_ref();
217
218        // Create target database with WAL enabled
219        let target_config = Config::persistent(path);
220        let target = Self::with_config(target_config)?;
221
222        // Copy all nodes using WAL-enabled methods
223        for node in self.store.all_nodes() {
224            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
225            target.store.create_node_with_id(node.id, &label_refs)?;
226
227            // Log to WAL
228            target.log_wal(&WalRecord::CreateNode {
229                id: node.id,
230                labels: node.labels.iter().map(|s| s.to_string()).collect(),
231            })?;
232
233            // Copy properties
234            for (key, value) in node.properties {
235                target
236                    .store
237                    .set_node_property(node.id, key.as_str(), value.clone());
238                target.log_wal(&WalRecord::SetNodeProperty {
239                    id: node.id,
240                    key: key.to_string(),
241                    value,
242                })?;
243            }
244        }
245
246        // Copy all edges using WAL-enabled methods
247        for edge in self.store.all_edges() {
248            target
249                .store
250                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
251
252            // Log to WAL
253            target.log_wal(&WalRecord::CreateEdge {
254                id: edge.id,
255                src: edge.src,
256                dst: edge.dst,
257                edge_type: edge.edge_type.to_string(),
258            })?;
259
260            // Copy properties
261            for (key, value) in edge.properties {
262                target
263                    .store
264                    .set_edge_property(edge.id, key.as_str(), value.clone());
265                target.log_wal(&WalRecord::SetEdgeProperty {
266                    id: edge.id,
267                    key: key.to_string(),
268                    value,
269                })?;
270            }
271        }
272
273        // Copy named graphs
274        for graph_name in self.store.graph_names() {
275            if let Some(src_graph) = self.store.graph(&graph_name) {
276                target.log_wal(&WalRecord::CreateNamedGraph {
277                    name: graph_name.clone(),
278                })?;
279                target
280                    .store
281                    .create_graph(&graph_name)
282                    .map_err(|e| Error::Internal(e.to_string()))?;
283
284                if let Some(dst_graph) = target.store.graph(&graph_name) {
285                    // Switch WAL context to this named graph
286                    target.log_wal(&WalRecord::SwitchGraph {
287                        name: Some(graph_name.clone()),
288                    })?;
289
290                    for node in src_graph.all_nodes() {
291                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
292                        dst_graph.create_node_with_id(node.id, &label_refs)?;
293                        target.log_wal(&WalRecord::CreateNode {
294                            id: node.id,
295                            labels: node.labels.iter().map(|s| s.to_string()).collect(),
296                        })?;
297                        for (key, value) in node.properties {
298                            dst_graph.set_node_property(node.id, key.as_str(), value.clone());
299                            target.log_wal(&WalRecord::SetNodeProperty {
300                                id: node.id,
301                                key: key.to_string(),
302                                value,
303                            })?;
304                        }
305                    }
306                    for edge in src_graph.all_edges() {
307                        dst_graph.create_edge_with_id(
308                            edge.id,
309                            edge.src,
310                            edge.dst,
311                            &edge.edge_type,
312                        )?;
313                        target.log_wal(&WalRecord::CreateEdge {
314                            id: edge.id,
315                            src: edge.src,
316                            dst: edge.dst,
317                            edge_type: edge.edge_type.to_string(),
318                        })?;
319                        for (key, value) in edge.properties {
320                            dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
321                            target.log_wal(&WalRecord::SetEdgeProperty {
322                                id: edge.id,
323                                key: key.to_string(),
324                                value,
325                            })?;
326                        }
327                    }
328                }
329            }
330        }
331
332        // Switch WAL context back to default graph
333        if !self.store.graph_names().is_empty() {
334            target.log_wal(&WalRecord::SwitchGraph { name: None })?;
335        }
336
337        // Copy RDF data with WAL logging
338        #[cfg(feature = "rdf")]
339        {
340            for triple in self.rdf_store.triples() {
341                let record = WalRecord::InsertRdfTriple {
342                    subject: triple.subject().to_string(),
343                    predicate: triple.predicate().to_string(),
344                    object: triple.object().to_string(),
345                    graph: None,
346                };
347                target.rdf_store.insert((*triple).clone());
348                target.log_wal(&record)?;
349            }
350            for name in self.rdf_store.graph_names() {
351                target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
352                if let Some(src_graph) = self.rdf_store.graph(&name) {
353                    let dst_graph = target.rdf_store.graph_or_create(&name);
354                    for triple in src_graph.triples() {
355                        let record = WalRecord::InsertRdfTriple {
356                            subject: triple.subject().to_string(),
357                            predicate: triple.predicate().to_string(),
358                            object: triple.object().to_string(),
359                            graph: Some(name.clone()),
360                        };
361                        dst_graph.insert((*triple).clone());
362                        target.log_wal(&record)?;
363                    }
364                }
365            }
366        }
367
368        // Checkpoint and close the target database
369        target.close()?;
370
371        Ok(())
372    }
373
374    /// Creates an in-memory copy of this database.
375    ///
376    /// Returns a new database that is completely independent, including
377    /// all named graph data.
378    /// Useful for:
379    /// - Testing modifications without affecting the original
380    /// - Faster operations when persistence isn't needed
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the copy operation fails.
385    pub fn to_memory(&self) -> Result<Self> {
386        let config = Config::in_memory();
387        let target = Self::with_config(config)?;
388
389        // Copy default graph nodes
390        for node in self.store.all_nodes() {
391            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
392            target.store.create_node_with_id(node.id, &label_refs)?;
393            for (key, value) in node.properties {
394                target.store.set_node_property(node.id, key.as_str(), value);
395            }
396        }
397
398        // Copy default graph edges
399        for edge in self.store.all_edges() {
400            target
401                .store
402                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
403            for (key, value) in edge.properties {
404                target.store.set_edge_property(edge.id, key.as_str(), value);
405            }
406        }
407
408        // Copy named graphs
409        for graph_name in self.store.graph_names() {
410            if let Some(src_graph) = self.store.graph(&graph_name) {
411                target
412                    .store
413                    .create_graph(&graph_name)
414                    .map_err(|e| Error::Internal(e.to_string()))?;
415                if let Some(dst_graph) = target.store.graph(&graph_name) {
416                    for node in src_graph.all_nodes() {
417                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
418                        dst_graph.create_node_with_id(node.id, &label_refs)?;
419                        for (key, value) in node.properties {
420                            dst_graph.set_node_property(node.id, key.as_str(), value);
421                        }
422                    }
423                    for edge in src_graph.all_edges() {
424                        dst_graph.create_edge_with_id(
425                            edge.id,
426                            edge.src,
427                            edge.dst,
428                            &edge.edge_type,
429                        )?;
430                        for (key, value) in edge.properties {
431                            dst_graph.set_edge_property(edge.id, key.as_str(), value);
432                        }
433                    }
434                }
435            }
436        }
437
438        // Copy RDF data
439        #[cfg(feature = "rdf")]
440        {
441            for triple in self.rdf_store.triples() {
442                target.rdf_store.insert((*triple).clone());
443            }
444            for name in self.rdf_store.graph_names() {
445                if let Some(src_graph) = self.rdf_store.graph(&name) {
446                    let dst_graph = target.rdf_store.graph_or_create(&name);
447                    for triple in src_graph.triples() {
448                        dst_graph.insert((*triple).clone());
449                    }
450                }
451            }
452        }
453
454        Ok(target)
455    }
456
457    /// Opens a database file and loads it entirely into memory.
458    ///
459    /// The returned database has no connection to the original file.
460    /// Changes will NOT be written back to the file.
461    ///
462    /// # Errors
463    ///
464    /// Returns an error if the file can't be opened or loaded.
465    #[cfg(feature = "wal")]
466    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
467        // Open the source database (triggers WAL recovery)
468        let source = Self::open(path)?;
469
470        // Create in-memory copy
471        let target = source.to_memory()?;
472
473        // Close the source (releases file handles)
474        source.close()?;
475
476        Ok(target)
477    }
478
479    // =========================================================================
480    // ADMIN API: Snapshot Export/Import
481    // =========================================================================
482
483    /// Exports the entire database to a binary snapshot (v2 format).
484    ///
485    /// The returned bytes can be stored (e.g. in IndexedDB) and later
486    /// restored with [`import_snapshot()`](Self::import_snapshot).
487    /// Includes all named graph data.
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if serialization fails.
492    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
493        let nodes = collect_snapshot_nodes(&self.store);
494        let edges = collect_snapshot_edges(&self.store);
495
496        // Collect named graphs
497        let named_graphs: Vec<NamedGraphSnapshot> = self
498            .store
499            .graph_names()
500            .into_iter()
501            .filter_map(|name| {
502                self.store
503                    .graph(&name)
504                    .map(|graph_store| NamedGraphSnapshot {
505                        name,
506                        nodes: collect_snapshot_nodes(&graph_store),
507                        edges: collect_snapshot_edges(&graph_store),
508                    })
509            })
510            .collect();
511
512        // Collect RDF triples
513        #[cfg(feature = "rdf")]
514        let rdf_triples = collect_rdf_triples(&self.rdf_store);
515        #[cfg(not(feature = "rdf"))]
516        let rdf_triples = Vec::new();
517
518        #[cfg(feature = "rdf")]
519        let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
520            .rdf_store
521            .graph_names()
522            .into_iter()
523            .filter_map(|name| {
524                self.rdf_store
525                    .graph(&name)
526                    .map(|graph| RdfNamedGraphSnapshot {
527                        name,
528                        triples: collect_rdf_triples(&graph),
529                    })
530            })
531            .collect();
532        #[cfg(not(feature = "rdf"))]
533        let rdf_named_graphs = Vec::new();
534
535        let snapshot = SnapshotV2 {
536            version: 2,
537            nodes,
538            edges,
539            named_graphs,
540            rdf_triples,
541            rdf_named_graphs,
542        };
543
544        let config = bincode::config::standard();
545        bincode::serde::encode_to_vec(&snapshot, config)
546            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
547    }
548
549    /// Creates a new in-memory database from a binary snapshot.
550    ///
551    /// Accepts both v1 (no named graphs) and v2 (with named graphs) formats.
552    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
553    ///
554    /// All edge references are validated before any data is inserted: every
555    /// edge's source and destination must reference a node present in the
556    /// snapshot, and duplicate node/edge IDs are rejected. If validation
557    /// fails, no database is created.
558    ///
559    /// # Errors
560    ///
561    /// Returns an error if the snapshot is invalid, contains dangling edge
562    /// references, has duplicate IDs, or deserialization fails.
563    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
564        if data.is_empty() {
565            return Err(Error::Internal("empty snapshot data".to_string()));
566        }
567
568        let config = bincode::config::standard();
569
570        // Peek at version byte (bincode standard encodes u8 as raw byte)
571        match data[0] {
572            1 => Self::import_snapshot_v1(data, config),
573            2 => Self::import_snapshot_v2(data, config),
574            v => Err(Error::Internal(format!(
575                "unsupported snapshot version: {v}"
576            ))),
577        }
578    }
579
580    fn import_snapshot_v1(data: &[u8], config: bincode::config::Configuration) -> Result<Self> {
581        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
582            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
583
584        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
585
586        let db = Self::new_in_memory();
587        populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
588        Ok(db)
589    }
590
591    fn import_snapshot_v2(data: &[u8], config: bincode::config::Configuration) -> Result<Self> {
592        let (snapshot, _): (SnapshotV2, _) = bincode::serde::decode_from_slice(data, config)
593            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
594
595        // Validate default graph data
596        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
597
598        // Validate each named graph
599        for ng in &snapshot.named_graphs {
600            validate_snapshot_data(&ng.nodes, &ng.edges)?;
601        }
602
603        let db = Self::new_in_memory();
604        populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
605
606        // Restore named graphs
607        for ng in snapshot.named_graphs {
608            db.store
609                .create_graph(&ng.name)
610                .map_err(|e| Error::Internal(e.to_string()))?;
611            if let Some(graph_store) = db.store.graph(&ng.name) {
612                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
613            }
614        }
615
616        // Restore RDF triples
617        #[cfg(feature = "rdf")]
618        {
619            populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
620            for rng in &snapshot.rdf_named_graphs {
621                let graph = db.rdf_store.graph_or_create(&rng.name);
622                populate_rdf_store(&graph, &rng.triples);
623            }
624        }
625
626        Ok(db)
627    }
628
629    /// Replaces the current database contents with data from a binary snapshot.
630    ///
631    /// Accepts both v1 and v2 snapshot formats. The `data` must have been
632    /// produced by [`export_snapshot()`](Self::export_snapshot).
633    ///
634    /// All validation (duplicate IDs, dangling edge references) is performed
635    /// before any data is modified. If validation fails, the current database
636    /// is left unchanged. If validation passes, the store is cleared and
637    /// rebuilt from the snapshot atomically (from the perspective of
638    /// subsequent queries).
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if the snapshot is invalid, contains dangling edge
643    /// references, has duplicate IDs, or deserialization fails.
644    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
645        if data.is_empty() {
646            return Err(Error::Internal("empty snapshot data".to_string()));
647        }
648
649        let config = bincode::config::standard();
650
651        match data[0] {
652            1 => self.restore_snapshot_v1(data, config),
653            2 => self.restore_snapshot_v2(data, config),
654            v => Err(Error::Internal(format!(
655                "unsupported snapshot version: {v}"
656            ))),
657        }
658    }
659
660    fn restore_snapshot_v1(
661        &self,
662        data: &[u8],
663        config: bincode::config::Configuration,
664    ) -> Result<()> {
665        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
666            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
667
668        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
669
670        // Drop all named graphs before clearing the default store
671        for name in self.store.graph_names() {
672            self.store.drop_graph(&name);
673        }
674        self.store.clear();
675        populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)
676    }
677
678    fn restore_snapshot_v2(
679        &self,
680        data: &[u8],
681        config: bincode::config::Configuration,
682    ) -> Result<()> {
683        let (snapshot, _): (SnapshotV2, _) = bincode::serde::decode_from_slice(data, config)
684            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
685
686        // Validate all data before making any changes
687        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
688        for ng in &snapshot.named_graphs {
689            validate_snapshot_data(&ng.nodes, &ng.edges)?;
690        }
691
692        // Drop all existing named graphs, then clear default store
693        for name in self.store.graph_names() {
694            self.store.drop_graph(&name);
695        }
696        self.store.clear();
697
698        populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)?;
699
700        // Restore named graphs
701        for ng in snapshot.named_graphs {
702            self.store
703                .create_graph(&ng.name)
704                .map_err(|e| Error::Internal(e.to_string()))?;
705            if let Some(graph_store) = self.store.graph(&ng.name) {
706                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
707            }
708        }
709
710        // Restore RDF data
711        #[cfg(feature = "rdf")]
712        {
713            // Clear existing RDF data
714            self.rdf_store.clear();
715            for name in self.rdf_store.graph_names() {
716                self.rdf_store.drop_graph(&name);
717            }
718            populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
719            for rng in &snapshot.rdf_named_graphs {
720                let graph = self.rdf_store.graph_or_create(&rng.name);
721                populate_rdf_store(&graph, &rng.triples);
722            }
723        }
724
725        Ok(())
726    }
727
728    // =========================================================================
729    // ADMIN API: Iteration
730    // =========================================================================
731
732    /// Returns an iterator over all nodes in the database.
733    ///
734    /// Useful for dump/export operations.
735    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
736        self.store.all_nodes()
737    }
738
739    /// Returns an iterator over all edges in the database.
740    ///
741    /// Useful for dump/export operations.
742    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
743        self.store.all_edges()
744    }
745}
746
747#[cfg(test)]
748mod tests {
749    use grafeo_common::types::{EdgeId, NodeId, Value};
750
751    use super::super::GrafeoDB;
752    use super::{Snapshot, SnapshotEdge, SnapshotNode};
753
754    #[test]
755    fn test_restore_snapshot_basic() {
756        let db = GrafeoDB::new_in_memory();
757        let session = db.session();
758
759        // Populate
760        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
761        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
762
763        let snapshot = db.export_snapshot().unwrap();
764
765        // Modify
766        session
767            .execute("INSERT (:Person {name: 'Vincent'})")
768            .unwrap();
769        assert_eq!(db.store.node_count(), 3);
770
771        // Restore original
772        db.restore_snapshot(&snapshot).unwrap();
773
774        assert_eq!(db.store.node_count(), 2);
775        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
776        assert_eq!(result.rows.len(), 2);
777    }
778
779    #[test]
780    fn test_restore_snapshot_validation_failure() {
781        let db = GrafeoDB::new_in_memory();
782        let session = db.session();
783
784        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
785
786        // Corrupt snapshot: just garbage bytes
787        let result = db.restore_snapshot(b"garbage");
788        assert!(result.is_err());
789
790        // DB should be unchanged
791        assert_eq!(db.store.node_count(), 1);
792    }
793
794    #[test]
795    fn test_restore_snapshot_empty_db() {
796        let db = GrafeoDB::new_in_memory();
797
798        // Export empty snapshot, then populate, then restore to empty
799        let empty_snapshot = db.export_snapshot().unwrap();
800
801        let session = db.session();
802        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
803        assert_eq!(db.store.node_count(), 1);
804
805        db.restore_snapshot(&empty_snapshot).unwrap();
806        assert_eq!(db.store.node_count(), 0);
807    }
808
809    #[test]
810    fn test_restore_snapshot_with_edges() {
811        let db = GrafeoDB::new_in_memory();
812        let session = db.session();
813
814        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
815        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
816        session
817            .execute(
818                "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
819            )
820            .unwrap();
821
822        let snapshot = db.export_snapshot().unwrap();
823        assert_eq!(db.store.edge_count(), 1);
824
825        // Modify: add more data
826        session
827            .execute("INSERT (:Person {name: 'Vincent'})")
828            .unwrap();
829
830        // Restore
831        db.restore_snapshot(&snapshot).unwrap();
832        assert_eq!(db.store.node_count(), 2);
833        assert_eq!(db.store.edge_count(), 1);
834    }
835
836    #[test]
837    fn test_restore_snapshot_preserves_sessions() {
838        let db = GrafeoDB::new_in_memory();
839        let session = db.session();
840
841        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
842        let snapshot = db.export_snapshot().unwrap();
843
844        // Modify
845        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
846
847        // Restore
848        db.restore_snapshot(&snapshot).unwrap();
849
850        // Session should still work and see restored data
851        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
852        assert_eq!(result.rows.len(), 1);
853    }
854
855    #[test]
856    fn test_export_import_roundtrip() {
857        let db = GrafeoDB::new_in_memory();
858        let session = db.session();
859
860        session
861            .execute("INSERT (:Person {name: 'Alix', age: 30})")
862            .unwrap();
863
864        let snapshot = db.export_snapshot().unwrap();
865        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
866        let session2 = db2.session();
867
868        let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
869        assert_eq!(result.rows.len(), 1);
870    }
871
872    // --- to_memory() ---
873
874    #[test]
875    fn test_to_memory_empty() {
876        let db = GrafeoDB::new_in_memory();
877        let copy = db.to_memory().unwrap();
878        assert_eq!(copy.store.node_count(), 0);
879        assert_eq!(copy.store.edge_count(), 0);
880    }
881
882    #[test]
883    fn test_to_memory_copies_nodes_and_properties() {
884        let db = GrafeoDB::new_in_memory();
885        let session = db.session();
886        session
887            .execute("INSERT (:Person {name: 'Alix', age: 30})")
888            .unwrap();
889        session
890            .execute("INSERT (:Person {name: 'Gus', age: 25})")
891            .unwrap();
892
893        let copy = db.to_memory().unwrap();
894        assert_eq!(copy.store.node_count(), 2);
895
896        let s2 = copy.session();
897        let result = s2
898            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
899            .unwrap();
900        assert_eq!(result.rows.len(), 2);
901        assert_eq!(result.rows[0][0], Value::String("Alix".into()));
902        assert_eq!(result.rows[1][0], Value::String("Gus".into()));
903    }
904
905    #[test]
906    fn test_to_memory_copies_edges_and_properties() {
907        let db = GrafeoDB::new_in_memory();
908        let a = db.create_node(&["Person"]);
909        db.set_node_property(a, "name", "Alix".into());
910        let b = db.create_node(&["Person"]);
911        db.set_node_property(b, "name", "Gus".into());
912        let edge = db.create_edge(a, b, "KNOWS");
913        db.set_edge_property(edge, "since", Value::Int64(2020));
914
915        let copy = db.to_memory().unwrap();
916        assert_eq!(copy.store.node_count(), 2);
917        assert_eq!(copy.store.edge_count(), 1);
918
919        let s2 = copy.session();
920        let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
921        assert_eq!(result.rows[0][0], Value::Int64(2020));
922    }
923
924    #[test]
925    fn test_to_memory_is_independent() {
926        let db = GrafeoDB::new_in_memory();
927        let session = db.session();
928        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
929
930        let copy = db.to_memory().unwrap();
931
932        // Mutating original should not affect copy
933        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
934        assert_eq!(db.store.node_count(), 2);
935        assert_eq!(copy.store.node_count(), 1);
936    }
937
938    // --- iter_nodes() / iter_edges() ---
939
940    #[test]
941    fn test_iter_nodes_empty() {
942        let db = GrafeoDB::new_in_memory();
943        assert_eq!(db.iter_nodes().count(), 0);
944    }
945
946    #[test]
947    fn test_iter_nodes_returns_all() {
948        let db = GrafeoDB::new_in_memory();
949        let id1 = db.create_node(&["Person"]);
950        db.set_node_property(id1, "name", "Alix".into());
951        let id2 = db.create_node(&["Animal"]);
952        db.set_node_property(id2, "name", "Fido".into());
953
954        let nodes: Vec<_> = db.iter_nodes().collect();
955        assert_eq!(nodes.len(), 2);
956
957        let names: Vec<_> = nodes
958            .iter()
959            .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
960            .map(|(_, v)| v.clone())
961            .collect();
962        assert!(names.contains(&Value::String("Alix".into())));
963        assert!(names.contains(&Value::String("Fido".into())));
964    }
965
966    #[test]
967    fn test_iter_edges_empty() {
968        let db = GrafeoDB::new_in_memory();
969        assert_eq!(db.iter_edges().count(), 0);
970    }
971
972    #[test]
973    fn test_iter_edges_returns_all() {
974        let db = GrafeoDB::new_in_memory();
975        let a = db.create_node(&["A"]);
976        let b = db.create_node(&["B"]);
977        let c = db.create_node(&["C"]);
978        db.create_edge(a, b, "R1");
979        db.create_edge(b, c, "R2");
980
981        let edges: Vec<_> = db.iter_edges().collect();
982        assert_eq!(edges.len(), 2);
983
984        let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
985        assert!(types.contains(&"R1"));
986        assert!(types.contains(&"R2"));
987    }
988
989    // --- restore_snapshot() validation ---
990
991    fn encode_snapshot(snap: &Snapshot) -> Vec<u8> {
992        bincode::serde::encode_to_vec(snap, bincode::config::standard()).unwrap()
993    }
994
995    #[test]
996    fn test_restore_rejects_unsupported_version() {
997        let db = GrafeoDB::new_in_memory();
998        let session = db.session();
999        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1000
1001        let snap = Snapshot {
1002            version: 99,
1003            nodes: vec![],
1004            edges: vec![],
1005        };
1006        let bytes = encode_snapshot(&snap);
1007
1008        let result = db.restore_snapshot(&bytes);
1009        assert!(result.is_err());
1010        let err = result.unwrap_err().to_string();
1011        assert!(err.contains("unsupported snapshot version"), "got: {err}");
1012
1013        // DB unchanged
1014        assert_eq!(db.store.node_count(), 1);
1015    }
1016
1017    #[test]
1018    fn test_restore_rejects_duplicate_node_ids() {
1019        let db = GrafeoDB::new_in_memory();
1020        let session = db.session();
1021        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1022
1023        let snap = Snapshot {
1024            version: 1,
1025            nodes: vec![
1026                SnapshotNode {
1027                    id: NodeId::new(0),
1028                    labels: vec!["A".into()],
1029                    properties: vec![],
1030                },
1031                SnapshotNode {
1032                    id: NodeId::new(0),
1033                    labels: vec!["B".into()],
1034                    properties: vec![],
1035                },
1036            ],
1037            edges: vec![],
1038        };
1039        let bytes = encode_snapshot(&snap);
1040
1041        let result = db.restore_snapshot(&bytes);
1042        assert!(result.is_err());
1043        let err = result.unwrap_err().to_string();
1044        assert!(err.contains("duplicate node ID"), "got: {err}");
1045        assert_eq!(db.store.node_count(), 1);
1046    }
1047
1048    #[test]
1049    fn test_restore_rejects_duplicate_edge_ids() {
1050        let db = GrafeoDB::new_in_memory();
1051
1052        let snap = Snapshot {
1053            version: 1,
1054            nodes: vec![
1055                SnapshotNode {
1056                    id: NodeId::new(0),
1057                    labels: vec![],
1058                    properties: vec![],
1059                },
1060                SnapshotNode {
1061                    id: NodeId::new(1),
1062                    labels: vec![],
1063                    properties: vec![],
1064                },
1065            ],
1066            edges: vec![
1067                SnapshotEdge {
1068                    id: EdgeId::new(0),
1069                    src: NodeId::new(0),
1070                    dst: NodeId::new(1),
1071                    edge_type: "REL".into(),
1072                    properties: vec![],
1073                },
1074                SnapshotEdge {
1075                    id: EdgeId::new(0),
1076                    src: NodeId::new(0),
1077                    dst: NodeId::new(1),
1078                    edge_type: "REL".into(),
1079                    properties: vec![],
1080                },
1081            ],
1082        };
1083        let bytes = encode_snapshot(&snap);
1084
1085        let result = db.restore_snapshot(&bytes);
1086        assert!(result.is_err());
1087        let err = result.unwrap_err().to_string();
1088        assert!(err.contains("duplicate edge ID"), "got: {err}");
1089    }
1090
1091    #[test]
1092    fn test_restore_rejects_dangling_source() {
1093        let db = GrafeoDB::new_in_memory();
1094
1095        let snap = Snapshot {
1096            version: 1,
1097            nodes: vec![SnapshotNode {
1098                id: NodeId::new(0),
1099                labels: vec![],
1100                properties: vec![],
1101            }],
1102            edges: vec![SnapshotEdge {
1103                id: EdgeId::new(0),
1104                src: NodeId::new(999),
1105                dst: NodeId::new(0),
1106                edge_type: "REL".into(),
1107                properties: vec![],
1108            }],
1109        };
1110        let bytes = encode_snapshot(&snap);
1111
1112        let result = db.restore_snapshot(&bytes);
1113        assert!(result.is_err());
1114        let err = result.unwrap_err().to_string();
1115        assert!(err.contains("non-existent source node"), "got: {err}");
1116    }
1117
1118    #[test]
1119    fn test_restore_rejects_dangling_destination() {
1120        let db = GrafeoDB::new_in_memory();
1121
1122        let snap = Snapshot {
1123            version: 1,
1124            nodes: vec![SnapshotNode {
1125                id: NodeId::new(0),
1126                labels: vec![],
1127                properties: vec![],
1128            }],
1129            edges: vec![SnapshotEdge {
1130                id: EdgeId::new(0),
1131                src: NodeId::new(0),
1132                dst: NodeId::new(999),
1133                edge_type: "REL".into(),
1134                properties: vec![],
1135            }],
1136        };
1137        let bytes = encode_snapshot(&snap);
1138
1139        let result = db.restore_snapshot(&bytes);
1140        assert!(result.is_err());
1141        let err = result.unwrap_err().to_string();
1142        assert!(err.contains("non-existent destination node"), "got: {err}");
1143    }
1144}