Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3#[cfg(any(feature = "wal", feature = "grafeo-file"))]
4use std::path::Path;
5
6#[cfg(any(feature = "vector-index", feature = "text-index"))]
7use grafeo_common::grafeo_warn;
8use grafeo_common::types::{EdgeId, EpochId, NodeId, Value};
9use grafeo_common::utils::error::{Error, Result};
10use hashbrown::HashSet;
11
12use crate::config::Config;
13
14#[cfg(feature = "wal")]
15use grafeo_storage::wal::WalRecord;
16
17use crate::catalog::{
18    EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
19};
20
/// Current snapshot version.
///
/// The value matches the `v4` layout described on [`Snapshot`].
const SNAPSHOT_VERSION: u8 = 4;
23
/// Binary snapshot format (v4: graph data, named graphs, RDF, schema, index metadata,
/// and property version history for temporal support).
///
/// The struct is decoded with `bincode` through its serde derives (see
/// `load_snapshot_into_store`). bincode is a positional format, so the field
/// order below is part of the serialized layout.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Snapshot format version.
    version: u8,
    /// Nodes of the top-level store (loaded directly into the store, not into a named graph).
    nodes: Vec<SnapshotNode>,
    /// Edges of the top-level store.
    edges: Vec<SnapshotEdge>,
    /// Named graph partitions, each carrying its own node and edge lists.
    named_graphs: Vec<NamedGraphSnapshot>,
    /// Triples of the top-level RDF store.
    rdf_triples: Vec<SnapshotTriple>,
    /// Named RDF graphs, each carrying its own triples.
    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
    /// Catalog schema definitions.
    schema: SnapshotSchema,
    /// Index definitions (metadata only).
    indexes: SnapshotIndexes,
    /// Current store epoch at snapshot time (0 when temporal is disabled).
    epoch: u64,
}
39
/// Schema metadata within a snapshot.
///
/// Filled from the catalog by `collect_schema` and written back to a catalog
/// by `restore_schema_from_snapshot`.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct SnapshotSchema {
    /// Node type definitions.
    node_types: Vec<NodeTypeDefinition>,
    /// Edge type definitions.
    edge_types: Vec<EdgeTypeDefinition>,
    /// Graph type definitions.
    graph_types: Vec<GraphTypeDefinition>,
    /// Procedure definitions.
    procedures: Vec<ProcedureDefinition>,
    /// Schema namespace names.
    schemas: Vec<String>,
    /// `(graph_name, type_name)` pairs binding a graph to a graph type.
    graph_type_bindings: Vec<(String, String)>,
}
50
/// Index metadata within a snapshot (definitions only, not index data).
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct SnapshotIndexes {
    /// Property index keys as reported by the store's `property_index_keys()`.
    property_indexes: Vec<String>,
    /// Vector index definitions (collected as empty when `vector-index` is disabled).
    vector_indexes: Vec<SnapshotVectorIndex>,
    /// Text index definitions (collected as empty when `text-index` is disabled).
    text_indexes: Vec<SnapshotTextIndex>,
}
58
/// Vector index definition for snapshot persistence.
///
/// `label` and `property` come from splitting the store's index key at the
/// first `:`; the remaining fields are copied from the index's config.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotVectorIndex {
    /// Label part of the index key.
    label: String,
    /// Property part of the index key.
    property: String,
    /// Vector dimensionality from the index config.
    dimensions: usize,
    /// Distance metric from the index config.
    metric: grafeo_core::index::vector::DistanceMetric,
    /// `m` from the index config.
    // NOTE(review): presumably the HNSW connectivity parameter; confirm in grafeo_core.
    m: usize,
    /// `ef_construction` from the index config.
    // NOTE(review): presumably the HNSW build-time search width; confirm in grafeo_core.
    ef_construction: usize,
}
69
/// Text index definition for snapshot persistence.
///
/// Both fields come from splitting the store's index key at the first `:`.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTextIndex {
    /// Label part of the index key.
    label: String,
    /// Property part of the index key.
    property: String,
}
76
/// A named graph partition within a snapshot: the graph's name plus its own
/// node and edge lists.
#[derive(serde::Serialize, serde::Deserialize)]
struct NamedGraphSnapshot {
    /// Name the graph is created under on load.
    name: String,
    /// Nodes belonging to this graph.
    nodes: Vec<SnapshotNode>,
    /// Edges belonging to this graph.
    edges: Vec<SnapshotEdge>,
}
84
/// An RDF triple in snapshot format (N-Triples encoded terms).
///
/// Each field is the `to_string()` rendering of the corresponding term and is
/// parsed back with `Term::from_ntriples` on load.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTriple {
    /// Encoded subject term.
    subject: String,
    /// Encoded predicate term.
    predicate: String,
    /// Encoded object term.
    object: String,
}
92
/// An RDF named graph in snapshot format.
#[derive(serde::Serialize, serde::Deserialize)]
struct RdfNamedGraphSnapshot {
    /// Name the RDF graph is created under on load.
    name: String,
    /// Triples belonging to this graph.
    triples: Vec<SnapshotTriple>,
}
99
/// A node in snapshot format.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Node ID; restored verbatim via `create_node_with_id`.
    id: NodeId,
    /// Node labels (sorted when produced by `collect_snapshot_nodes`).
    labels: Vec<String>,
    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
    properties: Vec<(String, Vec<(EpochId, Value)>)>,
}
107
/// An edge in snapshot format.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Edge ID; restored verbatim via `create_edge_with_id`.
    id: EdgeId,
    /// Source node ID.
    src: NodeId,
    /// Destination node ID.
    dst: NodeId,
    /// Edge type name.
    edge_type: String,
    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
    properties: Vec<(String, Vec<(EpochId, Value)>)>,
}
117
118/// Collects all nodes from a store into snapshot format.
119///
120/// With `temporal`: stores full property version history.
121/// Without: wraps each current value as a single-entry version list at epoch 0.
122fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
123    let mut nodes: Vec<SnapshotNode> = store
124        .all_nodes()
125        .map(|n| {
126            #[cfg(feature = "temporal")]
127            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = store
128                .node_property_history(n.id)
129                .into_iter()
130                .map(|(k, entries)| (k.to_string(), entries))
131                .collect();
132
133            #[cfg(not(feature = "temporal"))]
134            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = n
135                .properties
136                .into_iter()
137                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
138                .collect();
139
140            properties.sort_by(|(a, _), (b, _)| a.cmp(b));
141
142            let mut labels: Vec<String> = n.labels.iter().map(|l| l.to_string()).collect();
143            labels.sort();
144
145            SnapshotNode {
146                id: n.id,
147                labels,
148                properties,
149            }
150        })
151        .collect();
152    nodes.sort_by_key(|n| n.id);
153    nodes
154}
155
156/// Collects all edges from a store into snapshot format.
157///
158/// With `temporal`: stores full property version history.
159/// Without: wraps each current value as a single-entry version list at epoch 0.
160fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
161    let mut edges: Vec<SnapshotEdge> = store
162        .all_edges()
163        .map(|e| {
164            #[cfg(feature = "temporal")]
165            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = store
166                .edge_property_history(e.id)
167                .into_iter()
168                .map(|(k, entries)| (k.to_string(), entries))
169                .collect();
170
171            #[cfg(not(feature = "temporal"))]
172            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = e
173                .properties
174                .into_iter()
175                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
176                .collect();
177
178            properties.sort_by(|(a, _), (b, _)| a.cmp(b));
179
180            SnapshotEdge {
181                id: e.id,
182                src: e.src,
183                dst: e.dst,
184                edge_type: e.edge_type.to_string(),
185                properties,
186            }
187        })
188        .collect();
189    edges.sort_by_key(|e| e.id);
190    edges
191}
192
193/// Populates a store from snapshot node/edge data.
194///
195/// With `temporal`: replays all `(epoch, value)` entries into version logs.
196/// Without: reads the latest value from each property's version list.
197fn populate_store_from_snapshot(
198    store: &grafeo_core::graph::lpg::LpgStore,
199    nodes: Vec<SnapshotNode>,
200    edges: Vec<SnapshotEdge>,
201) -> Result<()> {
202    for node in nodes {
203        let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
204        store.create_node_with_id(node.id, &label_refs)?;
205        for (key, entries) in node.properties {
206            #[cfg(feature = "temporal")]
207            for (epoch, value) in entries {
208                store.set_node_property_at_epoch(node.id, &key, value, epoch);
209            }
210            #[cfg(not(feature = "temporal"))]
211            if let Some((_, value)) = entries.into_iter().last() {
212                store.set_node_property(node.id, &key, value);
213            }
214        }
215    }
216    for edge in edges {
217        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
218        for (key, entries) in edge.properties {
219            #[cfg(feature = "temporal")]
220            for (epoch, value) in entries {
221                store.set_edge_property_at_epoch(edge.id, &key, value, epoch);
222            }
223            #[cfg(not(feature = "temporal"))]
224            if let Some((_, value)) = entries.into_iter().last() {
225                store.set_edge_property(edge.id, &key, value);
226            }
227        }
228    }
229    Ok(())
230}
231
232/// Validates snapshot nodes/edges for duplicates and dangling references.
233fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
234    let mut node_ids = HashSet::with_capacity(nodes.len());
235    for node in nodes {
236        if !node_ids.insert(node.id) {
237            return Err(Error::Internal(format!(
238                "snapshot contains duplicate node ID {}",
239                node.id
240            )));
241        }
242    }
243    let mut edge_ids = HashSet::with_capacity(edges.len());
244    for edge in edges {
245        if !edge_ids.insert(edge.id) {
246            return Err(Error::Internal(format!(
247                "snapshot contains duplicate edge ID {}",
248                edge.id
249            )));
250        }
251        if !node_ids.contains(&edge.src) {
252            return Err(Error::Internal(format!(
253                "snapshot edge {} references non-existent source node {}",
254                edge.id, edge.src
255            )));
256        }
257        if !node_ids.contains(&edge.dst) {
258            return Err(Error::Internal(format!(
259                "snapshot edge {} references non-existent destination node {}",
260                edge.id, edge.dst
261            )));
262        }
263    }
264    Ok(())
265}
266
/// Renders every triple of an RDF store into snapshot format.
///
/// Each term is stored as its `to_string()` rendering.
#[cfg(feature = "triple-store")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut encoded = Vec::new();
    for triple in store.triples() {
        encoded.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    encoded
}
280
/// Inserts snapshot triples into an RDF store.
///
/// Every term is parsed with `Term::from_ntriples`. A triple is inserted only
/// when all three terms parse; otherwise it is skipped without any report.
#[cfg(feature = "triple-store")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    for encoded in triples {
        // Parse all three terms up front, then combine the outcomes.
        let subject = Term::from_ntriples(&encoded.subject);
        let predicate = Term::from_ntriples(&encoded.predicate);
        let object = Term::from_ntriples(&encoded.object);

        if let Some(((s, p), o)) = subject.zip(predicate).zip(object) {
            store.insert(Triple::new(s, p, o));
        }
    }
}
295
296// =========================================================================
297// Snapshot deserialization helpers (used by single-file format)
298// =========================================================================
299
/// Decodes snapshot bytes and loads their contents into a store and catalog.
///
/// Order of work: top-level nodes/edges, epoch sync (`temporal` only), named
/// graphs, schema definitions, then RDF triples (`triple-store` only).
///
/// This function does not read the snapshot's `version` or `indexes` fields.
///
/// # Errors
///
/// Returns `Error::Serialization` when the bytes cannot be decoded,
/// `Error::Internal` when a named graph cannot be created, and propagates any
/// error from populating nodes and edges.
#[cfg(feature = "grafeo-file")]
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "triple-store")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    use grafeo_common::utils::error::Error;

    let decoded =
        bincode::serde::decode_from_slice::<Snapshot, _>(data, bincode::config::standard());
    let snapshot = match decoded {
        Ok((snapshot, _consumed)) => snapshot,
        Err(e) => {
            return Err(Error::Serialization(format!(
                "failed to decode snapshot from .grafeo file: {e}"
            )));
        }
    };

    // Top-level graph data goes straight into `store`.
    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;

    // Restore epoch from snapshot (store-level only; TransactionManager
    // sync is handled in with_config() after all recovery completes).
    #[cfg(feature = "temporal")]
    let snapshot_epoch = EpochId::new(snapshot.epoch);
    #[cfg(feature = "temporal")]
    store.sync_epoch(snapshot_epoch);

    for named in &snapshot.named_graphs {
        if let Err(e) = store.create_graph(&named.name) {
            return Err(Error::Internal(e.to_string()));
        }
        // A graph that cannot be looked up after creation is skipped.
        let Some(partition) = store.graph(&named.name) else {
            continue;
        };
        populate_store_from_snapshot_ref(&partition, &named.nodes, &named.edges)?;
        #[cfg(feature = "temporal")]
        partition.sync_epoch(snapshot_epoch);
    }

    restore_schema_from_snapshot(store, catalog, &snapshot.schema);

    // RDF data: top-level triples first, then each named RDF graph.
    #[cfg(feature = "triple-store")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for rdf_named in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(&rdf_named.name);
            if let Some(partition) = rdf_store.graph(&rdf_named.name) {
                populate_rdf_store(&partition, &rdf_named.triples);
            }
        }
    }

    Ok(())
}
349
/// Populates a store from borrowed snapshot data (used for single-file loading).
///
/// Nodes are created first, then edges. Property values are cloned out of the
/// borrowed snapshot.
///
/// With `temporal`: replays all `(epoch, value)` entries.
/// Without: applies only the last entry of each property's version list.
///
/// # Errors
///
/// Propagates the first error returned by `create_node_with_id` or
/// `create_edge_with_id`.
#[cfg(feature = "grafeo-file")]
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    for SnapshotNode {
        id,
        labels,
        properties,
    } in nodes
    {
        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
        store.create_node_with_id(*id, &label_refs)?;

        for (key, history) in properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in history {
                store.set_node_property_at_epoch(*id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, latest)) = history.last() {
                store.set_node_property(*id, key, latest.clone());
            }
        }
    }

    for SnapshotEdge {
        id,
        src,
        dst,
        edge_type,
        properties,
    } in edges
    {
        store.create_edge_with_id(*id, *src, *dst, edge_type)?;

        for (key, history) in properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in history {
                store.set_edge_property_at_epoch(*id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, latest)) = history.last() {
                store.set_edge_property(*id, key, latest.clone());
            }
        }
    }

    Ok(())
}
386
387/// Restores schema definitions from a snapshot into the catalog.
388///
389/// Also ensures each schema has its `__default__` graph partition, which
390/// may be missing in snapshots created before the schema hierarchy feature.
391fn restore_schema_from_snapshot(
392    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
393    catalog: &std::sync::Arc<crate::catalog::Catalog>,
394    schema: &SnapshotSchema,
395) {
396    for def in &schema.node_types {
397        catalog.register_or_replace_node_type(def.clone());
398    }
399    for def in &schema.edge_types {
400        catalog.register_or_replace_edge_type_def(def.clone());
401    }
402    for def in &schema.graph_types {
403        let _ = catalog.register_graph_type(def.clone());
404    }
405    for def in &schema.procedures {
406        catalog.replace_procedure(def.clone()).ok();
407    }
408    for name in &schema.schemas {
409        let _ = catalog.register_schema_namespace(name.clone());
410        // Ensure the schema's default graph partition exists
411        let default_key = format!("{name}/__default__");
412        let _ = store.create_graph(&default_key);
413    }
414    for (graph_name, type_name) in &schema.graph_type_bindings {
415        let _ = catalog.bind_graph_type(graph_name, type_name.clone());
416    }
417}
418
419/// Collects schema definitions from the catalog into snapshot format.
420fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
421    SnapshotSchema {
422        node_types: catalog.all_node_type_defs(),
423        edge_types: catalog.all_edge_type_defs(),
424        graph_types: catalog.all_graph_type_defs(),
425        procedures: catalog.all_procedure_defs(),
426        schemas: catalog.schema_names(),
427        graph_type_bindings: catalog.all_graph_type_bindings(),
428    }
429}
430
431/// Restores indexes from snapshot metadata by rebuilding them from existing data.
432///
433/// Must be called after all nodes/edges have been populated, since index
434/// creation scans existing data.
435fn restore_indexes_from_snapshot(db: &super::GrafeoDB, indexes: &SnapshotIndexes) {
436    for name in &indexes.property_indexes {
437        db.lpg_store().create_property_index(name);
438    }
439
440    #[cfg(feature = "vector-index")]
441    for vi in &indexes.vector_indexes {
442        if let Err(err) = db.create_vector_index(
443            &vi.label,
444            &vi.property,
445            Some(vi.dimensions),
446            Some(vi.metric.name()),
447            Some(vi.m),
448            Some(vi.ef_construction),
449            None,
450        ) {
451            grafeo_warn!(
452                "Failed to restore vector index :{label}({property}): {err}",
453                label = vi.label,
454                property = vi.property,
455            );
456        }
457    }
458
459    #[cfg(feature = "text-index")]
460    for ti in &indexes.text_indexes {
461        if let Err(err) = db.create_text_index(&ti.label, &ti.property) {
462            grafeo_warn!(
463                "Failed to restore text index :{label}({property}): {err}",
464                label = ti.label,
465                property = ti.property,
466            );
467        }
468    }
469}
470
471/// Collects index metadata from a store into snapshot format.
472fn collect_index_metadata(store: &grafeo_core::graph::lpg::LpgStore) -> SnapshotIndexes {
473    let property_indexes = store.property_index_keys();
474
475    #[cfg(feature = "vector-index")]
476    let vector_indexes: Vec<SnapshotVectorIndex> = store
477        .vector_index_entries()
478        .into_iter()
479        .filter_map(|(key, index)| {
480            let (label, property) = key.split_once(':')?;
481            let config = index.config();
482            Some(SnapshotVectorIndex {
483                label: label.to_string(),
484                property: property.to_string(),
485                dimensions: config.dimensions,
486                metric: config.metric,
487                m: config.m,
488                ef_construction: config.ef_construction,
489            })
490        })
491        .collect();
492    #[cfg(not(feature = "vector-index"))]
493    let vector_indexes = Vec::new();
494
495    #[cfg(feature = "text-index")]
496    let text_indexes: Vec<SnapshotTextIndex> = store
497        .text_index_entries()
498        .into_iter()
499        .filter_map(|(key, _)| {
500            let (label, property) = key.split_once(':')?;
501            Some(SnapshotTextIndex {
502                label: label.to_string(),
503                property: property.to_string(),
504            })
505        })
506        .collect();
507    #[cfg(not(feature = "text-index"))]
508    let text_indexes = Vec::new();
509
510    SnapshotIndexes {
511        property_indexes,
512        vector_indexes,
513        text_indexes,
514    }
515}
516
517impl super::GrafeoDB {
518    // =========================================================================
519    // ADMIN API: Persistence Control
520    // =========================================================================
521
522    /// Saves the database to a file path.
523    ///
524    /// - If the path ends in `.grafeo`: creates a single-file database
525    /// - Otherwise: creates a WAL directory-backed database at the path
526    /// - If in-memory: creates a new persistent database at path
527    /// - If file-backed: creates a copy at the new path
528    ///
529    /// The original database remains unchanged.
530    ///
531    /// # Errors
532    ///
533    /// Returns an error if the save operation fails.
534    ///
535    /// Requires the `wal` feature for persistence support.
536    #[cfg(feature = "wal")]
537    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
538        let path = path.as_ref();
539
540        // Single-file format: export snapshot directly to a .grafeo file
541        #[cfg(feature = "grafeo-file")]
542        if path.extension().is_some_and(|ext| ext == "grafeo") {
543            return self.save_as_grafeo_file(path);
544        }
545
546        // Create target database with WAL enabled
547        let target_config = Config::persistent(path);
548        let target = Self::with_config(target_config)?;
549
550        // Copy all nodes using WAL-enabled methods
551        for node in self.lpg_store().all_nodes() {
552            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
553            target
554                .lpg_store()
555                .create_node_with_id(node.id, &label_refs)?;
556
557            // Log to WAL
558            target.log_wal(&WalRecord::CreateNode {
559                id: node.id,
560                labels: node.labels.iter().map(|s| s.to_string()).collect(),
561            })?;
562
563            // Copy properties
564            for (key, value) in node.properties {
565                target
566                    .lpg_store()
567                    .set_node_property(node.id, key.as_str(), value.clone());
568                target.log_wal(&WalRecord::SetNodeProperty {
569                    id: node.id,
570                    key: key.to_string(),
571                    value,
572                })?;
573            }
574        }
575
576        // Copy all edges using WAL-enabled methods
577        for edge in self.lpg_store().all_edges() {
578            target
579                .lpg_store()
580                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
581
582            // Log to WAL
583            target.log_wal(&WalRecord::CreateEdge {
584                id: edge.id,
585                src: edge.src,
586                dst: edge.dst,
587                edge_type: edge.edge_type.to_string(),
588            })?;
589
590            // Copy properties
591            for (key, value) in edge.properties {
592                target
593                    .lpg_store()
594                    .set_edge_property(edge.id, key.as_str(), value.clone());
595                target.log_wal(&WalRecord::SetEdgeProperty {
596                    id: edge.id,
597                    key: key.to_string(),
598                    value,
599                })?;
600            }
601        }
602
603        // Copy named graphs
604        for graph_name in self.lpg_store().graph_names() {
605            if let Some(src_graph) = self.lpg_store().graph(&graph_name) {
606                target.log_wal(&WalRecord::CreateNamedGraph {
607                    name: graph_name.clone(),
608                })?;
609                target
610                    .lpg_store()
611                    .create_graph(&graph_name)
612                    .map_err(|e| Error::Internal(e.to_string()))?;
613
614                if let Some(dst_graph) = target.lpg_store().graph(&graph_name) {
615                    // Switch WAL context to this named graph
616                    target.log_wal(&WalRecord::SwitchGraph {
617                        name: Some(graph_name.clone()),
618                    })?;
619
620                    for node in src_graph.all_nodes() {
621                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
622                        dst_graph.create_node_with_id(node.id, &label_refs)?;
623                        target.log_wal(&WalRecord::CreateNode {
624                            id: node.id,
625                            labels: node.labels.iter().map(|s| s.to_string()).collect(),
626                        })?;
627                        for (key, value) in node.properties {
628                            dst_graph.set_node_property(node.id, key.as_str(), value.clone());
629                            target.log_wal(&WalRecord::SetNodeProperty {
630                                id: node.id,
631                                key: key.to_string(),
632                                value,
633                            })?;
634                        }
635                    }
636                    for edge in src_graph.all_edges() {
637                        dst_graph.create_edge_with_id(
638                            edge.id,
639                            edge.src,
640                            edge.dst,
641                            &edge.edge_type,
642                        )?;
643                        target.log_wal(&WalRecord::CreateEdge {
644                            id: edge.id,
645                            src: edge.src,
646                            dst: edge.dst,
647                            edge_type: edge.edge_type.to_string(),
648                        })?;
649                        for (key, value) in edge.properties {
650                            dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
651                            target.log_wal(&WalRecord::SetEdgeProperty {
652                                id: edge.id,
653                                key: key.to_string(),
654                                value,
655                            })?;
656                        }
657                    }
658                }
659            }
660        }
661
662        // Switch WAL context back to default graph
663        if !self.lpg_store().graph_names().is_empty() {
664            target.log_wal(&WalRecord::SwitchGraph { name: None })?;
665        }
666
667        // Copy RDF data with WAL logging
668        #[cfg(feature = "triple-store")]
669        {
670            for triple in self.rdf_store.triples() {
671                let record = WalRecord::InsertRdfTriple {
672                    subject: triple.subject().to_string(),
673                    predicate: triple.predicate().to_string(),
674                    object: triple.object().to_string(),
675                    graph: None,
676                };
677                target.rdf_store.insert((*triple).clone());
678                target.log_wal(&record)?;
679            }
680            for name in self.rdf_store.graph_names() {
681                target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
682                if let Some(src_graph) = self.rdf_store.graph(&name) {
683                    let dst_graph = target.rdf_store.graph_or_create(&name);
684                    for triple in src_graph.triples() {
685                        let record = WalRecord::InsertRdfTriple {
686                            subject: triple.subject().to_string(),
687                            predicate: triple.predicate().to_string(),
688                            object: triple.object().to_string(),
689                            graph: Some(name.clone()),
690                        };
691                        dst_graph.insert((*triple).clone());
692                        target.log_wal(&record)?;
693                    }
694                }
695            }
696        }
697
698        // Checkpoint and close the target database
699        target.close()?;
700
701        Ok(())
702    }
703
704    /// Creates an in-memory copy of this database.
705    ///
706    /// Returns a new database that is completely independent, including
707    /// all named graph data.
708    /// Useful for:
709    /// Saves the database to a single `.grafeo` file.
710    #[cfg(feature = "grafeo-file")]
711    fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
712        use grafeo_storage::file::GrafeoFileManager;
713
714        let snapshot_data = self.export_snapshot()?;
715        let epoch = self.lpg_store().current_epoch();
716        let transaction_id = self
717            .transaction_manager
718            .last_assigned_transaction_id()
719            .map_or(0, |t| t.0);
720        let node_count = self.lpg_store().node_count() as u64;
721        let edge_count = self.lpg_store().edge_count() as u64;
722
723        let fm = GrafeoFileManager::create(path)?;
724        fm.write_snapshot(
725            &snapshot_data,
726            epoch.0,
727            transaction_id,
728            node_count,
729            edge_count,
730        )?;
731        Ok(())
732    }
733
734    /// - Testing modifications without affecting the original
735    /// - Faster operations when persistence isn't needed
736    ///
737    /// # Errors
738    ///
739    /// Returns an error if the copy operation fails.
740    pub fn to_memory(&self) -> Result<Self> {
741        let config = Config::in_memory();
742        let target = Self::with_config(config)?;
743
744        // Copy default graph nodes
745        for node in self.lpg_store().all_nodes() {
746            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
747            target
748                .lpg_store()
749                .create_node_with_id(node.id, &label_refs)?;
750            for (key, value) in node.properties {
751                target
752                    .lpg_store()
753                    .set_node_property(node.id, key.as_str(), value);
754            }
755        }
756
757        // Copy default graph edges
758        for edge in self.lpg_store().all_edges() {
759            target
760                .lpg_store()
761                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
762            for (key, value) in edge.properties {
763                target
764                    .lpg_store()
765                    .set_edge_property(edge.id, key.as_str(), value);
766            }
767        }
768
769        // Copy named graphs
770        for graph_name in self.lpg_store().graph_names() {
771            if let Some(src_graph) = self.lpg_store().graph(&graph_name) {
772                target
773                    .lpg_store()
774                    .create_graph(&graph_name)
775                    .map_err(|e| Error::Internal(e.to_string()))?;
776                if let Some(dst_graph) = target.lpg_store().graph(&graph_name) {
777                    for node in src_graph.all_nodes() {
778                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
779                        dst_graph.create_node_with_id(node.id, &label_refs)?;
780                        for (key, value) in node.properties {
781                            dst_graph.set_node_property(node.id, key.as_str(), value);
782                        }
783                    }
784                    for edge in src_graph.all_edges() {
785                        dst_graph.create_edge_with_id(
786                            edge.id,
787                            edge.src,
788                            edge.dst,
789                            &edge.edge_type,
790                        )?;
791                        for (key, value) in edge.properties {
792                            dst_graph.set_edge_property(edge.id, key.as_str(), value);
793                        }
794                    }
795                }
796            }
797        }
798
799        // Copy RDF data
800        #[cfg(feature = "triple-store")]
801        {
802            for triple in self.rdf_store.triples() {
803                target.rdf_store.insert((*triple).clone());
804            }
805            for name in self.rdf_store.graph_names() {
806                if let Some(src_graph) = self.rdf_store.graph(&name) {
807                    let dst_graph = target.rdf_store.graph_or_create(&name);
808                    for triple in src_graph.triples() {
809                        dst_graph.insert((*triple).clone());
810                    }
811                }
812            }
813        }
814
815        Ok(target)
816    }
817
818    /// Opens a database file and loads it entirely into memory.
819    ///
820    /// The returned database has no connection to the original file.
821    /// Changes will NOT be written back to the file.
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if the file can't be opened or loaded.
826    #[cfg(feature = "wal")]
827    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
828        // Open the source database (triggers WAL recovery)
829        let source = Self::open(path)?;
830
831        // Create in-memory copy
832        let target = source.to_memory()?;
833
834        // Close the source (releases file handles)
835        source.close()?;
836
837        Ok(target)
838    }
839
840    // =========================================================================
841    // ADMIN API: Snapshot Export/Import
842    // =========================================================================
843
844    /// Exports the entire database to a binary snapshot.
845    ///
846    /// The returned bytes can be stored (e.g. in IndexedDB) and later
847    /// restored with [`import_snapshot()`](Self::import_snapshot).
848    /// Includes all named graph data.
849    ///
850    /// Properties are stored as version-history lists. When `temporal` is
851    /// enabled, the full history is captured. Otherwise, each property is
852    /// wrapped as a single-entry list at epoch 0.
853    ///
854    /// # Errors
855    ///
856    /// Returns an error if serialization fails.
857    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
858        let nodes = collect_snapshot_nodes(self.lpg_store());
859        let edges = collect_snapshot_edges(self.lpg_store());
860
861        // Collect named graphs
862        let named_graphs: Vec<NamedGraphSnapshot> = self
863            .lpg_store()
864            .graph_names()
865            .into_iter()
866            .filter_map(|name| {
867                self.lpg_store()
868                    .graph(&name)
869                    .map(|graph_store| NamedGraphSnapshot {
870                        name,
871                        nodes: collect_snapshot_nodes(&graph_store),
872                        edges: collect_snapshot_edges(&graph_store),
873                    })
874            })
875            .collect();
876
877        // Collect RDF triples
878        #[cfg(feature = "triple-store")]
879        let rdf_triples = collect_rdf_triples(&self.rdf_store);
880        #[cfg(not(feature = "triple-store"))]
881        let rdf_triples = Vec::new();
882
883        #[cfg(feature = "triple-store")]
884        let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
885            .rdf_store
886            .graph_names()
887            .into_iter()
888            .filter_map(|name| {
889                self.rdf_store
890                    .graph(&name)
891                    .map(|graph| RdfNamedGraphSnapshot {
892                        name,
893                        triples: collect_rdf_triples(&graph),
894                    })
895            })
896            .collect();
897        #[cfg(not(feature = "triple-store"))]
898        let rdf_named_graphs = Vec::new();
899
900        let schema = collect_schema(&self.catalog);
901        let indexes = collect_index_metadata(self.lpg_store());
902
903        let snapshot = Snapshot {
904            version: SNAPSHOT_VERSION,
905            nodes,
906            edges,
907            named_graphs,
908            rdf_triples,
909            rdf_named_graphs,
910            schema,
911            indexes,
912            #[cfg(feature = "temporal")]
913            epoch: self.transaction_manager.current_epoch().as_u64(),
914            #[cfg(not(feature = "temporal"))]
915            epoch: 0,
916        };
917
918        let config = bincode::config::standard();
919        bincode::serde::encode_to_vec(&snapshot, config)
920            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
921    }
922
923    /// Creates a new in-memory database from a binary snapshot.
924    ///
925    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
926    ///
927    /// All edge references are validated before any data is inserted: every
928    /// edge's source and destination must reference a node present in the
929    /// snapshot, and duplicate node/edge IDs are rejected. If validation
930    /// fails, no database is created.
931    ///
932    /// # Errors
933    ///
934    /// Returns an error if the snapshot is invalid, contains dangling edge
935    /// references, has duplicate IDs, or deserialization fails.
936    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
937        if data.is_empty() {
938            return Err(Error::Internal("empty snapshot data".to_string()));
939        }
940
941        let version = data[0];
942        if version != 4 {
943            return Err(Error::Internal(format!(
944                "unsupported snapshot version: {version} (expected 4)"
945            )));
946        }
947
948        let config = bincode::config::standard();
949        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
950            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
951
952        // Validate default graph data
953        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
954
955        // Validate each named graph
956        for ng in &snapshot.named_graphs {
957            validate_snapshot_data(&ng.nodes, &ng.edges)?;
958        }
959
960        let db = Self::new_in_memory();
961        populate_store_from_snapshot(db.lpg_store(), snapshot.nodes, snapshot.edges)?;
962
963        // Restore epoch from snapshot
964        #[cfg(feature = "temporal")]
965        {
966            let epoch = EpochId::new(snapshot.epoch);
967            db.lpg_store().sync_epoch(epoch);
968            db.transaction_manager.sync_epoch(epoch);
969        }
970
971        // Capture epoch before moving snapshot fields
972        #[cfg(feature = "temporal")]
973        let snapshot_epoch = EpochId::new(snapshot.epoch);
974
975        // Restore named graphs
976        for ng in snapshot.named_graphs {
977            db.lpg_store()
978                .create_graph(&ng.name)
979                .map_err(|e| Error::Internal(e.to_string()))?;
980            if let Some(graph_store) = db.lpg_store().graph(&ng.name) {
981                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
982                // Named graph stores need the same epoch so temporal property
983                // lookups via current_epoch() return the correct values.
984                #[cfg(feature = "temporal")]
985                graph_store.sync_epoch(snapshot_epoch);
986            }
987        }
988
989        // Restore RDF triples
990        #[cfg(feature = "triple-store")]
991        {
992            populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
993            for rng in &snapshot.rdf_named_graphs {
994                let graph = db.rdf_store.graph_or_create(&rng.name);
995                populate_rdf_store(&graph, &rng.triples);
996            }
997        }
998
999        // Restore schema
1000        restore_schema_from_snapshot(db.lpg_store(), &db.catalog, &snapshot.schema);
1001
1002        // Restore indexes (must come after data population)
1003        restore_indexes_from_snapshot(&db, &snapshot.indexes);
1004
1005        Ok(db)
1006    }
1007
1008    /// Replaces the current database contents with data from a binary snapshot.
1009    ///
1010    /// The `data` must have been produced by
1011    /// [`export_snapshot()`](Self::export_snapshot).
1012    ///
1013    /// All validation (duplicate IDs, dangling edge references) is performed
1014    /// before any data is modified. If validation fails, the current database
1015    /// is left unchanged. If validation passes, the store is cleared and
1016    /// rebuilt from the snapshot atomically (from the perspective of
1017    /// subsequent queries).
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns an error if the snapshot is invalid, contains dangling edge
1022    /// references, has duplicate IDs, or deserialization fails.
1023    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
1024        if data.is_empty() {
1025            return Err(Error::Internal("empty snapshot data".to_string()));
1026        }
1027
1028        let version = data[0];
1029        if version != 4 {
1030            return Err(Error::Internal(format!(
1031                "unsupported snapshot version: {version} (expected 4)"
1032            )));
1033        }
1034
1035        let config = bincode::config::standard();
1036        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1037            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
1038
1039        // Validate all data before making any changes
1040        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
1041        for ng in &snapshot.named_graphs {
1042            validate_snapshot_data(&ng.nodes, &ng.edges)?;
1043        }
1044
1045        // Drop all existing named graphs, then clear default store
1046        for name in self.lpg_store().graph_names() {
1047            self.lpg_store().drop_graph(&name);
1048        }
1049        self.lpg_store().clear();
1050
1051        populate_store_from_snapshot(self.lpg_store(), snapshot.nodes, snapshot.edges)?;
1052
1053        // Restore epoch from temporal snapshot
1054        #[cfg(feature = "temporal")]
1055        let snapshot_epoch = {
1056            let epoch = EpochId::new(snapshot.epoch);
1057            self.lpg_store().sync_epoch(epoch);
1058            self.transaction_manager.sync_epoch(epoch);
1059            epoch
1060        };
1061
1062        // Restore named graphs
1063        for ng in snapshot.named_graphs {
1064            self.lpg_store()
1065                .create_graph(&ng.name)
1066                .map_err(|e| Error::Internal(e.to_string()))?;
1067            if let Some(graph_store) = self.lpg_store().graph(&ng.name) {
1068                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
1069                #[cfg(feature = "temporal")]
1070                graph_store.sync_epoch(snapshot_epoch);
1071            }
1072        }
1073
1074        // Restore RDF data
1075        #[cfg(feature = "triple-store")]
1076        {
1077            // Clear existing RDF data
1078            self.rdf_store.clear();
1079            for name in self.rdf_store.graph_names() {
1080                self.rdf_store.drop_graph(&name);
1081            }
1082            populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
1083            for rng in &snapshot.rdf_named_graphs {
1084                let graph = self.rdf_store.graph_or_create(&rng.name);
1085                populate_rdf_store(&graph, &rng.triples);
1086            }
1087        }
1088
1089        // Restore schema
1090        restore_schema_from_snapshot(self.lpg_store(), &self.catalog, &snapshot.schema);
1091
1092        // Restore indexes (must come after data population)
1093        restore_indexes_from_snapshot(self, &snapshot.indexes);
1094
1095        Ok(())
1096    }
1097
1098    // =========================================================================
1099    // ADMIN API: Iteration
1100    // =========================================================================
1101
1102    /// Returns an iterator over all nodes in the database.
1103    ///
1104    /// Useful for dump/export operations.
1105    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1106        self.lpg_store().all_nodes()
1107    }
1108
1109    /// Returns an iterator over all edges in the database.
1110    ///
1111    /// Useful for dump/export operations.
1112    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1113        self.lpg_store().all_edges()
1114    }
1115}
1116
1117#[cfg(test)]
1118mod tests {
1119    use grafeo_common::types::{EdgeId, NodeId, Value};
1120
1121    use super::super::GrafeoDB;
1122    use super::{
1123        SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotIndexes, SnapshotNode, SnapshotSchema,
1124    };
1125
1126    #[test]
1127    fn test_restore_snapshot_basic() {
1128        let db = GrafeoDB::new_in_memory();
1129        let session = db.session();
1130
1131        // Populate
1132        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1133        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1134
1135        let snapshot = db.export_snapshot().unwrap();
1136
1137        // Modify
1138        session
1139            .execute("INSERT (:Person {name: 'Vincent'})")
1140            .unwrap();
1141        assert_eq!(db.lpg_store().node_count(), 3);
1142
1143        // Restore original
1144        db.restore_snapshot(&snapshot).unwrap();
1145
1146        assert_eq!(db.lpg_store().node_count(), 2);
1147        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1148        assert_eq!(result.rows.len(), 2);
1149    }
1150
1151    #[test]
1152    fn test_restore_snapshot_validation_failure() {
1153        let db = GrafeoDB::new_in_memory();
1154        let session = db.session();
1155
1156        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1157
1158        // Corrupt snapshot: just garbage bytes
1159        let result = db.restore_snapshot(b"garbage");
1160        assert!(result.is_err());
1161
1162        // DB should be unchanged
1163        assert_eq!(db.lpg_store().node_count(), 1);
1164    }
1165
1166    #[test]
1167    fn test_restore_snapshot_empty_db() {
1168        let db = GrafeoDB::new_in_memory();
1169
1170        // Export empty snapshot, then populate, then restore to empty
1171        let empty_snapshot = db.export_snapshot().unwrap();
1172
1173        let session = db.session();
1174        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1175        assert_eq!(db.lpg_store().node_count(), 1);
1176
1177        db.restore_snapshot(&empty_snapshot).unwrap();
1178        assert_eq!(db.lpg_store().node_count(), 0);
1179    }
1180
1181    #[test]
1182    fn test_restore_snapshot_with_edges() {
1183        let db = GrafeoDB::new_in_memory();
1184        let session = db.session();
1185
1186        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1187        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1188        session
1189            .execute(
1190                "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
1191            )
1192            .unwrap();
1193
1194        let snapshot = db.export_snapshot().unwrap();
1195        assert_eq!(db.lpg_store().edge_count(), 1);
1196
1197        // Modify: add more data
1198        session
1199            .execute("INSERT (:Person {name: 'Vincent'})")
1200            .unwrap();
1201
1202        // Restore
1203        db.restore_snapshot(&snapshot).unwrap();
1204        assert_eq!(db.lpg_store().node_count(), 2);
1205        assert_eq!(db.lpg_store().edge_count(), 1);
1206    }
1207
1208    #[test]
1209    fn test_restore_snapshot_preserves_sessions() {
1210        let db = GrafeoDB::new_in_memory();
1211        let session = db.session();
1212
1213        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1214        let snapshot = db.export_snapshot().unwrap();
1215
1216        // Modify
1217        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1218
1219        // Restore
1220        db.restore_snapshot(&snapshot).unwrap();
1221
1222        // Session should still work and see restored data
1223        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1224        assert_eq!(result.rows.len(), 1);
1225    }
1226
1227    #[test]
1228    fn test_export_import_roundtrip() {
1229        let db = GrafeoDB::new_in_memory();
1230        let session = db.session();
1231
1232        session
1233            .execute("INSERT (:Person {name: 'Alix', age: 30})")
1234            .unwrap();
1235
1236        let snapshot = db.export_snapshot().unwrap();
1237        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1238        let session2 = db2.session();
1239
1240        let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
1241        assert_eq!(result.rows.len(), 1);
1242    }
1243
1244    // --- to_memory() ---
1245
1246    #[test]
1247    fn test_to_memory_empty() {
1248        let db = GrafeoDB::new_in_memory();
1249        let copy = db.to_memory().unwrap();
1250        assert_eq!(copy.lpg_store().node_count(), 0);
1251        assert_eq!(copy.lpg_store().edge_count(), 0);
1252    }
1253
1254    #[test]
1255    fn test_to_memory_copies_nodes_and_properties() {
1256        let db = GrafeoDB::new_in_memory();
1257        let session = db.session();
1258        session
1259            .execute("INSERT (:Person {name: 'Alix', age: 30})")
1260            .unwrap();
1261        session
1262            .execute("INSERT (:Person {name: 'Gus', age: 25})")
1263            .unwrap();
1264
1265        let copy = db.to_memory().unwrap();
1266        assert_eq!(copy.lpg_store().node_count(), 2);
1267
1268        let s2 = copy.session();
1269        let result = s2
1270            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
1271            .unwrap();
1272        assert_eq!(result.rows.len(), 2);
1273        assert_eq!(result.rows[0][0], Value::String("Alix".into()));
1274        assert_eq!(result.rows[1][0], Value::String("Gus".into()));
1275    }
1276
1277    #[test]
1278    fn test_to_memory_copies_edges_and_properties() {
1279        let db = GrafeoDB::new_in_memory();
1280        let a = db.create_node(&["Person"]);
1281        db.set_node_property(a, "name", "Alix".into());
1282        let b = db.create_node(&["Person"]);
1283        db.set_node_property(b, "name", "Gus".into());
1284        let edge = db.create_edge(a, b, "KNOWS");
1285        db.set_edge_property(edge, "since", Value::Int64(2020));
1286
1287        let copy = db.to_memory().unwrap();
1288        assert_eq!(copy.lpg_store().node_count(), 2);
1289        assert_eq!(copy.lpg_store().edge_count(), 1);
1290
1291        let s2 = copy.session();
1292        let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
1293        assert_eq!(result.rows[0][0], Value::Int64(2020));
1294    }
1295
1296    #[test]
1297    fn test_to_memory_is_independent() {
1298        let db = GrafeoDB::new_in_memory();
1299        let session = db.session();
1300        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1301
1302        let copy = db.to_memory().unwrap();
1303
1304        // Mutating original should not affect copy
1305        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1306        assert_eq!(db.lpg_store().node_count(), 2);
1307        assert_eq!(copy.lpg_store().node_count(), 1);
1308    }
1309
1310    // --- iter_nodes() / iter_edges() ---
1311
1312    #[test]
1313    fn test_iter_nodes_empty() {
1314        let db = GrafeoDB::new_in_memory();
1315        assert_eq!(db.iter_nodes().count(), 0);
1316    }
1317
1318    #[test]
1319    fn test_iter_nodes_returns_all() {
1320        let db = GrafeoDB::new_in_memory();
1321        let id1 = db.create_node(&["Person"]);
1322        db.set_node_property(id1, "name", "Alix".into());
1323        let id2 = db.create_node(&["Animal"]);
1324        db.set_node_property(id2, "name", "Fido".into());
1325
1326        let nodes: Vec<_> = db.iter_nodes().collect();
1327        assert_eq!(nodes.len(), 2);
1328
1329        let names: Vec<_> = nodes
1330            .iter()
1331            .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
1332            .map(|(_, v)| v.clone())
1333            .collect();
1334        assert!(names.contains(&Value::String("Alix".into())));
1335        assert!(names.contains(&Value::String("Fido".into())));
1336    }
1337
1338    #[test]
1339    fn test_iter_edges_empty() {
1340        let db = GrafeoDB::new_in_memory();
1341        assert_eq!(db.iter_edges().count(), 0);
1342    }
1343
1344    #[test]
1345    fn test_iter_edges_returns_all() {
1346        let db = GrafeoDB::new_in_memory();
1347        let a = db.create_node(&["A"]);
1348        let b = db.create_node(&["B"]);
1349        let c = db.create_node(&["C"]);
1350        db.create_edge(a, b, "R1");
1351        db.create_edge(b, c, "R2");
1352
1353        let edges: Vec<_> = db.iter_edges().collect();
1354        assert_eq!(edges.len(), 2);
1355
1356        let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
1357        assert!(types.contains(&"R1"));
1358        assert!(types.contains(&"R2"));
1359    }
1360
1361    // --- restore_snapshot() validation ---
1362
1363    fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
1364        let snap = Snapshot {
1365            version,
1366            nodes,
1367            edges,
1368            named_graphs: vec![],
1369            rdf_triples: vec![],
1370            rdf_named_graphs: vec![],
1371            schema: SnapshotSchema::default(),
1372            indexes: SnapshotIndexes::default(),
1373            epoch: 0,
1374        };
1375        bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
1376    }
1377
1378    #[test]
1379    fn test_restore_rejects_unsupported_version() {
1380        let db = GrafeoDB::new_in_memory();
1381        let session = db.session();
1382        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1383
1384        let bytes = make_snapshot(99, vec![], vec![]);
1385
1386        let result = db.restore_snapshot(&bytes);
1387        assert!(result.is_err());
1388        let err = result.unwrap_err().to_string();
1389        assert!(err.contains("unsupported snapshot version"), "got: {err}");
1390
1391        // DB unchanged
1392        assert_eq!(db.lpg_store().node_count(), 1);
1393    }
1394
1395    #[test]
1396    fn test_restore_rejects_duplicate_node_ids() {
1397        let db = GrafeoDB::new_in_memory();
1398        let session = db.session();
1399        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1400
1401        let bytes = make_snapshot(
1402            SNAPSHOT_VERSION,
1403            vec![
1404                SnapshotNode {
1405                    id: NodeId::new(0),
1406                    labels: vec!["A".into()],
1407                    properties: vec![],
1408                },
1409                SnapshotNode {
1410                    id: NodeId::new(0),
1411                    labels: vec!["B".into()],
1412                    properties: vec![],
1413                },
1414            ],
1415            vec![],
1416        );
1417
1418        let result = db.restore_snapshot(&bytes);
1419        assert!(result.is_err());
1420        let err = result.unwrap_err().to_string();
1421        assert!(err.contains("duplicate node ID"), "got: {err}");
1422        assert_eq!(db.lpg_store().node_count(), 1);
1423    }
1424
1425    #[test]
1426    fn test_restore_rejects_duplicate_edge_ids() {
1427        let db = GrafeoDB::new_in_memory();
1428
1429        let bytes = make_snapshot(
1430            SNAPSHOT_VERSION,
1431            vec![
1432                SnapshotNode {
1433                    id: NodeId::new(0),
1434                    labels: vec![],
1435                    properties: vec![],
1436                },
1437                SnapshotNode {
1438                    id: NodeId::new(1),
1439                    labels: vec![],
1440                    properties: vec![],
1441                },
1442            ],
1443            vec![
1444                SnapshotEdge {
1445                    id: EdgeId::new(0),
1446                    src: NodeId::new(0),
1447                    dst: NodeId::new(1),
1448                    edge_type: "REL".into(),
1449                    properties: vec![],
1450                },
1451                SnapshotEdge {
1452                    id: EdgeId::new(0),
1453                    src: NodeId::new(0),
1454                    dst: NodeId::new(1),
1455                    edge_type: "REL".into(),
1456                    properties: vec![],
1457                },
1458            ],
1459        );
1460
1461        let result = db.restore_snapshot(&bytes);
1462        assert!(result.is_err());
1463        let err = result.unwrap_err().to_string();
1464        assert!(err.contains("duplicate edge ID"), "got: {err}");
1465    }
1466
1467    #[test]
1468    fn test_restore_rejects_dangling_source() {
1469        let db = GrafeoDB::new_in_memory();
1470
1471        let bytes = make_snapshot(
1472            SNAPSHOT_VERSION,
1473            vec![SnapshotNode {
1474                id: NodeId::new(0),
1475                labels: vec![],
1476                properties: vec![],
1477            }],
1478            vec![SnapshotEdge {
1479                id: EdgeId::new(0),
1480                src: NodeId::new(999),
1481                dst: NodeId::new(0),
1482                edge_type: "REL".into(),
1483                properties: vec![],
1484            }],
1485        );
1486
1487        let result = db.restore_snapshot(&bytes);
1488        assert!(result.is_err());
1489        let err = result.unwrap_err().to_string();
1490        assert!(err.contains("non-existent source node"), "got: {err}");
1491    }
1492
1493    #[test]
1494    fn test_restore_rejects_dangling_destination() {
1495        let db = GrafeoDB::new_in_memory();
1496
1497        let bytes = make_snapshot(
1498            SNAPSHOT_VERSION,
1499            vec![SnapshotNode {
1500                id: NodeId::new(0),
1501                labels: vec![],
1502                properties: vec![],
1503            }],
1504            vec![SnapshotEdge {
1505                id: EdgeId::new(0),
1506                src: NodeId::new(0),
1507                dst: NodeId::new(999),
1508                edge_type: "REL".into(),
1509                properties: vec![],
1510            }],
1511        );
1512
1513        let result = db.restore_snapshot(&bytes);
1514        assert!(result.is_err());
1515        let err = result.unwrap_err().to_string();
1516        assert!(err.contains("non-existent destination node"), "got: {err}");
1517    }
1518
1519    // --- index metadata roundtrip ---
1520
1521    #[test]
1522    fn test_snapshot_roundtrip_property_index() {
1523        let db = GrafeoDB::new_in_memory();
1524        let session = db.session();
1525
1526        session
1527            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
1528            .unwrap();
1529        db.create_property_index("email");
1530        assert!(db.has_property_index("email"));
1531
1532        let snapshot = db.export_snapshot().unwrap();
1533        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1534
1535        assert!(db2.has_property_index("email"));
1536
1537        // Verify the index actually works for O(1) lookups
1538        let found = db2.find_nodes_by_property("email", &Value::String("alix@example.com".into()));
1539        assert_eq!(found.len(), 1);
1540    }
1541
1542    #[cfg(feature = "vector-index")]
1543    #[test]
1544    fn test_snapshot_roundtrip_vector_index() {
1545        use std::sync::Arc;
1546
1547        let db = GrafeoDB::new_in_memory();
1548
1549        let n1 = db.create_node(&["Doc"]);
1550        db.set_node_property(
1551            n1,
1552            "embedding",
1553            Value::Vector(Arc::from([1.0_f32, 0.0, 0.0])),
1554        );
1555        let n2 = db.create_node(&["Doc"]);
1556        db.set_node_property(
1557            n2,
1558            "embedding",
1559            Value::Vector(Arc::from([0.0_f32, 1.0, 0.0])),
1560        );
1561
1562        db.create_vector_index(
1563            "Doc",
1564            "embedding",
1565            None,
1566            Some("cosine"),
1567            Some(4),
1568            Some(32),
1569            None,
1570        )
1571        .unwrap();
1572
1573        let snapshot = db.export_snapshot().unwrap();
1574        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1575
1576        // Vector search should work on the restored database
1577        let results = db2
1578            .vector_search("Doc", "embedding", &[1.0, 0.0, 0.0], 2, None, None)
1579            .unwrap();
1580        assert_eq!(results.len(), 2);
1581        // Closest to [1,0,0] should be n1
1582        assert_eq!(results[0].0, n1);
1583    }
1584
1585    #[cfg(feature = "text-index")]
1586    #[test]
1587    fn test_snapshot_roundtrip_text_index() {
1588        let db = GrafeoDB::new_in_memory();
1589
1590        let n1 = db.create_node(&["Article"]);
1591        db.set_node_property(n1, "body", Value::String("rust graph database".into()));
1592        let n2 = db.create_node(&["Article"]);
1593        db.set_node_property(n2, "body", Value::String("python web framework".into()));
1594
1595        db.create_text_index("Article", "body").unwrap();
1596
1597        let snapshot = db.export_snapshot().unwrap();
1598        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1599
1600        // Text search should work on the restored database
1601        let results = db2
1602            .text_search("Article", "body", "graph database", 10)
1603            .unwrap();
1604        assert_eq!(results.len(), 1);
1605        assert_eq!(results[0].0, n1);
1606    }
1607
1608    #[test]
1609    fn test_snapshot_roundtrip_property_index_via_restore() {
1610        let db = GrafeoDB::new_in_memory();
1611        let session = db.session();
1612
1613        session
1614            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
1615            .unwrap();
1616        db.create_property_index("email");
1617
1618        let snapshot = db.export_snapshot().unwrap();
1619
1620        // Mutate the database
1621        session
1622            .execute("INSERT (:Person {name: 'Gus', email: 'gus@example.com'})")
1623            .unwrap();
1624        db.drop_property_index("email");
1625        assert!(!db.has_property_index("email"));
1626
1627        // Restore should bring back the index
1628        db.restore_snapshot(&snapshot).unwrap();
1629        assert!(db.has_property_index("email"));
1630    }
1631}