grafeo_engine/database/
persistence.rs1use std::path::Path;
4
5use grafeo_common::types::{EdgeId, NodeId, Value};
6use grafeo_common::utils::error::{Error, Result};
7
8use crate::config::Config;
9
10#[cfg(feature = "wal")]
11use grafeo_adapters::storage::wal::WalRecord;
12
13#[derive(serde::Serialize, serde::Deserialize)]
15struct Snapshot {
16 version: u8,
17 nodes: Vec<SnapshotNode>,
18 edges: Vec<SnapshotEdge>,
19}
20
21#[derive(serde::Serialize, serde::Deserialize)]
22struct SnapshotNode {
23 id: NodeId,
24 labels: Vec<String>,
25 properties: Vec<(String, Value)>,
26}
27
28#[derive(serde::Serialize, serde::Deserialize)]
29struct SnapshotEdge {
30 id: EdgeId,
31 src: NodeId,
32 dst: NodeId,
33 edge_type: String,
34 properties: Vec<(String, Value)>,
35}
36
37impl super::GrafeoDB {
38 #[cfg(feature = "wal")]
55 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
56 let path = path.as_ref();
57
58 let target_config = Config::persistent(path);
60 let target = Self::with_config(target_config)?;
61
62 for node in self.store.all_nodes() {
64 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
65 target.store.create_node_with_id(node.id, &label_refs);
66
67 target.log_wal(&WalRecord::CreateNode {
69 id: node.id,
70 labels: node.labels.iter().map(|s| s.to_string()).collect(),
71 })?;
72
73 for (key, value) in node.properties {
75 target
76 .store
77 .set_node_property(node.id, key.as_str(), value.clone());
78 target.log_wal(&WalRecord::SetNodeProperty {
79 id: node.id,
80 key: key.to_string(),
81 value,
82 })?;
83 }
84 }
85
86 for edge in self.store.all_edges() {
88 target
89 .store
90 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
91
92 target.log_wal(&WalRecord::CreateEdge {
94 id: edge.id,
95 src: edge.src,
96 dst: edge.dst,
97 edge_type: edge.edge_type.to_string(),
98 })?;
99
100 for (key, value) in edge.properties {
102 target
103 .store
104 .set_edge_property(edge.id, key.as_str(), value.clone());
105 target.log_wal(&WalRecord::SetEdgeProperty {
106 id: edge.id,
107 key: key.to_string(),
108 value,
109 })?;
110 }
111 }
112
113 target.close()?;
115
116 Ok(())
117 }
118
119 pub fn to_memory(&self) -> Result<Self> {
130 let config = Config::in_memory();
131 let target = Self::with_config(config)?;
132
133 for node in self.store.all_nodes() {
135 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
136 target.store.create_node_with_id(node.id, &label_refs);
137
138 for (key, value) in node.properties {
140 target.store.set_node_property(node.id, key.as_str(), value);
141 }
142 }
143
144 for edge in self.store.all_edges() {
146 target
147 .store
148 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
149
150 for (key, value) in edge.properties {
152 target.store.set_edge_property(edge.id, key.as_str(), value);
153 }
154 }
155
156 Ok(target)
157 }
158
159 #[cfg(feature = "wal")]
168 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
169 let source = Self::open(path)?;
171
172 let target = source.to_memory()?;
174
175 source.close()?;
177
178 Ok(target)
179 }
180
181 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
194 let nodes: Vec<SnapshotNode> = self
195 .store
196 .all_nodes()
197 .map(|n| SnapshotNode {
198 id: n.id,
199 labels: n.labels.iter().map(|l| l.to_string()).collect(),
200 properties: n
201 .properties
202 .into_iter()
203 .map(|(k, v)| (k.to_string(), v))
204 .collect(),
205 })
206 .collect();
207
208 let edges: Vec<SnapshotEdge> = self
209 .store
210 .all_edges()
211 .map(|e| SnapshotEdge {
212 id: e.id,
213 src: e.src,
214 dst: e.dst,
215 edge_type: e.edge_type.to_string(),
216 properties: e
217 .properties
218 .into_iter()
219 .map(|(k, v)| (k.to_string(), v))
220 .collect(),
221 })
222 .collect();
223
224 let snapshot = Snapshot {
225 version: 1,
226 nodes,
227 edges,
228 };
229
230 let config = bincode::config::standard();
231 bincode::serde::encode_to_vec(&snapshot, config)
232 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
233 }
234
235 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
243 let config = bincode::config::standard();
244 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
245 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
246
247 if snapshot.version != 1 {
248 return Err(Error::Internal(format!(
249 "unsupported snapshot version: {}",
250 snapshot.version
251 )));
252 }
253
254 let db = Self::new_in_memory();
255
256 for node in snapshot.nodes {
257 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
258 db.store.create_node_with_id(node.id, &label_refs);
259 for (key, value) in node.properties {
260 db.store.set_node_property(node.id, &key, value);
261 }
262 }
263
264 for edge in snapshot.edges {
265 db.store
266 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
267 for (key, value) in edge.properties {
268 db.store.set_edge_property(edge.id, &key, value);
269 }
270 }
271
272 Ok(db)
273 }
274
275 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
283 self.store.all_nodes()
284 }
285
286 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
290 self.store.all_edges()
291 }
292}