Skip to main content

grafeo_core/graph/lpg/
section.rs

1//! LPG section serializer for the `.grafeo` container format.
2//!
3//! Implements the [`Section`] trait for LPG graph data (nodes, edges,
4//! properties, named graphs). Uses the block-based binary format (v2)
5//! defined in the `block` submodule for efficient serialization, CRC integrity
6//! checking, and future mmap support.
7
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use grafeo_common::storage::section::{Section, SectionType};
12use grafeo_common::types::{EpochId, Value};
13use grafeo_common::utils::error::Result;
14
15use super::block::{self, BlockEdge, BlockNamedGraph, BlockNode};
16use crate::graph::lpg::LpgStore;
17
/// Current LPG section format version (v2 = block-based).
///
/// This is the value reported by the `Section::version` implementation for
/// `LpgStoreSection`.
const LPG_SECTION_VERSION: u8 = 2;
20
21// ── Collection helpers ──────────────────────────────────────────────
22
23fn collect_block_nodes(store: &LpgStore) -> Vec<BlockNode> {
24    let mut nodes: Vec<BlockNode> = store
25        .all_nodes()
26        .map(|n| {
27            #[cfg(feature = "temporal")]
28            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = store
29                .node_property_history(n.id)
30                .into_iter()
31                .map(|(k, entries)| (k.to_string(), entries))
32                .collect();
33
34            #[cfg(not(feature = "temporal"))]
35            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = n
36                .properties
37                .into_iter()
38                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
39                .collect();
40
41            properties.sort_by(|(a, _), (b, _)| a.cmp(b));
42
43            let mut labels: Vec<String> = n.labels.iter().map(|l| l.to_string()).collect();
44            labels.sort();
45
46            BlockNode {
47                id: n.id,
48                labels,
49                properties,
50            }
51        })
52        .collect();
53    nodes.sort_by_key(|n| n.id);
54    nodes
55}
56
57fn collect_block_edges(store: &LpgStore) -> Vec<BlockEdge> {
58    let mut edges: Vec<BlockEdge> = store
59        .all_edges()
60        .map(|e| {
61            #[cfg(feature = "temporal")]
62            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = store
63                .edge_property_history(e.id)
64                .into_iter()
65                .map(|(k, entries)| (k.to_string(), entries))
66                .collect();
67
68            #[cfg(not(feature = "temporal"))]
69            let mut properties: Vec<(String, Vec<(EpochId, Value)>)> = e
70                .properties
71                .into_iter()
72                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
73                .collect();
74
75            properties.sort_by(|(a, _), (b, _)| a.cmp(b));
76
77            BlockEdge {
78                id: e.id,
79                src: e.src,
80                dst: e.dst,
81                edge_type: e.edge_type.to_string(),
82                properties,
83            }
84        })
85        .collect();
86    edges.sort_by_key(|e| e.id);
87    edges
88}
89
90fn populate_store(store: &LpgStore, nodes: &[BlockNode], edges: &[BlockEdge]) -> Result<()> {
91    for node in nodes {
92        let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
93        store.create_node_with_id(node.id, &label_refs)?;
94        for (key, entries) in &node.properties {
95            #[cfg(feature = "temporal")]
96            for (epoch, value) in entries {
97                store.set_node_property_at_epoch(node.id, key, value.clone(), *epoch);
98            }
99            #[cfg(not(feature = "temporal"))]
100            if let Some((_, value)) = entries.last() {
101                store.set_node_property(node.id, key, value.clone());
102            }
103        }
104    }
105    for edge in edges {
106        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
107        for (key, entries) in &edge.properties {
108            #[cfg(feature = "temporal")]
109            for (epoch, value) in entries {
110                store.set_edge_property_at_epoch(edge.id, key, value.clone(), *epoch);
111            }
112            #[cfg(not(feature = "temporal"))]
113            if let Some((_, value)) = entries.last() {
114                store.set_edge_property(edge.id, key, value.clone());
115            }
116        }
117    }
118    Ok(())
119}
120
121// ── Section implementation ──────────────────────────────────────────
122
123/// LPG store section for the `.grafeo` container.
124///
125/// Wraps an `Arc<LpgStore>` and implements the [`Section`] trait for
126/// serialization/deserialization of LPG graph data using the block-based
127/// format (v2).
128pub struct LpgStoreSection {
129    store: Arc<LpgStore>,
130    dirty: AtomicBool,
131}
132
133impl LpgStoreSection {
134    /// Create a new LPG section wrapping the given store.
135    pub fn new(store: Arc<LpgStore>) -> Self {
136        Self {
137            store,
138            dirty: AtomicBool::new(false),
139        }
140    }
141
142    /// Mark this section as dirty (has unsaved changes).
143    pub fn mark_dirty(&self) {
144        self.dirty.store(true, Ordering::Release);
145    }
146
147    /// Access the underlying store.
148    #[must_use]
149    pub fn store(&self) -> &Arc<LpgStore> {
150        &self.store
151    }
152}
153
154impl Section for LpgStoreSection {
155    fn section_type(&self) -> SectionType {
156        SectionType::LpgStore
157    }
158
159    fn version(&self) -> u8 {
160        LPG_SECTION_VERSION
161    }
162
163    fn serialize(&self) -> Result<Vec<u8>> {
164        let nodes = collect_block_nodes(&self.store);
165        let edges = collect_block_edges(&self.store);
166
167        let named_graphs: Vec<BlockNamedGraph> = self
168            .store
169            .graph_names()
170            .into_iter()
171            .filter_map(|name| {
172                self.store.graph(&name).map(|graph_store| BlockNamedGraph {
173                    name,
174                    nodes: collect_block_nodes(&graph_store),
175                    edges: collect_block_edges(&graph_store),
176                })
177            })
178            .collect();
179
180        #[cfg(feature = "temporal")]
181        let epoch = self.store.current_epoch().as_u64();
182        #[cfg(not(feature = "temporal"))]
183        let epoch = 0u64;
184
185        block::write_blocks(&nodes, &edges, &named_graphs, epoch)
186    }
187
188    fn deserialize(&mut self, data: &[u8]) -> Result<()> {
189        let store = &self.store;
190
191        block::read_blocks(data, &mut |nodes, edges, named_graphs, epoch| {
192            populate_store(store, &nodes, &edges)?;
193
194            #[cfg(feature = "temporal")]
195            store.sync_epoch(EpochId::new(epoch));
196            #[cfg(not(feature = "temporal"))]
197            let _ = epoch;
198
199            for graph in &named_graphs {
200                store
201                    .create_graph(&graph.name)
202                    .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
203                if let Some(graph_store) = store.graph(&graph.name) {
204                    populate_store(&graph_store, &graph.nodes, &graph.edges)?;
205                    #[cfg(feature = "temporal")]
206                    graph_store.sync_epoch(EpochId::new(epoch));
207                }
208            }
209
210            Ok(())
211        })
212    }
213
214    fn is_dirty(&self) -> bool {
215        self.dirty.load(Ordering::Acquire)
216    }
217
218    fn mark_clean(&self) {
219        self.dirty.store(false, Ordering::Release);
220    }
221
222    fn memory_usage(&self) -> usize {
223        let (store, indexes, mvcc, string_pool) = self.store.memory_breakdown();
224        store.total_bytes + indexes.total_bytes + mvcc.total_bytes + string_pool.total_bytes
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::{PropertyKey, Value};

    /// Nodes, properties, and an edge survive a serialize/deserialize cycle.
    #[test]
    fn lpg_section_round_trip() {
        let store = Arc::new(LpgStore::new().unwrap());
        // Use the ids handed back by the store instead of assuming which ids
        // it allocates.
        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", Value::String("Alix".into()));
        store.set_node_property(n2, "name", Value::String("Gus".into()));
        store.create_edge(n1, n2, "KNOWS");

        let section = LpgStoreSection::new(Arc::clone(&store));
        let bytes = section.serialize().expect("serialize should succeed");
        assert!(!bytes.is_empty());
        assert!(block::is_block_format(&bytes));

        // Deserialize into a fresh store
        let store2 = Arc::new(LpgStore::new().unwrap());
        let mut section2 = LpgStoreSection::new(store2);
        section2
            .deserialize(&bytes)
            .expect("deserialize should succeed");

        assert_eq!(section2.store().node_count(), 2);
        assert_eq!(section2.store().edge_count(), 1);
    }

    /// The dirty flag starts cleared and follows `mark_dirty` / `mark_clean`.
    #[test]
    fn lpg_section_dirty_tracking() {
        let store = Arc::new(LpgStore::new().unwrap());
        let section = LpgStoreSection::new(store);

        assert!(!section.is_dirty());
        section.mark_dirty();
        assert!(section.is_dirty());
        section.mark_clean();
        assert!(!section.is_dirty());
    }

    /// The section reports the LPG section type and the current format version.
    #[test]
    fn lpg_section_type() {
        let store = Arc::new(LpgStore::new().unwrap());
        let section = LpgStoreSection::new(store);
        assert_eq!(section.section_type(), SectionType::LpgStore);
        assert_eq!(section.version(), LPG_SECTION_VERSION);
    }

    /// An empty store round-trips to an empty store.
    #[test]
    fn lpg_section_empty_round_trip() {
        let store = Arc::new(LpgStore::new().unwrap());
        let section = LpgStoreSection::new(Arc::clone(&store));
        let bytes = section.serialize().unwrap();

        let store2 = Arc::new(LpgStore::new().unwrap());
        let mut section2 = LpgStoreSection::new(store2);
        section2.deserialize(&bytes).unwrap();
        assert_eq!(section2.store().node_count(), 0);
        assert_eq!(section2.store().edge_count(), 0);
    }

    /// String, integer, and boolean node properties keep their values.
    #[test]
    fn lpg_section_properties_preserved() {
        let store = Arc::new(LpgStore::new().unwrap());
        let n = store.create_node(&["Person"]);
        store.set_node_property(n, "name", Value::String("Alix".into()));
        store.set_node_property(n, "age", Value::Int64(30));
        store.set_node_property(n, "active", Value::Bool(true));

        let section = LpgStoreSection::new(Arc::clone(&store));
        let bytes = section.serialize().unwrap();

        let store2 = Arc::new(LpgStore::new().unwrap());
        let mut section2 = LpgStoreSection::new(Arc::clone(&store2));
        section2.deserialize(&bytes).unwrap();

        let node = store2.get_node(n).unwrap();
        let name_key: PropertyKey = "name".into();
        let age_key: PropertyKey = "age".into();
        let active_key: PropertyKey = "active".into();
        assert_eq!(
            node.properties.get(&name_key),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(node.properties.get(&age_key), Some(&Value::Int64(30)));
        assert_eq!(node.properties.get(&active_key), Some(&Value::Bool(true)));
    }

    /// A named graph and its contents are restored alongside the main graph.
    #[test]
    fn lpg_section_named_graphs() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Root"]);
        store.create_graph("social").unwrap();

        // Fail loudly during setup rather than silently skipping the node.
        store
            .graph("social")
            .expect("graph `social` should exist after creation")
            .create_node(&["Friend"]);

        let section = LpgStoreSection::new(Arc::clone(&store));
        let bytes = section.serialize().unwrap();

        let store2 = Arc::new(LpgStore::new().unwrap());
        let mut section2 = LpgStoreSection::new(Arc::clone(&store2));
        section2.deserialize(&bytes).unwrap();

        assert_eq!(store2.node_count(), 1);
        assert!(store2.graph("social").is_some());
        assert_eq!(store2.graph("social").unwrap().node_count(), 1);
    }

    /// Flipping the bits of the final byte makes deserialization fail.
    #[test]
    fn lpg_section_crc_integrity() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Test"]);

        let section = LpgStoreSection::new(Arc::clone(&store));
        let mut bytes = section.serialize().unwrap();

        // Corrupt a byte
        let last = bytes.len() - 1;
        bytes[last] ^= 0xFF;

        let store2 = Arc::new(LpgStore::new().unwrap());
        let mut section2 = LpgStoreSection::new(store2);
        assert!(section2.deserialize(&bytes).is_err());
    }
}