Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3use std::path::Path;
4
5use grafeo_common::types::{EdgeId, NodeId, Value};
6use grafeo_common::utils::error::{Error, Result};
7
8use crate::config::Config;
9
10#[cfg(feature = "wal")]
11use grafeo_adapters::storage::wal::WalRecord;
12
13/// Binary snapshot format for database export/import.
14#[derive(serde::Serialize, serde::Deserialize)]
15struct Snapshot {
16    version: u8,
17    nodes: Vec<SnapshotNode>,
18    edges: Vec<SnapshotEdge>,
19}
20
21#[derive(serde::Serialize, serde::Deserialize)]
22struct SnapshotNode {
23    id: NodeId,
24    labels: Vec<String>,
25    properties: Vec<(String, Value)>,
26}
27
28#[derive(serde::Serialize, serde::Deserialize)]
29struct SnapshotEdge {
30    id: EdgeId,
31    src: NodeId,
32    dst: NodeId,
33    edge_type: String,
34    properties: Vec<(String, Value)>,
35}
36
37impl super::GrafeoDB {
38    // =========================================================================
39    // ADMIN API: Persistence Control
40    // =========================================================================
41
42    /// Saves the database to a file path.
43    ///
44    /// - If in-memory: creates a new persistent database at path
45    /// - If file-backed: creates a copy at the new path
46    ///
47    /// The original database remains unchanged.
48    ///
49    /// # Errors
50    ///
51    /// Returns an error if the save operation fails.
52    ///
53    /// Requires the `wal` feature for persistence support.
54    #[cfg(feature = "wal")]
55    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
56        let path = path.as_ref();
57
58        // Create target database with WAL enabled
59        let target_config = Config::persistent(path);
60        let target = Self::with_config(target_config)?;
61
62        // Copy all nodes using WAL-enabled methods
63        for node in self.store.all_nodes() {
64            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
65            target.store.create_node_with_id(node.id, &label_refs);
66
67            // Log to WAL
68            target.log_wal(&WalRecord::CreateNode {
69                id: node.id,
70                labels: node.labels.iter().map(|s| s.to_string()).collect(),
71            })?;
72
73            // Copy properties
74            for (key, value) in node.properties {
75                target
76                    .store
77                    .set_node_property(node.id, key.as_str(), value.clone());
78                target.log_wal(&WalRecord::SetNodeProperty {
79                    id: node.id,
80                    key: key.to_string(),
81                    value,
82                })?;
83            }
84        }
85
86        // Copy all edges using WAL-enabled methods
87        for edge in self.store.all_edges() {
88            target
89                .store
90                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
91
92            // Log to WAL
93            target.log_wal(&WalRecord::CreateEdge {
94                id: edge.id,
95                src: edge.src,
96                dst: edge.dst,
97                edge_type: edge.edge_type.to_string(),
98            })?;
99
100            // Copy properties
101            for (key, value) in edge.properties {
102                target
103                    .store
104                    .set_edge_property(edge.id, key.as_str(), value.clone());
105                target.log_wal(&WalRecord::SetEdgeProperty {
106                    id: edge.id,
107                    key: key.to_string(),
108                    value,
109                })?;
110            }
111        }
112
113        // Checkpoint and close the target database
114        target.close()?;
115
116        Ok(())
117    }
118
119    /// Creates an in-memory copy of this database.
120    ///
121    /// Returns a new database that is completely independent.
122    /// Useful for:
123    /// - Testing modifications without affecting the original
124    /// - Faster operations when persistence isn't needed
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the copy operation fails.
129    pub fn to_memory(&self) -> Result<Self> {
130        let config = Config::in_memory();
131        let target = Self::with_config(config)?;
132
133        // Copy all nodes
134        for node in self.store.all_nodes() {
135            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
136            target.store.create_node_with_id(node.id, &label_refs);
137
138            // Copy properties
139            for (key, value) in node.properties {
140                target.store.set_node_property(node.id, key.as_str(), value);
141            }
142        }
143
144        // Copy all edges
145        for edge in self.store.all_edges() {
146            target
147                .store
148                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
149
150            // Copy properties
151            for (key, value) in edge.properties {
152                target.store.set_edge_property(edge.id, key.as_str(), value);
153            }
154        }
155
156        Ok(target)
157    }
158
159    /// Opens a database file and loads it entirely into memory.
160    ///
161    /// The returned database has no connection to the original file.
162    /// Changes will NOT be written back to the file.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if the file can't be opened or loaded.
167    #[cfg(feature = "wal")]
168    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
169        // Open the source database (triggers WAL recovery)
170        let source = Self::open(path)?;
171
172        // Create in-memory copy
173        let target = source.to_memory()?;
174
175        // Close the source (releases file handles)
176        source.close()?;
177
178        Ok(target)
179    }
180
181    // =========================================================================
182    // ADMIN API: Snapshot Export/Import
183    // =========================================================================
184
185    /// Exports the entire database to a binary snapshot.
186    ///
187    /// The returned bytes can be stored (e.g. in IndexedDB) and later
188    /// restored with [`import_snapshot()`](Self::import_snapshot).
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if serialization fails.
193    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
194        let nodes: Vec<SnapshotNode> = self
195            .store
196            .all_nodes()
197            .map(|n| SnapshotNode {
198                id: n.id,
199                labels: n.labels.iter().map(|l| l.to_string()).collect(),
200                properties: n
201                    .properties
202                    .into_iter()
203                    .map(|(k, v)| (k.to_string(), v))
204                    .collect(),
205            })
206            .collect();
207
208        let edges: Vec<SnapshotEdge> = self
209            .store
210            .all_edges()
211            .map(|e| SnapshotEdge {
212                id: e.id,
213                src: e.src,
214                dst: e.dst,
215                edge_type: e.edge_type.to_string(),
216                properties: e
217                    .properties
218                    .into_iter()
219                    .map(|(k, v)| (k.to_string(), v))
220                    .collect(),
221            })
222            .collect();
223
224        let snapshot = Snapshot {
225            version: 1,
226            nodes,
227            edges,
228        };
229
230        let config = bincode::config::standard();
231        bincode::serde::encode_to_vec(&snapshot, config)
232            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
233    }
234
235    /// Creates a new in-memory database from a binary snapshot.
236    ///
237    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if the snapshot is invalid or deserialization fails.
242    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
243        let config = bincode::config::standard();
244        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
245            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
246
247        if snapshot.version != 1 {
248            return Err(Error::Internal(format!(
249                "unsupported snapshot version: {}",
250                snapshot.version
251            )));
252        }
253
254        let db = Self::new_in_memory();
255
256        for node in snapshot.nodes {
257            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
258            db.store.create_node_with_id(node.id, &label_refs);
259            for (key, value) in node.properties {
260                db.store.set_node_property(node.id, &key, value);
261            }
262        }
263
264        for edge in snapshot.edges {
265            db.store
266                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
267            for (key, value) in edge.properties {
268                db.store.set_edge_property(edge.id, &key, value);
269            }
270        }
271
272        Ok(db)
273    }
274
275    // =========================================================================
276    // ADMIN API: Iteration
277    // =========================================================================
278
279    /// Returns an iterator over all nodes in the database.
280    ///
281    /// Useful for dump/export operations.
282    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
283        self.store.all_nodes()
284    }
285
286    /// Returns an iterator over all edges in the database.
287    ///
288    /// Useful for dump/export operations.
289    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
290        self.store.all_edges()
291    }
292}