Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3#[cfg(any(feature = "wal", feature = "grafeo-file"))]
4use std::path::Path;
5
6#[cfg(any(feature = "vector-index", feature = "text-index"))]
7use grafeo_common::grafeo_warn;
8use grafeo_common::types::{EdgeId, EpochId, NodeId, Value};
9use grafeo_common::utils::error::{Error, Result};
10use hashbrown::HashSet;
11
12use crate::config::Config;
13
14#[cfg(feature = "wal")]
15use grafeo_storage::wal::WalRecord;
16
17use crate::catalog::{
18    EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
19};
20
/// Version number of the binary snapshot format currently in use (v4).
const SNAPSHOT_VERSION: u8 = 4;
23
24/// Binary snapshot format (v4: graph data, named graphs, RDF, schema, index metadata,
25/// and property version history for temporal support).
26#[derive(serde::Serialize, serde::Deserialize)]
27struct Snapshot {
28    version: u8,
29    nodes: Vec<SnapshotNode>,
30    edges: Vec<SnapshotEdge>,
31    named_graphs: Vec<NamedGraphSnapshot>,
32    rdf_triples: Vec<SnapshotTriple>,
33    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
34    schema: SnapshotSchema,
35    indexes: SnapshotIndexes,
36    /// Current store epoch at snapshot time (0 when temporal is disabled).
37    epoch: u64,
38}
39
40/// Schema metadata within a snapshot.
41#[derive(serde::Serialize, serde::Deserialize, Default)]
42struct SnapshotSchema {
43    node_types: Vec<NodeTypeDefinition>,
44    edge_types: Vec<EdgeTypeDefinition>,
45    graph_types: Vec<GraphTypeDefinition>,
46    procedures: Vec<ProcedureDefinition>,
47    schemas: Vec<String>,
48    graph_type_bindings: Vec<(String, String)>,
49}
50
51/// Index metadata within a snapshot (definitions only, not index data).
52#[derive(serde::Serialize, serde::Deserialize, Default)]
53struct SnapshotIndexes {
54    property_indexes: Vec<String>,
55    vector_indexes: Vec<SnapshotVectorIndex>,
56    text_indexes: Vec<SnapshotTextIndex>,
57}
58
59/// Vector index definition for snapshot persistence.
60#[derive(serde::Serialize, serde::Deserialize)]
61struct SnapshotVectorIndex {
62    label: String,
63    property: String,
64    dimensions: usize,
65    metric: grafeo_core::index::vector::DistanceMetric,
66    m: usize,
67    ef_construction: usize,
68}
69
70/// Text index definition for snapshot persistence.
71#[derive(serde::Serialize, serde::Deserialize)]
72struct SnapshotTextIndex {
73    label: String,
74    property: String,
75}
76
77/// A named graph partition within a v2 snapshot.
78#[derive(serde::Serialize, serde::Deserialize)]
79struct NamedGraphSnapshot {
80    name: String,
81    nodes: Vec<SnapshotNode>,
82    edges: Vec<SnapshotEdge>,
83}
84
85/// An RDF triple in snapshot format (N-Triples encoded terms).
86#[derive(serde::Serialize, serde::Deserialize)]
87struct SnapshotTriple {
88    subject: String,
89    predicate: String,
90    object: String,
91}
92
93/// An RDF named graph in snapshot format.
94#[derive(serde::Serialize, serde::Deserialize)]
95struct RdfNamedGraphSnapshot {
96    name: String,
97    triples: Vec<SnapshotTriple>,
98}
99
100#[derive(serde::Serialize, serde::Deserialize)]
101struct SnapshotNode {
102    id: NodeId,
103    labels: Vec<String>,
104    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
105    properties: Vec<(String, Vec<(EpochId, Value)>)>,
106}
107
108#[derive(serde::Serialize, serde::Deserialize)]
109struct SnapshotEdge {
110    id: EdgeId,
111    src: NodeId,
112    dst: NodeId,
113    edge_type: String,
114    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
115    properties: Vec<(String, Vec<(EpochId, Value)>)>,
116}
117
118/// Collects all nodes from a store into snapshot format.
119///
120/// With `temporal`: stores full property version history.
121/// Without: wraps each current value as a single-entry version list at epoch 0.
122fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
123    let mut nodes: Vec<SnapshotNode> = store
124        .all_nodes()
125        .map(|n| {
126            #[cfg(feature = "temporal")]
127            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = store
128                .node_property_history(n.id)
129                .into_iter()
130                .map(|(k, entries)| (k.to_string(), entries))
131                .collect();
132
133            #[cfg(not(feature = "temporal"))]
134            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = n
135                .properties
136                .into_iter()
137                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
138                .collect();
139
140            properties.sort_by(|(a, _), (b, _)| a.cmp(b));
141
142            let mut labels: Vec<String> = n.labels.iter().map(|l| l.to_string()).collect();
143            labels.sort();
144
145            SnapshotNode {
146                id: n.id,
147                labels,
148                properties,
149            }
150        })
151        .collect();
152    nodes.sort_by_key(|n| n.id);
153    nodes
154}
155
156/// Collects all edges from a store into snapshot format.
157///
158/// With `temporal`: stores full property version history.
159/// Without: wraps each current value as a single-entry version list at epoch 0.
160fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
161    let mut edges: Vec<SnapshotEdge> = store
162        .all_edges()
163        .map(|e| {
164            #[cfg(feature = "temporal")]
165            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = store
166                .edge_property_history(e.id)
167                .into_iter()
168                .map(|(k, entries)| (k.to_string(), entries))
169                .collect();
170
171            #[cfg(not(feature = "temporal"))]
172            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = e
173                .properties
174                .into_iter()
175                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
176                .collect();
177
178            properties.sort_by(|(a, _), (b, _)| a.cmp(b));
179
180            SnapshotEdge {
181                id: e.id,
182                src: e.src,
183                dst: e.dst,
184                edge_type: e.edge_type.to_string(),
185                properties,
186            }
187        })
188        .collect();
189    edges.sort_by_key(|e| e.id);
190    edges
191}
192
193/// Populates a store from snapshot node/edge data.
194///
195/// With `temporal`: replays all `(epoch, value)` entries into version logs.
196/// Without: reads the latest value from each property's version list.
197fn populate_store_from_snapshot(
198    store: &grafeo_core::graph::lpg::LpgStore,
199    nodes: Vec<SnapshotNode>,
200    edges: Vec<SnapshotEdge>,
201) -> Result<()> {
202    for node in nodes {
203        let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
204        store.create_node_with_id(node.id, &label_refs)?;
205        for (key, entries) in node.properties {
206            #[cfg(feature = "temporal")]
207            for (epoch, value) in entries {
208                store.set_node_property_at_epoch(node.id, &key, value, epoch);
209            }
210            #[cfg(not(feature = "temporal"))]
211            if let Some((_, value)) = entries.into_iter().last() {
212                store.set_node_property(node.id, &key, value);
213            }
214        }
215    }
216    for edge in edges {
217        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
218        for (key, entries) in edge.properties {
219            #[cfg(feature = "temporal")]
220            for (epoch, value) in entries {
221                store.set_edge_property_at_epoch(edge.id, &key, value, epoch);
222            }
223            #[cfg(not(feature = "temporal"))]
224            if let Some((_, value)) = entries.into_iter().last() {
225                store.set_edge_property(edge.id, &key, value);
226            }
227        }
228    }
229    Ok(())
230}
231
232/// Validates snapshot nodes/edges for duplicates and dangling references.
233fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
234    let mut node_ids = HashSet::with_capacity(nodes.len());
235    for node in nodes {
236        if !node_ids.insert(node.id) {
237            return Err(Error::Internal(format!(
238                "snapshot contains duplicate node ID {}",
239                node.id
240            )));
241        }
242    }
243    let mut edge_ids = HashSet::with_capacity(edges.len());
244    for edge in edges {
245        if !edge_ids.insert(edge.id) {
246            return Err(Error::Internal(format!(
247                "snapshot contains duplicate edge ID {}",
248                edge.id
249            )));
250        }
251        if !node_ids.contains(&edge.src) {
252            return Err(Error::Internal(format!(
253                "snapshot edge {} references non-existent source node {}",
254                edge.id, edge.src
255            )));
256        }
257        if !node_ids.contains(&edge.dst) {
258            return Err(Error::Internal(format!(
259                "snapshot edge {} references non-existent destination node {}",
260                edge.id, edge.dst
261            )));
262        }
263    }
264    Ok(())
265}
266
/// Converts every triple of an RDF store into snapshot format by rendering
/// its subject, predicate, and object to strings.
#[cfg(feature = "triple-store")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut encoded = Vec::new();
    for triple in store.triples() {
        encoded.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    encoded
}
280
/// Inserts snapshot triples into an RDF store.
///
/// Each term is parsed with `Term::from_ntriples`; a triple with any term
/// that fails to parse is skipped without reporting an error.
#[cfg(feature = "triple-store")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    for encoded in triples {
        let (Some(subject), Some(predicate), Some(object)) = (
            Term::from_ntriples(&encoded.subject),
            Term::from_ntriples(&encoded.predicate),
            Term::from_ntriples(&encoded.object),
        ) else {
            continue;
        };
        store.insert(Triple::new(subject, predicate, object));
    }
}
295
296// =========================================================================
297// Snapshot deserialization helpers (used by single-file format)
298// =========================================================================
299
/// Decodes snapshot bytes and populates a store and catalog.
///
/// Steps, in order:
/// 1. bincode-decode `data` into a [`Snapshot`] and check its format version;
/// 2. populate the default graph, then each named graph;
/// 3. restore schema definitions into `catalog`;
/// 4. with `triple-store`: restore default and named RDF graphs.
///
/// With `temporal`, the store epoch (and each named graph's epoch) is synced
/// to the snapshot's epoch. Index definitions are not restored here.
///
/// # Errors
///
/// Returns `Error::Serialization` if decoding fails or the snapshot version
/// differs from `SNAPSHOT_VERSION`, `Error::Internal` if a named graph cannot
/// be created, and propagates node/edge creation errors.
#[cfg(feature = "grafeo-file")]
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "triple-store")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    use grafeo_common::utils::error::Error;

    let config = bincode::config::standard();
    let (snapshot, _) =
        bincode::serde::decode_from_slice::<Snapshot, _>(data, config).map_err(|e| {
            Error::Serialization(format!("failed to decode snapshot from .grafeo file: {e}"))
        })?;

    // Reject snapshots written in a different format version instead of
    // loading data whose layout we cannot vouch for.
    if snapshot.version != SNAPSHOT_VERSION {
        return Err(Error::Serialization(format!(
            "unsupported snapshot version {} in .grafeo file (expected {})",
            snapshot.version, SNAPSHOT_VERSION
        )));
    }

    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;

    // Restore epoch from snapshot (store-level only; TransactionManager
    // sync is handled in with_config() after all recovery completes).
    #[cfg(feature = "temporal")]
    store.sync_epoch(EpochId::new(snapshot.epoch));

    for graph in &snapshot.named_graphs {
        store
            .create_graph(&graph.name)
            .map_err(|e| Error::Internal(e.to_string()))?;
        // NOTE(review): if the freshly created graph cannot be looked up, its
        // data is skipped silently.
        if let Some(graph_store) = store.graph(&graph.name) {
            populate_store_from_snapshot_ref(&graph_store, &graph.nodes, &graph.edges)?;
            #[cfg(feature = "temporal")]
            graph_store.sync_epoch(EpochId::new(snapshot.epoch));
        }
    }
    restore_schema_from_snapshot(store, catalog, &snapshot.schema);

    // Restore RDF triples
    #[cfg(feature = "triple-store")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for rdf_graph in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(&rdf_graph.name);
            if let Some(graph_store) = rdf_store.graph(&rdf_graph.name) {
                populate_rdf_store(&graph_store, &rdf_graph.triples);
            }
        }
    }

    Ok(())
}
349
/// Fills `store` from borrowed snapshot nodes and edges (used when loading
/// the single-file format); values are cloned out of the snapshot.
///
/// All nodes are created before any edge. Property handling depends on the
/// `temporal` feature:
/// - enabled: every `(epoch, value)` entry is replayed at its epoch;
/// - disabled: only the last entry of each property's list is applied.
///
/// # Errors
///
/// Propagates any error from creating a node or an edge with its stored ID.
#[cfg(feature = "grafeo-file")]
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    for SnapshotNode {
        id,
        labels,
        properties,
    } in nodes
    {
        let id = *id;
        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
        store.create_node_with_id(id, &label_refs)?;

        for (key, entries) in properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in entries {
                store.set_node_property_at_epoch(id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, value)) = entries.last() {
                store.set_node_property(id, key, value.clone());
            }
        }
    }

    for SnapshotEdge {
        id,
        src,
        dst,
        edge_type,
        properties,
    } in edges
    {
        let id = *id;
        store.create_edge_with_id(id, *src, *dst, edge_type)?;

        for (key, entries) in properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in entries {
                store.set_edge_property_at_epoch(id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, value)) = entries.last() {
                store.set_edge_property(id, key, value.clone());
            }
        }
    }

    Ok(())
}
386
387/// Restores schema definitions from a snapshot into the catalog.
388///
389/// Also ensures each schema has its `__default__` graph partition, which
390/// may be missing in snapshots created before the schema hierarchy feature.
391fn restore_schema_from_snapshot(
392    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
393    catalog: &std::sync::Arc<crate::catalog::Catalog>,
394    schema: &SnapshotSchema,
395) {
396    for def in &schema.node_types {
397        catalog.register_or_replace_node_type(def.clone());
398    }
399    for def in &schema.edge_types {
400        catalog.register_or_replace_edge_type_def(def.clone());
401    }
402    for def in &schema.graph_types {
403        let _ = catalog.register_graph_type(def.clone());
404    }
405    for def in &schema.procedures {
406        catalog.replace_procedure(def.clone()).ok();
407    }
408    for name in &schema.schemas {
409        let _ = catalog.register_schema_namespace(name.clone());
410        // Ensure the schema's default graph partition exists
411        let default_key = format!("{name}/__default__");
412        let _ = store.create_graph(&default_key);
413    }
414    for (graph_name, type_name) in &schema.graph_type_bindings {
415        let _ = catalog.bind_graph_type(graph_name, type_name.clone());
416    }
417}
418
419/// Collects schema definitions from the catalog into snapshot format.
420fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
421    SnapshotSchema {
422        node_types: catalog.all_node_type_defs(),
423        edge_types: catalog.all_edge_type_defs(),
424        graph_types: catalog.all_graph_type_defs(),
425        procedures: catalog.all_procedure_defs(),
426        schemas: catalog.schema_names(),
427        graph_type_bindings: catalog.all_graph_type_bindings(),
428    }
429}
430
431/// Restores indexes from snapshot metadata by rebuilding them from existing data.
432///
433/// Must be called after all nodes/edges have been populated, since index
434/// creation scans existing data.
435fn restore_indexes_from_snapshot(db: &super::GrafeoDB, indexes: &SnapshotIndexes) {
436    for name in &indexes.property_indexes {
437        db.lpg_store().create_property_index(name);
438    }
439
440    #[cfg(feature = "vector-index")]
441    for vi in &indexes.vector_indexes {
442        if let Err(err) = db.create_vector_index(
443            &vi.label,
444            &vi.property,
445            Some(vi.dimensions),
446            Some(vi.metric.name()),
447            Some(vi.m),
448            Some(vi.ef_construction),
449        ) {
450            grafeo_warn!(
451                "Failed to restore vector index :{label}({property}): {err}",
452                label = vi.label,
453                property = vi.property,
454            );
455        }
456    }
457
458    #[cfg(feature = "text-index")]
459    for ti in &indexes.text_indexes {
460        if let Err(err) = db.create_text_index(&ti.label, &ti.property) {
461            grafeo_warn!(
462                "Failed to restore text index :{label}({property}): {err}",
463                label = ti.label,
464                property = ti.property,
465            );
466        }
467    }
468}
469
470/// Collects index metadata from a store into snapshot format.
471fn collect_index_metadata(store: &grafeo_core::graph::lpg::LpgStore) -> SnapshotIndexes {
472    let property_indexes = store.property_index_keys();
473
474    #[cfg(feature = "vector-index")]
475    let vector_indexes: Vec<SnapshotVectorIndex> = store
476        .vector_index_entries()
477        .into_iter()
478        .filter_map(|(key, index)| {
479            let (label, property) = key.split_once(':')?;
480            let config = index.config();
481            Some(SnapshotVectorIndex {
482                label: label.to_string(),
483                property: property.to_string(),
484                dimensions: config.dimensions,
485                metric: config.metric,
486                m: config.m,
487                ef_construction: config.ef_construction,
488            })
489        })
490        .collect();
491    #[cfg(not(feature = "vector-index"))]
492    let vector_indexes = Vec::new();
493
494    #[cfg(feature = "text-index")]
495    let text_indexes: Vec<SnapshotTextIndex> = store
496        .text_index_entries()
497        .into_iter()
498        .filter_map(|(key, _)| {
499            let (label, property) = key.split_once(':')?;
500            Some(SnapshotTextIndex {
501                label: label.to_string(),
502                property: property.to_string(),
503            })
504        })
505        .collect();
506    #[cfg(not(feature = "text-index"))]
507    let text_indexes = Vec::new();
508
509    SnapshotIndexes {
510        property_indexes,
511        vector_indexes,
512        text_indexes,
513    }
514}
515
516impl super::GrafeoDB {
517    // =========================================================================
518    // ADMIN API: Persistence Control
519    // =========================================================================
520
521    /// Saves the database to a file path.
522    ///
523    /// - If the path ends in `.grafeo`: creates a single-file database
524    /// - Otherwise: creates a WAL directory-backed database at the path
525    /// - If in-memory: creates a new persistent database at path
526    /// - If file-backed: creates a copy at the new path
527    ///
528    /// The original database remains unchanged.
529    ///
530    /// # Errors
531    ///
532    /// Returns an error if the save operation fails.
533    ///
534    /// Requires the `wal` feature for persistence support.
535    #[cfg(feature = "wal")]
536    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
537        let path = path.as_ref();
538
539        // Single-file format: export snapshot directly to a .grafeo file
540        #[cfg(feature = "grafeo-file")]
541        if path.extension().is_some_and(|ext| ext == "grafeo") {
542            return self.save_as_grafeo_file(path);
543        }
544
545        // Create target database with WAL enabled
546        let target_config = Config::persistent(path);
547        let target = Self::with_config(target_config)?;
548
549        // Copy all nodes using WAL-enabled methods
550        for node in self.lpg_store().all_nodes() {
551            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
552            target
553                .lpg_store()
554                .create_node_with_id(node.id, &label_refs)?;
555
556            // Log to WAL
557            target.log_wal(&WalRecord::CreateNode {
558                id: node.id,
559                labels: node.labels.iter().map(|s| s.to_string()).collect(),
560            })?;
561
562            // Copy properties
563            for (key, value) in node.properties {
564                target
565                    .lpg_store()
566                    .set_node_property(node.id, key.as_str(), value.clone());
567                target.log_wal(&WalRecord::SetNodeProperty {
568                    id: node.id,
569                    key: key.to_string(),
570                    value,
571                })?;
572            }
573        }
574
575        // Copy all edges using WAL-enabled methods
576        for edge in self.lpg_store().all_edges() {
577            target
578                .lpg_store()
579                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
580
581            // Log to WAL
582            target.log_wal(&WalRecord::CreateEdge {
583                id: edge.id,
584                src: edge.src,
585                dst: edge.dst,
586                edge_type: edge.edge_type.to_string(),
587            })?;
588
589            // Copy properties
590            for (key, value) in edge.properties {
591                target
592                    .lpg_store()
593                    .set_edge_property(edge.id, key.as_str(), value.clone());
594                target.log_wal(&WalRecord::SetEdgeProperty {
595                    id: edge.id,
596                    key: key.to_string(),
597                    value,
598                })?;
599            }
600        }
601
602        // Copy named graphs
603        for graph_name in self.lpg_store().graph_names() {
604            if let Some(src_graph) = self.lpg_store().graph(&graph_name) {
605                target.log_wal(&WalRecord::CreateNamedGraph {
606                    name: graph_name.clone(),
607                })?;
608                target
609                    .lpg_store()
610                    .create_graph(&graph_name)
611                    .map_err(|e| Error::Internal(e.to_string()))?;
612
613                if let Some(dst_graph) = target.lpg_store().graph(&graph_name) {
614                    // Switch WAL context to this named graph
615                    target.log_wal(&WalRecord::SwitchGraph {
616                        name: Some(graph_name.clone()),
617                    })?;
618
619                    for node in src_graph.all_nodes() {
620                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
621                        dst_graph.create_node_with_id(node.id, &label_refs)?;
622                        target.log_wal(&WalRecord::CreateNode {
623                            id: node.id,
624                            labels: node.labels.iter().map(|s| s.to_string()).collect(),
625                        })?;
626                        for (key, value) in node.properties {
627                            dst_graph.set_node_property(node.id, key.as_str(), value.clone());
628                            target.log_wal(&WalRecord::SetNodeProperty {
629                                id: node.id,
630                                key: key.to_string(),
631                                value,
632                            })?;
633                        }
634                    }
635                    for edge in src_graph.all_edges() {
636                        dst_graph.create_edge_with_id(
637                            edge.id,
638                            edge.src,
639                            edge.dst,
640                            &edge.edge_type,
641                        )?;
642                        target.log_wal(&WalRecord::CreateEdge {
643                            id: edge.id,
644                            src: edge.src,
645                            dst: edge.dst,
646                            edge_type: edge.edge_type.to_string(),
647                        })?;
648                        for (key, value) in edge.properties {
649                            dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
650                            target.log_wal(&WalRecord::SetEdgeProperty {
651                                id: edge.id,
652                                key: key.to_string(),
653                                value,
654                            })?;
655                        }
656                    }
657                }
658            }
659        }
660
661        // Switch WAL context back to default graph
662        if !self.lpg_store().graph_names().is_empty() {
663            target.log_wal(&WalRecord::SwitchGraph { name: None })?;
664        }
665
666        // Copy RDF data with WAL logging
667        #[cfg(feature = "triple-store")]
668        {
669            for triple in self.rdf_store.triples() {
670                let record = WalRecord::InsertRdfTriple {
671                    subject: triple.subject().to_string(),
672                    predicate: triple.predicate().to_string(),
673                    object: triple.object().to_string(),
674                    graph: None,
675                };
676                target.rdf_store.insert((*triple).clone());
677                target.log_wal(&record)?;
678            }
679            for name in self.rdf_store.graph_names() {
680                target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
681                if let Some(src_graph) = self.rdf_store.graph(&name) {
682                    let dst_graph = target.rdf_store.graph_or_create(&name);
683                    for triple in src_graph.triples() {
684                        let record = WalRecord::InsertRdfTriple {
685                            subject: triple.subject().to_string(),
686                            predicate: triple.predicate().to_string(),
687                            object: triple.object().to_string(),
688                            graph: Some(name.clone()),
689                        };
690                        dst_graph.insert((*triple).clone());
691                        target.log_wal(&record)?;
692                    }
693                }
694            }
695        }
696
697        // Checkpoint and close the target database
698        target.close()?;
699
700        Ok(())
701    }
702
703    /// Creates an in-memory copy of this database.
704    ///
705    /// Returns a new database that is completely independent, including
706    /// all named graph data.
707    /// Useful for:
708    /// Saves the database to a single `.grafeo` file.
709    #[cfg(feature = "grafeo-file")]
710    fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
711        use grafeo_storage::file::GrafeoFileManager;
712
713        let snapshot_data = self.export_snapshot()?;
714        let epoch = self.lpg_store().current_epoch();
715        let transaction_id = self
716            .transaction_manager
717            .last_assigned_transaction_id()
718            .map_or(0, |t| t.0);
719        let node_count = self.lpg_store().node_count() as u64;
720        let edge_count = self.lpg_store().edge_count() as u64;
721
722        let fm = GrafeoFileManager::create(path)?;
723        fm.write_snapshot(
724            &snapshot_data,
725            epoch.0,
726            transaction_id,
727            node_count,
728            edge_count,
729        )?;
730        Ok(())
731    }
732
733    /// - Testing modifications without affecting the original
734    /// - Faster operations when persistence isn't needed
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if the copy operation fails.
739    pub fn to_memory(&self) -> Result<Self> {
740        let config = Config::in_memory();
741        let target = Self::with_config(config)?;
742
743        // Copy default graph nodes
744        for node in self.lpg_store().all_nodes() {
745            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
746            target
747                .lpg_store()
748                .create_node_with_id(node.id, &label_refs)?;
749            for (key, value) in node.properties {
750                target
751                    .lpg_store()
752                    .set_node_property(node.id, key.as_str(), value);
753            }
754        }
755
756        // Copy default graph edges
757        for edge in self.lpg_store().all_edges() {
758            target
759                .lpg_store()
760                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
761            for (key, value) in edge.properties {
762                target
763                    .lpg_store()
764                    .set_edge_property(edge.id, key.as_str(), value);
765            }
766        }
767
768        // Copy named graphs
769        for graph_name in self.lpg_store().graph_names() {
770            if let Some(src_graph) = self.lpg_store().graph(&graph_name) {
771                target
772                    .lpg_store()
773                    .create_graph(&graph_name)
774                    .map_err(|e| Error::Internal(e.to_string()))?;
775                if let Some(dst_graph) = target.lpg_store().graph(&graph_name) {
776                    for node in src_graph.all_nodes() {
777                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
778                        dst_graph.create_node_with_id(node.id, &label_refs)?;
779                        for (key, value) in node.properties {
780                            dst_graph.set_node_property(node.id, key.as_str(), value);
781                        }
782                    }
783                    for edge in src_graph.all_edges() {
784                        dst_graph.create_edge_with_id(
785                            edge.id,
786                            edge.src,
787                            edge.dst,
788                            &edge.edge_type,
789                        )?;
790                        for (key, value) in edge.properties {
791                            dst_graph.set_edge_property(edge.id, key.as_str(), value);
792                        }
793                    }
794                }
795            }
796        }
797
798        // Copy RDF data
799        #[cfg(feature = "triple-store")]
800        {
801            for triple in self.rdf_store.triples() {
802                target.rdf_store.insert((*triple).clone());
803            }
804            for name in self.rdf_store.graph_names() {
805                if let Some(src_graph) = self.rdf_store.graph(&name) {
806                    let dst_graph = target.rdf_store.graph_or_create(&name);
807                    for triple in src_graph.triples() {
808                        dst_graph.insert((*triple).clone());
809                    }
810                }
811            }
812        }
813
814        Ok(target)
815    }
816
817    /// Opens a database file and loads it entirely into memory.
818    ///
819    /// The returned database has no connection to the original file.
820    /// Changes will NOT be written back to the file.
821    ///
822    /// # Errors
823    ///
824    /// Returns an error if the file can't be opened or loaded.
825    #[cfg(feature = "wal")]
826    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
827        // Open the source database (triggers WAL recovery)
828        let source = Self::open(path)?;
829
830        // Create in-memory copy
831        let target = source.to_memory()?;
832
833        // Close the source (releases file handles)
834        source.close()?;
835
836        Ok(target)
837    }
838
839    // =========================================================================
840    // ADMIN API: Snapshot Export/Import
841    // =========================================================================
842
843    /// Exports the entire database to a binary snapshot.
844    ///
845    /// The returned bytes can be stored (e.g. in IndexedDB) and later
846    /// restored with [`import_snapshot()`](Self::import_snapshot).
847    /// Includes all named graph data.
848    ///
849    /// Properties are stored as version-history lists. When `temporal` is
850    /// enabled, the full history is captured. Otherwise, each property is
851    /// wrapped as a single-entry list at epoch 0.
852    ///
853    /// # Errors
854    ///
855    /// Returns an error if serialization fails.
856    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
857        let nodes = collect_snapshot_nodes(self.lpg_store());
858        let edges = collect_snapshot_edges(self.lpg_store());
859
860        // Collect named graphs
861        let named_graphs: Vec<NamedGraphSnapshot> = self
862            .lpg_store()
863            .graph_names()
864            .into_iter()
865            .filter_map(|name| {
866                self.lpg_store()
867                    .graph(&name)
868                    .map(|graph_store| NamedGraphSnapshot {
869                        name,
870                        nodes: collect_snapshot_nodes(&graph_store),
871                        edges: collect_snapshot_edges(&graph_store),
872                    })
873            })
874            .collect();
875
876        // Collect RDF triples
877        #[cfg(feature = "triple-store")]
878        let rdf_triples = collect_rdf_triples(&self.rdf_store);
879        #[cfg(not(feature = "triple-store"))]
880        let rdf_triples = Vec::new();
881
882        #[cfg(feature = "triple-store")]
883        let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
884            .rdf_store
885            .graph_names()
886            .into_iter()
887            .filter_map(|name| {
888                self.rdf_store
889                    .graph(&name)
890                    .map(|graph| RdfNamedGraphSnapshot {
891                        name,
892                        triples: collect_rdf_triples(&graph),
893                    })
894            })
895            .collect();
896        #[cfg(not(feature = "triple-store"))]
897        let rdf_named_graphs = Vec::new();
898
899        let schema = collect_schema(&self.catalog);
900        let indexes = collect_index_metadata(self.lpg_store());
901
902        let snapshot = Snapshot {
903            version: SNAPSHOT_VERSION,
904            nodes,
905            edges,
906            named_graphs,
907            rdf_triples,
908            rdf_named_graphs,
909            schema,
910            indexes,
911            #[cfg(feature = "temporal")]
912            epoch: self.transaction_manager.current_epoch().as_u64(),
913            #[cfg(not(feature = "temporal"))]
914            epoch: 0,
915        };
916
917        let config = bincode::config::standard();
918        bincode::serde::encode_to_vec(&snapshot, config)
919            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
920    }
921
922    /// Creates a new in-memory database from a binary snapshot.
923    ///
924    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
925    ///
926    /// All edge references are validated before any data is inserted: every
927    /// edge's source and destination must reference a node present in the
928    /// snapshot, and duplicate node/edge IDs are rejected. If validation
929    /// fails, no database is created.
930    ///
931    /// # Errors
932    ///
933    /// Returns an error if the snapshot is invalid, contains dangling edge
934    /// references, has duplicate IDs, or deserialization fails.
935    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
936        if data.is_empty() {
937            return Err(Error::Internal("empty snapshot data".to_string()));
938        }
939
940        let version = data[0];
941        if version != 4 {
942            return Err(Error::Internal(format!(
943                "unsupported snapshot version: {version} (expected 4)"
944            )));
945        }
946
947        let config = bincode::config::standard();
948        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
949            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
950
951        // Validate default graph data
952        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
953
954        // Validate each named graph
955        for ng in &snapshot.named_graphs {
956            validate_snapshot_data(&ng.nodes, &ng.edges)?;
957        }
958
959        let db = Self::new_in_memory();
960        populate_store_from_snapshot(db.lpg_store(), snapshot.nodes, snapshot.edges)?;
961
962        // Restore epoch from snapshot
963        #[cfg(feature = "temporal")]
964        {
965            let epoch = EpochId::new(snapshot.epoch);
966            db.lpg_store().sync_epoch(epoch);
967            db.transaction_manager.sync_epoch(epoch);
968        }
969
970        // Capture epoch before moving snapshot fields
971        #[cfg(feature = "temporal")]
972        let snapshot_epoch = EpochId::new(snapshot.epoch);
973
974        // Restore named graphs
975        for ng in snapshot.named_graphs {
976            db.lpg_store()
977                .create_graph(&ng.name)
978                .map_err(|e| Error::Internal(e.to_string()))?;
979            if let Some(graph_store) = db.lpg_store().graph(&ng.name) {
980                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
981                // Named graph stores need the same epoch so temporal property
982                // lookups via current_epoch() return the correct values.
983                #[cfg(feature = "temporal")]
984                graph_store.sync_epoch(snapshot_epoch);
985            }
986        }
987
988        // Restore RDF triples
989        #[cfg(feature = "triple-store")]
990        {
991            populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
992            for rng in &snapshot.rdf_named_graphs {
993                let graph = db.rdf_store.graph_or_create(&rng.name);
994                populate_rdf_store(&graph, &rng.triples);
995            }
996        }
997
998        // Restore schema
999        restore_schema_from_snapshot(db.lpg_store(), &db.catalog, &snapshot.schema);
1000
1001        // Restore indexes (must come after data population)
1002        restore_indexes_from_snapshot(&db, &snapshot.indexes);
1003
1004        Ok(db)
1005    }
1006
1007    /// Replaces the current database contents with data from a binary snapshot.
1008    ///
1009    /// The `data` must have been produced by
1010    /// [`export_snapshot()`](Self::export_snapshot).
1011    ///
1012    /// All validation (duplicate IDs, dangling edge references) is performed
1013    /// before any data is modified. If validation fails, the current database
1014    /// is left unchanged. If validation passes, the store is cleared and
1015    /// rebuilt from the snapshot atomically (from the perspective of
1016    /// subsequent queries).
1017    ///
1018    /// # Errors
1019    ///
1020    /// Returns an error if the snapshot is invalid, contains dangling edge
1021    /// references, has duplicate IDs, or deserialization fails.
1022    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
1023        if data.is_empty() {
1024            return Err(Error::Internal("empty snapshot data".to_string()));
1025        }
1026
1027        let version = data[0];
1028        if version != 4 {
1029            return Err(Error::Internal(format!(
1030                "unsupported snapshot version: {version} (expected 4)"
1031            )));
1032        }
1033
1034        let config = bincode::config::standard();
1035        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1036            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
1037
1038        // Validate all data before making any changes
1039        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
1040        for ng in &snapshot.named_graphs {
1041            validate_snapshot_data(&ng.nodes, &ng.edges)?;
1042        }
1043
1044        // Drop all existing named graphs, then clear default store
1045        for name in self.lpg_store().graph_names() {
1046            self.lpg_store().drop_graph(&name);
1047        }
1048        self.lpg_store().clear();
1049
1050        populate_store_from_snapshot(self.lpg_store(), snapshot.nodes, snapshot.edges)?;
1051
1052        // Restore epoch from temporal snapshot
1053        #[cfg(feature = "temporal")]
1054        let snapshot_epoch = {
1055            let epoch = EpochId::new(snapshot.epoch);
1056            self.lpg_store().sync_epoch(epoch);
1057            self.transaction_manager.sync_epoch(epoch);
1058            epoch
1059        };
1060
1061        // Restore named graphs
1062        for ng in snapshot.named_graphs {
1063            self.lpg_store()
1064                .create_graph(&ng.name)
1065                .map_err(|e| Error::Internal(e.to_string()))?;
1066            if let Some(graph_store) = self.lpg_store().graph(&ng.name) {
1067                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
1068                #[cfg(feature = "temporal")]
1069                graph_store.sync_epoch(snapshot_epoch);
1070            }
1071        }
1072
1073        // Restore RDF data
1074        #[cfg(feature = "triple-store")]
1075        {
1076            // Clear existing RDF data
1077            self.rdf_store.clear();
1078            for name in self.rdf_store.graph_names() {
1079                self.rdf_store.drop_graph(&name);
1080            }
1081            populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
1082            for rng in &snapshot.rdf_named_graphs {
1083                let graph = self.rdf_store.graph_or_create(&rng.name);
1084                populate_rdf_store(&graph, &rng.triples);
1085            }
1086        }
1087
1088        // Restore schema
1089        restore_schema_from_snapshot(self.lpg_store(), &self.catalog, &snapshot.schema);
1090
1091        // Restore indexes (must come after data population)
1092        restore_indexes_from_snapshot(self, &snapshot.indexes);
1093
1094        Ok(())
1095    }
1096
1097    // =========================================================================
1098    // ADMIN API: Iteration
1099    // =========================================================================
1100
1101    /// Returns an iterator over all nodes in the database.
1102    ///
1103    /// Useful for dump/export operations.
1104    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1105        self.lpg_store().all_nodes()
1106    }
1107
1108    /// Returns an iterator over all edges in the database.
1109    ///
1110    /// Useful for dump/export operations.
1111    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1112        self.lpg_store().all_edges()
1113    }
1114}
1115
1116#[cfg(test)]
1117mod tests {
1118    use grafeo_common::types::{EdgeId, NodeId, Value};
1119
1120    use super::super::GrafeoDB;
1121    use super::{
1122        SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotIndexes, SnapshotNode, SnapshotSchema,
1123    };
1124
1125    #[test]
1126    fn test_restore_snapshot_basic() {
1127        let db = GrafeoDB::new_in_memory();
1128        let session = db.session();
1129
1130        // Populate
1131        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1132        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1133
1134        let snapshot = db.export_snapshot().unwrap();
1135
1136        // Modify
1137        session
1138            .execute("INSERT (:Person {name: 'Vincent'})")
1139            .unwrap();
1140        assert_eq!(db.lpg_store().node_count(), 3);
1141
1142        // Restore original
1143        db.restore_snapshot(&snapshot).unwrap();
1144
1145        assert_eq!(db.lpg_store().node_count(), 2);
1146        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1147        assert_eq!(result.rows.len(), 2);
1148    }
1149
1150    #[test]
1151    fn test_restore_snapshot_validation_failure() {
1152        let db = GrafeoDB::new_in_memory();
1153        let session = db.session();
1154
1155        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1156
1157        // Corrupt snapshot: just garbage bytes
1158        let result = db.restore_snapshot(b"garbage");
1159        assert!(result.is_err());
1160
1161        // DB should be unchanged
1162        assert_eq!(db.lpg_store().node_count(), 1);
1163    }
1164
1165    #[test]
1166    fn test_restore_snapshot_empty_db() {
1167        let db = GrafeoDB::new_in_memory();
1168
1169        // Export empty snapshot, then populate, then restore to empty
1170        let empty_snapshot = db.export_snapshot().unwrap();
1171
1172        let session = db.session();
1173        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1174        assert_eq!(db.lpg_store().node_count(), 1);
1175
1176        db.restore_snapshot(&empty_snapshot).unwrap();
1177        assert_eq!(db.lpg_store().node_count(), 0);
1178    }
1179
1180    #[test]
1181    fn test_restore_snapshot_with_edges() {
1182        let db = GrafeoDB::new_in_memory();
1183        let session = db.session();
1184
1185        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1186        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1187        session
1188            .execute(
1189                "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
1190            )
1191            .unwrap();
1192
1193        let snapshot = db.export_snapshot().unwrap();
1194        assert_eq!(db.lpg_store().edge_count(), 1);
1195
1196        // Modify: add more data
1197        session
1198            .execute("INSERT (:Person {name: 'Vincent'})")
1199            .unwrap();
1200
1201        // Restore
1202        db.restore_snapshot(&snapshot).unwrap();
1203        assert_eq!(db.lpg_store().node_count(), 2);
1204        assert_eq!(db.lpg_store().edge_count(), 1);
1205    }
1206
1207    #[test]
1208    fn test_restore_snapshot_preserves_sessions() {
1209        let db = GrafeoDB::new_in_memory();
1210        let session = db.session();
1211
1212        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1213        let snapshot = db.export_snapshot().unwrap();
1214
1215        // Modify
1216        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1217
1218        // Restore
1219        db.restore_snapshot(&snapshot).unwrap();
1220
1221        // Session should still work and see restored data
1222        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1223        assert_eq!(result.rows.len(), 1);
1224    }
1225
1226    #[test]
1227    fn test_export_import_roundtrip() {
1228        let db = GrafeoDB::new_in_memory();
1229        let session = db.session();
1230
1231        session
1232            .execute("INSERT (:Person {name: 'Alix', age: 30})")
1233            .unwrap();
1234
1235        let snapshot = db.export_snapshot().unwrap();
1236        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1237        let session2 = db2.session();
1238
1239        let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
1240        assert_eq!(result.rows.len(), 1);
1241    }
1242
1243    // --- to_memory() ---
1244
1245    #[test]
1246    fn test_to_memory_empty() {
1247        let db = GrafeoDB::new_in_memory();
1248        let copy = db.to_memory().unwrap();
1249        assert_eq!(copy.lpg_store().node_count(), 0);
1250        assert_eq!(copy.lpg_store().edge_count(), 0);
1251    }
1252
1253    #[test]
1254    fn test_to_memory_copies_nodes_and_properties() {
1255        let db = GrafeoDB::new_in_memory();
1256        let session = db.session();
1257        session
1258            .execute("INSERT (:Person {name: 'Alix', age: 30})")
1259            .unwrap();
1260        session
1261            .execute("INSERT (:Person {name: 'Gus', age: 25})")
1262            .unwrap();
1263
1264        let copy = db.to_memory().unwrap();
1265        assert_eq!(copy.lpg_store().node_count(), 2);
1266
1267        let s2 = copy.session();
1268        let result = s2
1269            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
1270            .unwrap();
1271        assert_eq!(result.rows.len(), 2);
1272        assert_eq!(result.rows[0][0], Value::String("Alix".into()));
1273        assert_eq!(result.rows[1][0], Value::String("Gus".into()));
1274    }
1275
1276    #[test]
1277    fn test_to_memory_copies_edges_and_properties() {
1278        let db = GrafeoDB::new_in_memory();
1279        let a = db.create_node(&["Person"]);
1280        db.set_node_property(a, "name", "Alix".into());
1281        let b = db.create_node(&["Person"]);
1282        db.set_node_property(b, "name", "Gus".into());
1283        let edge = db.create_edge(a, b, "KNOWS");
1284        db.set_edge_property(edge, "since", Value::Int64(2020));
1285
1286        let copy = db.to_memory().unwrap();
1287        assert_eq!(copy.lpg_store().node_count(), 2);
1288        assert_eq!(copy.lpg_store().edge_count(), 1);
1289
1290        let s2 = copy.session();
1291        let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
1292        assert_eq!(result.rows[0][0], Value::Int64(2020));
1293    }
1294
1295    #[test]
1296    fn test_to_memory_is_independent() {
1297        let db = GrafeoDB::new_in_memory();
1298        let session = db.session();
1299        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1300
1301        let copy = db.to_memory().unwrap();
1302
1303        // Mutating original should not affect copy
1304        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1305        assert_eq!(db.lpg_store().node_count(), 2);
1306        assert_eq!(copy.lpg_store().node_count(), 1);
1307    }
1308
1309    // --- iter_nodes() / iter_edges() ---
1310
1311    #[test]
1312    fn test_iter_nodes_empty() {
1313        let db = GrafeoDB::new_in_memory();
1314        assert_eq!(db.iter_nodes().count(), 0);
1315    }
1316
1317    #[test]
1318    fn test_iter_nodes_returns_all() {
1319        let db = GrafeoDB::new_in_memory();
1320        let id1 = db.create_node(&["Person"]);
1321        db.set_node_property(id1, "name", "Alix".into());
1322        let id2 = db.create_node(&["Animal"]);
1323        db.set_node_property(id2, "name", "Fido".into());
1324
1325        let nodes: Vec<_> = db.iter_nodes().collect();
1326        assert_eq!(nodes.len(), 2);
1327
1328        let names: Vec<_> = nodes
1329            .iter()
1330            .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
1331            .map(|(_, v)| v.clone())
1332            .collect();
1333        assert!(names.contains(&Value::String("Alix".into())));
1334        assert!(names.contains(&Value::String("Fido".into())));
1335    }
1336
1337    #[test]
1338    fn test_iter_edges_empty() {
1339        let db = GrafeoDB::new_in_memory();
1340        assert_eq!(db.iter_edges().count(), 0);
1341    }
1342
1343    #[test]
1344    fn test_iter_edges_returns_all() {
1345        let db = GrafeoDB::new_in_memory();
1346        let a = db.create_node(&["A"]);
1347        let b = db.create_node(&["B"]);
1348        let c = db.create_node(&["C"]);
1349        db.create_edge(a, b, "R1");
1350        db.create_edge(b, c, "R2");
1351
1352        let edges: Vec<_> = db.iter_edges().collect();
1353        assert_eq!(edges.len(), 2);
1354
1355        let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
1356        assert!(types.contains(&"R1"));
1357        assert!(types.contains(&"R2"));
1358    }
1359
1360    // --- restore_snapshot() validation ---
1361
1362    fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
1363        let snap = Snapshot {
1364            version,
1365            nodes,
1366            edges,
1367            named_graphs: vec![],
1368            rdf_triples: vec![],
1369            rdf_named_graphs: vec![],
1370            schema: SnapshotSchema::default(),
1371            indexes: SnapshotIndexes::default(),
1372            epoch: 0,
1373        };
1374        bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
1375    }
1376
1377    #[test]
1378    fn test_restore_rejects_unsupported_version() {
1379        let db = GrafeoDB::new_in_memory();
1380        let session = db.session();
1381        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1382
1383        let bytes = make_snapshot(99, vec![], vec![]);
1384
1385        let result = db.restore_snapshot(&bytes);
1386        assert!(result.is_err());
1387        let err = result.unwrap_err().to_string();
1388        assert!(err.contains("unsupported snapshot version"), "got: {err}");
1389
1390        // DB unchanged
1391        assert_eq!(db.lpg_store().node_count(), 1);
1392    }
1393
1394    #[test]
1395    fn test_restore_rejects_duplicate_node_ids() {
1396        let db = GrafeoDB::new_in_memory();
1397        let session = db.session();
1398        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1399
1400        let bytes = make_snapshot(
1401            SNAPSHOT_VERSION,
1402            vec![
1403                SnapshotNode {
1404                    id: NodeId::new(0),
1405                    labels: vec!["A".into()],
1406                    properties: vec![],
1407                },
1408                SnapshotNode {
1409                    id: NodeId::new(0),
1410                    labels: vec!["B".into()],
1411                    properties: vec![],
1412                },
1413            ],
1414            vec![],
1415        );
1416
1417        let result = db.restore_snapshot(&bytes);
1418        assert!(result.is_err());
1419        let err = result.unwrap_err().to_string();
1420        assert!(err.contains("duplicate node ID"), "got: {err}");
1421        assert_eq!(db.lpg_store().node_count(), 1);
1422    }
1423
1424    #[test]
1425    fn test_restore_rejects_duplicate_edge_ids() {
1426        let db = GrafeoDB::new_in_memory();
1427
1428        let bytes = make_snapshot(
1429            SNAPSHOT_VERSION,
1430            vec![
1431                SnapshotNode {
1432                    id: NodeId::new(0),
1433                    labels: vec![],
1434                    properties: vec![],
1435                },
1436                SnapshotNode {
1437                    id: NodeId::new(1),
1438                    labels: vec![],
1439                    properties: vec![],
1440                },
1441            ],
1442            vec![
1443                SnapshotEdge {
1444                    id: EdgeId::new(0),
1445                    src: NodeId::new(0),
1446                    dst: NodeId::new(1),
1447                    edge_type: "REL".into(),
1448                    properties: vec![],
1449                },
1450                SnapshotEdge {
1451                    id: EdgeId::new(0),
1452                    src: NodeId::new(0),
1453                    dst: NodeId::new(1),
1454                    edge_type: "REL".into(),
1455                    properties: vec![],
1456                },
1457            ],
1458        );
1459
1460        let result = db.restore_snapshot(&bytes);
1461        assert!(result.is_err());
1462        let err = result.unwrap_err().to_string();
1463        assert!(err.contains("duplicate edge ID"), "got: {err}");
1464    }
1465
1466    #[test]
1467    fn test_restore_rejects_dangling_source() {
1468        let db = GrafeoDB::new_in_memory();
1469
1470        let bytes = make_snapshot(
1471            SNAPSHOT_VERSION,
1472            vec![SnapshotNode {
1473                id: NodeId::new(0),
1474                labels: vec![],
1475                properties: vec![],
1476            }],
1477            vec![SnapshotEdge {
1478                id: EdgeId::new(0),
1479                src: NodeId::new(999),
1480                dst: NodeId::new(0),
1481                edge_type: "REL".into(),
1482                properties: vec![],
1483            }],
1484        );
1485
1486        let result = db.restore_snapshot(&bytes);
1487        assert!(result.is_err());
1488        let err = result.unwrap_err().to_string();
1489        assert!(err.contains("non-existent source node"), "got: {err}");
1490    }
1491
1492    #[test]
1493    fn test_restore_rejects_dangling_destination() {
1494        let db = GrafeoDB::new_in_memory();
1495
1496        let bytes = make_snapshot(
1497            SNAPSHOT_VERSION,
1498            vec![SnapshotNode {
1499                id: NodeId::new(0),
1500                labels: vec![],
1501                properties: vec![],
1502            }],
1503            vec![SnapshotEdge {
1504                id: EdgeId::new(0),
1505                src: NodeId::new(0),
1506                dst: NodeId::new(999),
1507                edge_type: "REL".into(),
1508                properties: vec![],
1509            }],
1510        );
1511
1512        let result = db.restore_snapshot(&bytes);
1513        assert!(result.is_err());
1514        let err = result.unwrap_err().to_string();
1515        assert!(err.contains("non-existent destination node"), "got: {err}");
1516    }
1517
1518    // --- index metadata roundtrip ---
1519
1520    #[test]
1521    fn test_snapshot_roundtrip_property_index() {
1522        let db = GrafeoDB::new_in_memory();
1523        let session = db.session();
1524
1525        session
1526            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
1527            .unwrap();
1528        db.create_property_index("email");
1529        assert!(db.has_property_index("email"));
1530
1531        let snapshot = db.export_snapshot().unwrap();
1532        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1533
1534        assert!(db2.has_property_index("email"));
1535
1536        // Verify the index actually works for O(1) lookups
1537        let found = db2.find_nodes_by_property("email", &Value::String("alix@example.com".into()));
1538        assert_eq!(found.len(), 1);
1539    }
1540
1541    #[cfg(feature = "vector-index")]
1542    #[test]
1543    fn test_snapshot_roundtrip_vector_index() {
1544        use std::sync::Arc;
1545
1546        let db = GrafeoDB::new_in_memory();
1547
1548        let n1 = db.create_node(&["Doc"]);
1549        db.set_node_property(
1550            n1,
1551            "embedding",
1552            Value::Vector(Arc::from([1.0_f32, 0.0, 0.0])),
1553        );
1554        let n2 = db.create_node(&["Doc"]);
1555        db.set_node_property(
1556            n2,
1557            "embedding",
1558            Value::Vector(Arc::from([0.0_f32, 1.0, 0.0])),
1559        );
1560
1561        db.create_vector_index("Doc", "embedding", None, Some("cosine"), Some(4), Some(32))
1562            .unwrap();
1563
1564        let snapshot = db.export_snapshot().unwrap();
1565        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1566
1567        // Vector search should work on the restored database
1568        let results = db2
1569            .vector_search("Doc", "embedding", &[1.0, 0.0, 0.0], 2, None, None)
1570            .unwrap();
1571        assert_eq!(results.len(), 2);
1572        // Closest to [1,0,0] should be n1
1573        assert_eq!(results[0].0, n1);
1574    }
1575
1576    #[cfg(feature = "text-index")]
1577    #[test]
1578    fn test_snapshot_roundtrip_text_index() {
1579        let db = GrafeoDB::new_in_memory();
1580
1581        let n1 = db.create_node(&["Article"]);
1582        db.set_node_property(n1, "body", Value::String("rust graph database".into()));
1583        let n2 = db.create_node(&["Article"]);
1584        db.set_node_property(n2, "body", Value::String("python web framework".into()));
1585
1586        db.create_text_index("Article", "body").unwrap();
1587
1588        let snapshot = db.export_snapshot().unwrap();
1589        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1590
1591        // Text search should work on the restored database
1592        let results = db2
1593            .text_search("Article", "body", "graph database", 10)
1594            .unwrap();
1595        assert_eq!(results.len(), 1);
1596        assert_eq!(results[0].0, n1);
1597    }
1598
1599    #[test]
1600    fn test_snapshot_roundtrip_property_index_via_restore() {
1601        let db = GrafeoDB::new_in_memory();
1602        let session = db.session();
1603
1604        session
1605            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
1606            .unwrap();
1607        db.create_property_index("email");
1608
1609        let snapshot = db.export_snapshot().unwrap();
1610
1611        // Mutate the database
1612        session
1613            .execute("INSERT (:Person {name: 'Gus', email: 'gus@example.com'})")
1614            .unwrap();
1615        db.drop_property_index("email");
1616        assert!(!db.has_property_index("email"));
1617
1618        // Restore should bring back the index
1619        db.restore_snapshot(&snapshot).unwrap();
1620        assert!(db.has_property_index("email"));
1621    }
1622}