// grafeo_engine/database/persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3use std::path::Path;
4
5use grafeo_common::types::{EdgeId, NodeId, Value};
6use grafeo_common::utils::error::{Error, Result};
7use hashbrown::HashSet;
8
9use crate::config::Config;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::WalRecord;
13
/// Binary snapshot format for database export/import.
///
/// This is the top-level value that `export_snapshot` encodes with bincode
/// and that `import_snapshot` decodes.
// NOTE(review): bincode presumably encodes struct fields in declaration
// order, so reordering or retyping fields likely changes the encoded layout
// of existing snapshots — confirm before editing.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version. `export_snapshot` writes `1`; `import_snapshot`
    /// rejects any other value.
    version: u8,
    /// Every node captured in the snapshot.
    nodes: Vec<SnapshotNode>,
    /// Every edge captured in the snapshot.
    edges: Vec<SnapshotEdge>,
}
21
/// One node as stored inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Identifier the node had in the exporting database; it is reused
    /// verbatim when the snapshot is imported.
    id: NodeId,
    /// Labels attached to the node, stored as owned strings.
    labels: Vec<String>,
    /// Node properties as `(key, value)` pairs.
    properties: Vec<(String, Value)>,
}
28
/// One edge as stored inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Identifier the edge had in the exporting database; it is reused
    /// verbatim when the snapshot is imported.
    id: EdgeId,
    /// Source node. `import_snapshot` requires it to be a node present in
    /// the same snapshot.
    src: NodeId,
    /// Destination node. `import_snapshot` requires it to be a node present
    /// in the same snapshot.
    dst: NodeId,
    /// Edge type name, stored as an owned string.
    edge_type: String,
    /// Edge properties as `(key, value)` pairs.
    properties: Vec<(String, Value)>,
}
37
38impl super::GrafeoDB {
39    // =========================================================================
40    // ADMIN API: Persistence Control
41    // =========================================================================
42
43    /// Saves the database to a file path.
44    ///
45    /// - If in-memory: creates a new persistent database at path
46    /// - If file-backed: creates a copy at the new path
47    ///
48    /// The original database remains unchanged.
49    ///
50    /// # Errors
51    ///
52    /// Returns an error if the save operation fails.
53    ///
54    /// Requires the `wal` feature for persistence support.
55    #[cfg(feature = "wal")]
56    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
57        let path = path.as_ref();
58
59        // Create target database with WAL enabled
60        let target_config = Config::persistent(path);
61        let target = Self::with_config(target_config)?;
62
63        // Copy all nodes using WAL-enabled methods
64        for node in self.store.all_nodes() {
65            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
66            target.store.create_node_with_id(node.id, &label_refs);
67
68            // Log to WAL
69            target.log_wal(&WalRecord::CreateNode {
70                id: node.id,
71                labels: node.labels.iter().map(|s| s.to_string()).collect(),
72            })?;
73
74            // Copy properties
75            for (key, value) in node.properties {
76                target
77                    .store
78                    .set_node_property(node.id, key.as_str(), value.clone());
79                target.log_wal(&WalRecord::SetNodeProperty {
80                    id: node.id,
81                    key: key.to_string(),
82                    value,
83                })?;
84            }
85        }
86
87        // Copy all edges using WAL-enabled methods
88        for edge in self.store.all_edges() {
89            target
90                .store
91                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
92
93            // Log to WAL
94            target.log_wal(&WalRecord::CreateEdge {
95                id: edge.id,
96                src: edge.src,
97                dst: edge.dst,
98                edge_type: edge.edge_type.to_string(),
99            })?;
100
101            // Copy properties
102            for (key, value) in edge.properties {
103                target
104                    .store
105                    .set_edge_property(edge.id, key.as_str(), value.clone());
106                target.log_wal(&WalRecord::SetEdgeProperty {
107                    id: edge.id,
108                    key: key.to_string(),
109                    value,
110                })?;
111            }
112        }
113
114        // Checkpoint and close the target database
115        target.close()?;
116
117        Ok(())
118    }
119
120    /// Creates an in-memory copy of this database.
121    ///
122    /// Returns a new database that is completely independent.
123    /// Useful for:
124    /// - Testing modifications without affecting the original
125    /// - Faster operations when persistence isn't needed
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the copy operation fails.
130    pub fn to_memory(&self) -> Result<Self> {
131        let config = Config::in_memory();
132        let target = Self::with_config(config)?;
133
134        // Copy all nodes
135        for node in self.store.all_nodes() {
136            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
137            target.store.create_node_with_id(node.id, &label_refs);
138
139            // Copy properties
140            for (key, value) in node.properties {
141                target.store.set_node_property(node.id, key.as_str(), value);
142            }
143        }
144
145        // Copy all edges
146        for edge in self.store.all_edges() {
147            target
148                .store
149                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
150
151            // Copy properties
152            for (key, value) in edge.properties {
153                target.store.set_edge_property(edge.id, key.as_str(), value);
154            }
155        }
156
157        Ok(target)
158    }
159
160    /// Opens a database file and loads it entirely into memory.
161    ///
162    /// The returned database has no connection to the original file.
163    /// Changes will NOT be written back to the file.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the file can't be opened or loaded.
168    #[cfg(feature = "wal")]
169    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
170        // Open the source database (triggers WAL recovery)
171        let source = Self::open(path)?;
172
173        // Create in-memory copy
174        let target = source.to_memory()?;
175
176        // Close the source (releases file handles)
177        source.close()?;
178
179        Ok(target)
180    }
181
182    // =========================================================================
183    // ADMIN API: Snapshot Export/Import
184    // =========================================================================
185
186    /// Exports the entire database to a binary snapshot.
187    ///
188    /// The returned bytes can be stored (e.g. in IndexedDB) and later
189    /// restored with [`import_snapshot()`](Self::import_snapshot).
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if serialization fails.
194    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
195        let nodes: Vec<SnapshotNode> = self
196            .store
197            .all_nodes()
198            .map(|n| SnapshotNode {
199                id: n.id,
200                labels: n.labels.iter().map(|l| l.to_string()).collect(),
201                properties: n
202                    .properties
203                    .into_iter()
204                    .map(|(k, v)| (k.to_string(), v))
205                    .collect(),
206            })
207            .collect();
208
209        let edges: Vec<SnapshotEdge> = self
210            .store
211            .all_edges()
212            .map(|e| SnapshotEdge {
213                id: e.id,
214                src: e.src,
215                dst: e.dst,
216                edge_type: e.edge_type.to_string(),
217                properties: e
218                    .properties
219                    .into_iter()
220                    .map(|(k, v)| (k.to_string(), v))
221                    .collect(),
222            })
223            .collect();
224
225        let snapshot = Snapshot {
226            version: 1,
227            nodes,
228            edges,
229        };
230
231        let config = bincode::config::standard();
232        bincode::serde::encode_to_vec(&snapshot, config)
233            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
234    }
235
236    /// Creates a new in-memory database from a binary snapshot.
237    ///
238    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
239    ///
240    /// All edge references are validated before any data is inserted: every
241    /// edge's source and destination must reference a node present in the
242    /// snapshot, and duplicate node/edge IDs are rejected. If validation
243    /// fails, no database is created.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the snapshot is invalid, contains dangling edge
248    /// references, has duplicate IDs, or deserialization fails.
249    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
250        let config = bincode::config::standard();
251        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
252            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
253
254        if snapshot.version != 1 {
255            return Err(Error::Internal(format!(
256                "unsupported snapshot version: {}",
257                snapshot.version
258            )));
259        }
260
261        // Pre-validate: collect all node IDs and check for duplicates
262        let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
263        for node in &snapshot.nodes {
264            if !node_ids.insert(node.id) {
265                return Err(Error::Internal(format!(
266                    "snapshot contains duplicate node ID {}",
267                    node.id
268                )));
269            }
270        }
271
272        // Validate edge references and check for duplicate edge IDs
273        let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
274        for edge in &snapshot.edges {
275            if !edge_ids.insert(edge.id) {
276                return Err(Error::Internal(format!(
277                    "snapshot contains duplicate edge ID {}",
278                    edge.id
279                )));
280            }
281            if !node_ids.contains(&edge.src) {
282                return Err(Error::Internal(format!(
283                    "snapshot edge {} references non-existent source node {}",
284                    edge.id, edge.src
285                )));
286            }
287            if !node_ids.contains(&edge.dst) {
288                return Err(Error::Internal(format!(
289                    "snapshot edge {} references non-existent destination node {}",
290                    edge.id, edge.dst
291                )));
292            }
293        }
294
295        // Validation passed — build the database
296        let db = Self::new_in_memory();
297
298        for node in snapshot.nodes {
299            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
300            db.store.create_node_with_id(node.id, &label_refs);
301            for (key, value) in node.properties {
302                db.store.set_node_property(node.id, &key, value);
303            }
304        }
305
306        for edge in snapshot.edges {
307            db.store
308                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
309            for (key, value) in edge.properties {
310                db.store.set_edge_property(edge.id, &key, value);
311            }
312        }
313
314        Ok(db)
315    }
316
317    // =========================================================================
318    // ADMIN API: Iteration
319    // =========================================================================
320
321    /// Returns an iterator over all nodes in the database.
322    ///
323    /// Useful for dump/export operations.
324    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
325        self.store.all_nodes()
326    }
327
328    /// Returns an iterator over all edges in the database.
329    ///
330    /// Useful for dump/export operations.
331    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
332        self.store.all_edges()
333    }
334}