//! `grafeo_core/graph/lpg/store/versioning.rs`
//!
//! Version-chain maintenance for [`LpgStore`]: rollback of uncommitted
//! versions, version garbage collection, epoch freezing (tiered storage),
//! and WAL-recovery helpers.
1use super::LpgStore;
2use crate::graph::lpg::{EdgeRecord, NodeRecord};
3use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId};
4use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
5use std::sync::atomic::Ordering;
6
7#[cfg(not(feature = "tiered-storage"))]
8use grafeo_common::mvcc::VersionChain;
9
10#[cfg(feature = "tiered-storage")]
11use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex};
12
13impl LpgStore {
14    /// Discards all uncommitted versions created by a transaction.
15    ///
16    /// This is called during transaction rollback to clean up uncommitted changes.
17    /// The method removes version chain entries created by the specified transaction.
18    #[cfg(not(feature = "tiered-storage"))]
19    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
20        // Remove uncommitted node versions
21        {
22            let mut nodes = self.nodes.write();
23            for chain in nodes.values_mut() {
24                chain.remove_versions_by(tx_id);
25            }
26            // Remove completely empty chains (no versions left)
27            nodes.retain(|_, chain| !chain.is_empty());
28        }
29
30        // Remove uncommitted edge versions
31        {
32            let mut edges = self.edges.write();
33            for chain in edges.values_mut() {
34                chain.remove_versions_by(tx_id);
35            }
36            // Remove completely empty chains (no versions left)
37            edges.retain(|_, chain| !chain.is_empty());
38        }
39    }
40
41    /// Discards all uncommitted versions created by a transaction.
42    /// (Tiered storage version)
43    #[cfg(feature = "tiered-storage")]
44    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
45        // Remove uncommitted node versions
46        {
47            let mut versions = self.node_versions.write();
48            for index in versions.values_mut() {
49                index.remove_versions_by(tx_id);
50            }
51            // Remove completely empty indexes (no versions left)
52            versions.retain(|_, index| !index.is_empty());
53        }
54
55        // Remove uncommitted edge versions
56        {
57            let mut versions = self.edge_versions.write();
58            for index in versions.values_mut() {
59                index.remove_versions_by(tx_id);
60            }
61            // Remove completely empty indexes (no versions left)
62            versions.retain(|_, index| !index.is_empty());
63        }
64    }
65
66    /// Garbage collects old versions that are no longer visible to any transaction.
67    ///
68    /// Versions older than `min_epoch` are pruned from version chains, keeping
69    /// at most one old version per entity as a baseline. Empty chains are removed.
70    #[cfg(not(feature = "tiered-storage"))]
71    pub fn gc_versions(&self, min_epoch: EpochId) {
72        {
73            let mut nodes = self.nodes.write();
74            for chain in nodes.values_mut() {
75                chain.gc(min_epoch);
76            }
77            nodes.retain(|_, chain| !chain.is_empty());
78        }
79        {
80            let mut edges = self.edges.write();
81            for chain in edges.values_mut() {
82                chain.gc(min_epoch);
83            }
84            edges.retain(|_, chain| !chain.is_empty());
85        }
86    }
87
88    /// Garbage collects old versions (tiered storage variant).
89    #[cfg(feature = "tiered-storage")]
90    pub fn gc_versions(&self, min_epoch: EpochId) {
91        {
92            let mut versions = self.node_versions.write();
93            for index in versions.values_mut() {
94                index.gc(min_epoch);
95            }
96            versions.retain(|_, index| !index.is_empty());
97        }
98        {
99            let mut versions = self.edge_versions.write();
100            for index in versions.values_mut() {
101                index.gc(min_epoch);
102            }
103            versions.retain(|_, index| !index.is_empty());
104        }
105    }
106
    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
    ///
    /// This is called by the transaction manager when an epoch becomes eligible
    /// for freezing (no active transactions can see it). The freeze process:
    ///
    /// 1. Collects all hot version refs for the epoch
    /// 2. Reads the corresponding records from arena
    /// 3. Compresses them into a `CompressedEpochBlock`
    /// 4. Updates `VersionIndex` entries to point to cold storage
    /// 5. The arena can be deallocated after all epochs in it are frozen
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch to freeze
    ///
    /// # Returns
    ///
    /// The number of records frozen (nodes + edges). Returns 0 — and writes
    /// nothing to cold storage — when no hot refs exist for `epoch`.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
        // Phase 1a: snapshot node records for this epoch. Records are copied
        // out of the arena while only a read lock is held, so concurrent
        // readers stay unblocked.
        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.node_versions.read();
            for (node_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    node_records.push((node_id.as_u64(), *record));
                    node_hot_refs.push((*node_id, *hot_ref));
                }
            }
        }
        // NOTE(review): the read lock is released here and a write lock is
        // taken again in phase 3; presumably callers guarantee the epoch is
        // quiescent so no new hot refs for `epoch` appear in between — confirm.

        // Phase 1b: snapshot edge records the same way.
        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.edge_versions.read();
            for (edge_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    edge_records.push((edge_id.as_u64(), *record));
                    edge_hot_refs.push((*edge_id, *hot_ref));
                }
            }
        }

        let total_frozen = node_records.len() + edge_records.len();

        // Nothing hot for this epoch — skip compression entirely.
        if total_frozen == 0 {
            return 0;
        }

        // Phase 2: compress the snapshots into cold storage; the returned
        // entries describe where each entity landed inside the block.
        let (node_entries, edge_entries) =
            self.epoch_store
                .freeze_epoch(epoch, node_records, edge_records);

        // Index the entries by entity id -> (offset, length) within the
        // compressed block so the write-back loops below are O(1) per ref.
        let node_entry_map: FxHashMap<u64, _> = node_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();
        let edge_entry_map: FxHashMap<u64, _> = edge_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();

        // Phase 3: swap each frozen hot ref for a cold ref pointing into the
        // compressed block; created_by/deleted_epoch metadata carries over.
        {
            let mut versions = self.node_versions.write();
            for (node_id, hot_ref) in &node_hot_refs {
                if let Some(index) = versions.get_mut(node_id)
                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
                {
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        // Same write-back for edges.
        {
            let mut versions = self.edge_versions.write();
            for (edge_id, hot_ref) in &edge_hot_refs {
                if let Some(index) = versions.get_mut(edge_id)
                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
                {
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        total_frozen
    }
222
    /// Returns the epoch store for cold storage statistics.
    ///
    /// Exposes the underlying epoch store so callers can inspect compressed
    /// (cold) storage state directly, e.g. for metrics.
    #[cfg(feature = "tiered-storage")]
    #[must_use]
    pub fn epoch_store(&self) -> &crate::storage::EpochStore {
        &self.epoch_store
    }
229
230    // === Recovery Support ===
231
232    /// Creates a node with a specific ID during recovery.
233    ///
234    /// This is used for WAL recovery to restore nodes with their original IDs.
235    /// The caller must ensure IDs don't conflict with existing nodes.
236    #[cfg(not(feature = "tiered-storage"))]
237    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
238        let epoch = self.current_epoch();
239        let mut record = NodeRecord::new(id, epoch);
240        record.set_label_count(labels.len() as u16);
241
242        // Store labels in node_labels map and label_index
243        let mut node_label_set = FxHashSet::default();
244        for label in labels {
245            let label_id = self.get_or_create_label_id(*label);
246            node_label_set.insert(label_id);
247
248            // Update label index
249            let mut index = self.label_index.write();
250            while index.len() <= label_id as usize {
251                index.push(FxHashMap::default());
252            }
253            index[label_id as usize].insert(id, ());
254        }
255
256        // Store node's labels
257        self.node_labels.write().insert(id, node_label_set);
258
259        // Create version chain with initial version (using SYSTEM tx for recovery)
260        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
261        self.nodes.write().insert(id, chain);
262
263        // Update next_node_id if necessary to avoid future collisions
264        let id_val = id.as_u64();
265        let _ = self
266            .next_node_id
267            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
268                if id_val >= current {
269                    Some(id_val + 1)
270                } else {
271                    None
272                }
273            });
274    }
275
276    /// Creates a node with a specific ID during recovery.
277    /// (Tiered storage version)
278    #[cfg(feature = "tiered-storage")]
279    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
280        let epoch = self.current_epoch();
281        let mut record = NodeRecord::new(id, epoch);
282        record.set_label_count(labels.len() as u16);
283
284        // Store labels in node_labels map and label_index
285        let mut node_label_set = FxHashSet::default();
286        for label in labels {
287            let label_id = self.get_or_create_label_id(*label);
288            node_label_set.insert(label_id);
289
290            // Update label index
291            let mut index = self.label_index.write();
292            while index.len() <= label_id as usize {
293                index.push(FxHashMap::default());
294            }
295            index[label_id as usize].insert(id, ());
296        }
297
298        // Store node's labels
299        self.node_labels.write().insert(id, node_label_set);
300
301        // Allocate record in arena and get offset (create epoch if needed)
302        let arena = self.arena_allocator.arena_or_create(epoch);
303        let (offset, _stored) = arena.alloc_value_with_offset(record);
304
305        // Create HotVersionRef (using SYSTEM tx for recovery)
306        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
307        let mut versions = self.node_versions.write();
308        versions.insert(id, VersionIndex::with_initial(hot_ref));
309
310        // Update next_node_id if necessary to avoid future collisions
311        let id_val = id.as_u64();
312        let _ = self
313            .next_node_id
314            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
315                if id_val >= current {
316                    Some(id_val + 1)
317                } else {
318                    None
319                }
320            });
321    }
322
323    /// Creates an edge with a specific ID during recovery.
324    ///
325    /// This is used for WAL recovery to restore edges with their original IDs.
326    #[cfg(not(feature = "tiered-storage"))]
327    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
328        let epoch = self.current_epoch();
329        let type_id = self.get_or_create_edge_type_id(edge_type);
330
331        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
332        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
333        self.edges.write().insert(id, chain);
334
335        // Update adjacency
336        self.forward_adj.add_edge(src, dst, id);
337        if let Some(ref backward) = self.backward_adj {
338            backward.add_edge(dst, src, id);
339        }
340
341        // Update next_edge_id if necessary
342        let id_val = id.as_u64();
343        let _ = self
344            .next_edge_id
345            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
346                if id_val >= current {
347                    Some(id_val + 1)
348                } else {
349                    None
350                }
351            });
352    }
353
354    /// Creates an edge with a specific ID during recovery.
355    /// (Tiered storage version)
356    #[cfg(feature = "tiered-storage")]
357    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
358        let epoch = self.current_epoch();
359        let type_id = self.get_or_create_edge_type_id(edge_type);
360
361        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
362
363        // Allocate record in arena and get offset (create epoch if needed)
364        let arena = self.arena_allocator.arena_or_create(epoch);
365        let (offset, _stored) = arena.alloc_value_with_offset(record);
366
367        // Create HotVersionRef (using SYSTEM tx for recovery)
368        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
369        let mut versions = self.edge_versions.write();
370        versions.insert(id, VersionIndex::with_initial(hot_ref));
371
372        // Update adjacency
373        self.forward_adj.add_edge(src, dst, id);
374        if let Some(ref backward) = self.backward_adj {
375            backward.add_edge(dst, src, id);
376        }
377
378        // Update next_edge_id if necessary
379        let id_val = id.as_u64();
380        let _ = self
381            .next_edge_id
382            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
383                if id_val >= current {
384                    Some(id_val + 1)
385                } else {
386                    None
387                }
388            });
389    }
390
    /// Sets the current epoch during recovery.
    ///
    /// Overwrites the store's epoch counter with `epoch` so that WAL replay
    /// resumes from the epoch recorded at shutdown.
    pub fn set_epoch(&self, epoch: EpochId) {
        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
    }
}
395}