1use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use grafeo_common::storage::section::{Section, SectionType};
12use grafeo_common::types::{EpochId, Value};
13use grafeo_common::utils::error::Result;
14
15use super::block::{self, BlockEdge, BlockNamedGraph, BlockNode};
16use crate::graph::lpg::LpgStore;
17
18const LPG_SECTION_VERSION: u8 = 2;
20
21fn collect_block_nodes(store: &LpgStore) -> Vec<BlockNode> {
24 let mut nodes: Vec<BlockNode> = store
25 .all_nodes()
26 .map(|n| {
27 #[cfg(feature = "temporal")]
28 let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = store
29 .node_property_history(n.id)
30 .into_iter()
31 .map(|(k, entries)| (k.to_string(), entries))
32 .collect();
33
34 #[cfg(not(feature = "temporal"))]
35 let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = n
36 .properties
37 .into_iter()
38 .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
39 .collect();
40
41 properties.sort_by(|(a, _), (b, _)| a.cmp(b));
42
43 let mut labels: Vec<String> = n.labels.iter().map(|l| l.to_string()).collect();
44 labels.sort();
45
46 BlockNode {
47 id: n.id,
48 labels,
49 properties,
50 }
51 })
52 .collect();
53 nodes.sort_by_key(|n| n.id);
54 nodes
55}
56
57fn collect_block_edges(store: &LpgStore) -> Vec<BlockEdge> {
58 let mut edges: Vec<BlockEdge> = store
59 .all_edges()
60 .map(|e| {
61 #[cfg(feature = "temporal")]
62 let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = store
63 .edge_property_history(e.id)
64 .into_iter()
65 .map(|(k, entries)| (k.to_string(), entries))
66 .collect();
67
68 #[cfg(not(feature = "temporal"))]
69 let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = e
70 .properties
71 .into_iter()
72 .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
73 .collect();
74
75 properties.sort_by(|(a, _), (b, _)| a.cmp(b));
76
77 BlockEdge {
78 id: e.id,
79 src: e.src,
80 dst: e.dst,
81 edge_type: e.edge_type.to_string(),
82 properties,
83 }
84 })
85 .collect();
86 edges.sort_by_key(|e| e.id);
87 edges
88}
89
90fn populate_store(store: &LpgStore, nodes: &[BlockNode], edges: &[BlockEdge]) -> Result<()> {
91 for node in nodes {
92 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
93 store.create_node_with_id(node.id, &label_refs)?;
94 for (key, entries) in &node.properties {
95 #[cfg(feature = "temporal")]
96 for (epoch, value) in entries {
97 store.set_node_property_at_epoch(node.id, key, value.clone(), *epoch);
98 }
99 #[cfg(not(feature = "temporal"))]
100 if let Some((_, value)) = entries.last() {
101 store.set_node_property(node.id, key, value.clone());
102 }
103 }
104 }
105 for edge in edges {
106 store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
107 for (key, entries) in &edge.properties {
108 #[cfg(feature = "temporal")]
109 for (epoch, value) in entries {
110 store.set_edge_property_at_epoch(edge.id, key, value.clone(), *epoch);
111 }
112 #[cfg(not(feature = "temporal"))]
113 if let Some((_, value)) = entries.last() {
114 store.set_edge_property(edge.id, key, value.clone());
115 }
116 }
117 }
118 Ok(())
119}
120
121pub struct LpgStoreSection {
129 store: Arc<LpgStore>,
130 dirty: AtomicBool,
131}
132
133impl LpgStoreSection {
134 pub fn new(store: Arc<LpgStore>) -> Self {
136 Self {
137 store,
138 dirty: AtomicBool::new(false),
139 }
140 }
141
142 pub fn mark_dirty(&self) {
144 self.dirty.store(true, Ordering::Release);
145 }
146
147 #[must_use]
149 pub fn store(&self) -> &Arc<LpgStore> {
150 &self.store
151 }
152}
153
154impl Section for LpgStoreSection {
155 fn section_type(&self) -> SectionType {
156 SectionType::LpgStore
157 }
158
159 fn version(&self) -> u8 {
160 LPG_SECTION_VERSION
161 }
162
163 fn serialize(&self) -> Result<Vec<u8>> {
164 let nodes = collect_block_nodes(&self.store);
165 let edges = collect_block_edges(&self.store);
166
167 let named_graphs: Vec<BlockNamedGraph> = self
168 .store
169 .graph_names()
170 .into_iter()
171 .filter_map(|name| {
172 self.store.graph(&name).map(|graph_store| BlockNamedGraph {
173 name,
174 nodes: collect_block_nodes(&graph_store),
175 edges: collect_block_edges(&graph_store),
176 })
177 })
178 .collect();
179
180 #[cfg(feature = "temporal")]
181 let epoch = self.store.current_epoch().as_u64();
182 #[cfg(not(feature = "temporal"))]
183 let epoch = 0u64;
184
185 block::write_blocks(&nodes, &edges, &named_graphs, epoch)
186 }
187
188 fn deserialize(&mut self, data: &[u8]) -> Result<()> {
189 let store = &self.store;
190
191 block::read_blocks(data, &mut |nodes, edges, named_graphs, epoch| {
192 populate_store(store, &nodes, &edges)?;
193
194 #[cfg(feature = "temporal")]
195 store.sync_epoch(EpochId::new(epoch));
196 #[cfg(not(feature = "temporal"))]
197 let _ = epoch;
198
199 for graph in &named_graphs {
200 store
201 .create_graph(&graph.name)
202 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
203 if let Some(graph_store) = store.graph(&graph.name) {
204 populate_store(&graph_store, &graph.nodes, &graph.edges)?;
205 #[cfg(feature = "temporal")]
206 graph_store.sync_epoch(EpochId::new(epoch));
207 }
208 }
209
210 Ok(())
211 })
212 }
213
214 fn is_dirty(&self) -> bool {
215 self.dirty.load(Ordering::Acquire)
216 }
217
218 fn mark_clean(&self) {
219 self.dirty.store(false, Ordering::Release);
220 }
221
222 fn memory_usage(&self) -> usize {
223 let (store, indexes, mvcc, string_pool) = self.store.memory_breakdown();
224 store.total_bytes + indexes.total_bytes + mvcc.total_bytes + string_pool.total_bytes
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231 use grafeo_common::types::{NodeId, PropertyKey, Value};
232
233 #[test]
234 fn lpg_section_round_trip() {
235 let store = Arc::new(LpgStore::new().unwrap());
236 store.create_node(&["Person"]);
237 store.create_node(&["Person"]);
238 let n1 = NodeId::new(1);
239 let n2 = NodeId::new(2);
240 store.set_node_property(n1, "name", Value::String("Alix".into()));
241 store.set_node_property(n2, "name", Value::String("Gus".into()));
242 store.create_edge(n1, n2, "KNOWS");
243
244 let section = LpgStoreSection::new(Arc::clone(&store));
245 let bytes = section.serialize().expect("serialize should succeed");
246 assert!(!bytes.is_empty());
247 assert!(block::is_block_format(&bytes));
248
249 let store2 = Arc::new(LpgStore::new().unwrap());
251 let mut section2 = LpgStoreSection::new(store2);
252 section2
253 .deserialize(&bytes)
254 .expect("deserialize should succeed");
255
256 assert_eq!(section2.store().node_count(), 2);
257 assert_eq!(section2.store().edge_count(), 1);
258 }
259
260 #[test]
261 fn lpg_section_dirty_tracking() {
262 let store = Arc::new(LpgStore::new().unwrap());
263 let section = LpgStoreSection::new(store);
264
265 assert!(!section.is_dirty());
266 section.mark_dirty();
267 assert!(section.is_dirty());
268 section.mark_clean();
269 assert!(!section.is_dirty());
270 }
271
272 #[test]
273 fn lpg_section_type() {
274 let store = Arc::new(LpgStore::new().unwrap());
275 let section = LpgStoreSection::new(store);
276 assert_eq!(section.section_type(), SectionType::LpgStore);
277 assert_eq!(section.version(), LPG_SECTION_VERSION);
278 }
279
280 #[test]
281 fn lpg_section_empty_round_trip() {
282 let store = Arc::new(LpgStore::new().unwrap());
283 let section = LpgStoreSection::new(Arc::clone(&store));
284 let bytes = section.serialize().unwrap();
285
286 let store2 = Arc::new(LpgStore::new().unwrap());
287 let mut section2 = LpgStoreSection::new(store2);
288 section2.deserialize(&bytes).unwrap();
289 assert_eq!(section2.store().node_count(), 0);
290 assert_eq!(section2.store().edge_count(), 0);
291 }
292
293 #[test]
294 fn lpg_section_properties_preserved() {
295 let store = Arc::new(LpgStore::new().unwrap());
296 let n = store.create_node(&["Person"]);
297 store.set_node_property(n, "name", Value::String("Alix".into()));
298 store.set_node_property(n, "age", Value::Int64(30));
299 store.set_node_property(n, "active", Value::Bool(true));
300
301 let section = LpgStoreSection::new(Arc::clone(&store));
302 let bytes = section.serialize().unwrap();
303
304 let store2 = Arc::new(LpgStore::new().unwrap());
305 let mut section2 = LpgStoreSection::new(Arc::clone(&store2));
306 section2.deserialize(&bytes).unwrap();
307
308 let node = store2.get_node(n).unwrap();
309 let name_key: PropertyKey = "name".into();
310 let age_key: PropertyKey = "age".into();
311 let active_key: PropertyKey = "active".into();
312 assert_eq!(
313 node.properties.get(&name_key),
314 Some(&Value::String("Alix".into()))
315 );
316 assert_eq!(node.properties.get(&age_key), Some(&Value::Int64(30)));
317 assert_eq!(node.properties.get(&active_key), Some(&Value::Bool(true)));
318 }
319
320 #[test]
321 fn lpg_section_named_graphs() {
322 let store = Arc::new(LpgStore::new().unwrap());
323 store.create_node(&["Root"]);
324 store.create_graph("social").unwrap();
325
326 if let Some(g) = store.graph("social") {
327 g.create_node(&["Friend"]);
328 }
329
330 let section = LpgStoreSection::new(Arc::clone(&store));
331 let bytes = section.serialize().unwrap();
332
333 let store2 = Arc::new(LpgStore::new().unwrap());
334 let mut section2 = LpgStoreSection::new(Arc::clone(&store2));
335 section2.deserialize(&bytes).unwrap();
336
337 assert_eq!(store2.node_count(), 1);
338 assert!(store2.graph("social").is_some());
339 assert_eq!(store2.graph("social").unwrap().node_count(), 1);
340 }
341
342 #[test]
343 fn lpg_section_crc_integrity() {
344 let store = Arc::new(LpgStore::new().unwrap());
345 store.create_node(&["Test"]);
346
347 let section = LpgStoreSection::new(Arc::clone(&store));
348 let mut bytes = section.serialize().unwrap();
349
350 let last = bytes.len() - 1;
352 bytes[last] ^= 0xFF;
353
354 let store2 = Arc::new(LpgStore::new().unwrap());
355 let mut section2 = LpgStoreSection::new(store2);
356 assert!(section2.deserialize(&bytes).is_err());
357 }
358}