// grafeo_core/graph/lpg/store/versioning.rs
//
// MVCC versioning support for the LPG store: rollback cleanup of uncommitted
// versions, version-chain garbage collection, epoch freezing to cold storage
// (tiered-storage feature), and WAL-recovery entity creation.
1use super::LpgStore;
2use crate::graph::lpg::{EdgeRecord, NodeRecord};
3use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId};
4use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
5use std::sync::atomic::Ordering;
6
7#[cfg(not(feature = "tiered-storage"))]
8use grafeo_common::mvcc::VersionChain;
9
10#[cfg(feature = "tiered-storage")]
11use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex};
12
impl LpgStore {
    /// Discards all uncommitted versions created by a transaction.
    ///
    /// This is called during transaction rollback to clean up uncommitted changes.
    /// The method removes version chain entries created by the specified transaction,
    /// then drops any chain left with no versions at all — i.e. entities that only
    /// ever existed inside the rolled-back transaction.
    ///
    /// Lock discipline: the node map is locked, cleaned, and released before the
    /// edge map is touched; neither write lock is held across the other.
    #[cfg(not(feature = "tiered-storage"))]
    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
        // Remove uncommitted node versions
        {
            let mut nodes = self.nodes.write();
            for chain in nodes.values_mut() {
                chain.remove_versions_by(tx_id);
            }
            // Remove completely empty chains (no versions left)
            nodes.retain(|_, chain| !chain.is_empty());
        }

        // Remove uncommitted edge versions
        {
            let mut edges = self.edges.write();
            for chain in edges.values_mut() {
                chain.remove_versions_by(tx_id);
            }
            // Remove completely empty chains (no versions left)
            edges.retain(|_, chain| !chain.is_empty());
        }

        // Counters may be out of sync after rollback — force full recompute
        // on the next stats read instead of trying to patch counts here.
        self.needs_stats_recompute.store(true, Ordering::Relaxed);
    }

    /// Discards all uncommitted versions created by a transaction.
    /// (Tiered storage version.)
    ///
    /// Mirror of the non-tiered variant above, operating on `VersionIndex`
    /// maps (`node_versions` / `edge_versions`) instead of `VersionChain`
    /// maps. Keep the two variants in sync when editing either one.
    #[cfg(feature = "tiered-storage")]
    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
        // Remove uncommitted node versions
        {
            let mut versions = self.node_versions.write();
            for index in versions.values_mut() {
                index.remove_versions_by(tx_id);
            }
            // Remove completely empty indexes (no versions left)
            versions.retain(|_, index| !index.is_empty());
        }

        // Remove uncommitted edge versions
        {
            let mut versions = self.edge_versions.write();
            for index in versions.values_mut() {
                index.remove_versions_by(tx_id);
            }
            // Remove completely empty indexes (no versions left)
            versions.retain(|_, index| !index.is_empty());
        }

        // Counters may be out of sync after rollback — force full recompute
        self.needs_stats_recompute.store(true, Ordering::Relaxed);
    }

    /// Garbage collects old versions that are no longer visible to any transaction.
    ///
    /// Versions older than `min_epoch` are pruned from version chains, keeping
    /// at most one old version per entity as a baseline. Empty chains are removed.
    ///
    /// Holds the node-map write lock for the node pass only, then the edge-map
    /// write lock for the edge pass — same ordering as rollback cleanup.
    #[cfg(not(feature = "tiered-storage"))]
    pub fn gc_versions(&self, min_epoch: EpochId) {
        {
            let mut nodes = self.nodes.write();
            for chain in nodes.values_mut() {
                chain.gc(min_epoch);
            }
            nodes.retain(|_, chain| !chain.is_empty());
        }
        {
            let mut edges = self.edges.write();
            for chain in edges.values_mut() {
                chain.gc(min_epoch);
            }
            edges.retain(|_, chain| !chain.is_empty());
        }
    }

    /// Garbage collects old versions (tiered storage variant).
    ///
    /// Same contract as the non-tiered `gc_versions`, but pruning is delegated
    /// to `VersionIndex::gc`. Keep both variants in sync.
    #[cfg(feature = "tiered-storage")]
    pub fn gc_versions(&self, min_epoch: EpochId) {
        {
            let mut versions = self.node_versions.write();
            for index in versions.values_mut() {
                index.gc(min_epoch);
            }
            versions.retain(|_, index| !index.is_empty());
        }
        {
            let mut versions = self.edge_versions.write();
            for index in versions.values_mut() {
                index.gc(min_epoch);
            }
            versions.retain(|_, index| !index.is_empty());
        }
    }

    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
    ///
    /// This is called by the transaction manager when an epoch becomes eligible
    /// for freezing (no active transactions can see it). The freeze process:
    ///
    /// 1. Collects all hot version refs for the epoch
    /// 2. Reads the corresponding records from arena
    /// 3. Compresses them into a `CompressedEpochBlock`
    /// 4. Updates `VersionIndex` entries to point to cold storage
    /// 5. The arena can be deallocated after all epochs in it are frozen
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch to freeze
    ///
    /// # Returns
    ///
    /// The number of records frozen (nodes + edges). Returns 0 without touching
    /// the epoch store when the epoch has no hot records at all.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
        // Phase 1: snapshot hot refs and copy records out of the arena while
        // holding only read locks, so concurrent readers are not blocked.
        // Collect node records to freeze
        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.node_versions.read();
            for (node_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    node_records.push((node_id.as_u64(), *record));
                    node_hot_refs.push((*node_id, *hot_ref));
                }
            }
        }

        // Collect edge records to freeze
        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.edge_versions.read();
            for (edge_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    edge_records.push((edge_id.as_u64(), *record));
                    edge_hot_refs.push((*edge_id, *hot_ref));
                }
            }
        }

        let total_frozen = node_records.len() + edge_records.len();

        if total_frozen == 0 {
            return 0;
        }

        // Phase 2: compress the snapshot into cold storage (no store locks held).
        // Freeze to compressed storage
        let (node_entries, edge_entries) =
            self.epoch_store
                .freeze_epoch(epoch, node_records, edge_records);

        // Build lookup maps for index entries.
        // NOTE(review): entries are keyed by entity id alone, so if an entity had
        // more than one hot ref for this epoch, all of them resolve to the same
        // cold (offset, length) — presumably `hot_refs_for_epoch` yields at most
        // one ref per entity per epoch; confirm against VersionIndex.
        let node_entry_map: FxHashMap<u64, _> = node_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();
        let edge_entry_map: FxHashMap<u64, _> = edge_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();

        // Phase 3: re-point version indexes at cold storage, carrying over the
        // MVCC metadata (created_by / deleted_epoch) from each hot ref.
        // Update version indexes to use cold refs
        {
            let mut versions = self.node_versions.write();
            for (node_id, hot_ref) in &node_hot_refs {
                if let Some(index) = versions.get_mut(node_id)
                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
                {
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        {
            let mut versions = self.edge_versions.write();
            for (edge_id, hot_ref) in &edge_hot_refs {
                if let Some(index) = versions.get_mut(edge_id)
                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
                {
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        total_frozen
    }

    /// Returns the epoch store for cold storage statistics.
    #[cfg(feature = "tiered-storage")]
    #[must_use]
    pub fn epoch_store(&self) -> &crate::storage::EpochStore {
        &self.epoch_store
    }

    // === Recovery Support ===

    /// Creates a node with a specific ID during recovery.
    ///
    /// This is used for WAL recovery to restore nodes with their original IDs.
    /// The caller must ensure IDs don't conflict with existing nodes.
    ///
    /// The initial version is attributed to `TxId::SYSTEM` so it is never
    /// swept up by `discard_uncommitted_versions` for a user transaction.
    #[cfg(not(feature = "tiered-storage"))]
    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
        let epoch = self.current_epoch();
        let mut record = NodeRecord::new(id, epoch);
        // NOTE(review): silently truncates if labels.len() > u16::MAX —
        // presumably bounded well below that upstream; confirm.
        record.set_label_count(labels.len() as u16);

        // Store labels in node_labels map and label_index
        let mut node_label_set = FxHashSet::default();
        for label in labels {
            let label_id = self.get_or_create_label_id(*label);
            node_label_set.insert(label_id);

            // Update label index, growing the per-label slot vector on demand.
            // NOTE(review): the write lock is re-acquired once per label; fine
            // for a recovery path, but hoisting it would need care because
            // get_or_create_label_id may take its own locks.
            let mut index = self.label_index.write();
            while index.len() <= label_id as usize {
                index.push(FxHashMap::default());
            }
            // Map used as a set: value is ().
            index[label_id as usize].insert(id, ());
        }

        // Store node's labels
        self.node_labels.write().insert(id, node_label_set);

        // Create version chain with initial version (using SYSTEM tx for recovery)
        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
        self.nodes.write().insert(id, chain);
        self.live_node_count.fetch_add(1, Ordering::Relaxed);

        // Update next_node_id if necessary to avoid future collisions.
        // CAS loop: only bump the counter when the restored id is >= it;
        // Err from fetch_update just means no update was needed.
        let id_val = id.as_u64();
        let _ = self
            .next_node_id
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                if id_val >= current {
                    Some(id_val + 1)
                } else {
                    None
                }
            });
    }

    /// Creates a node with a specific ID during recovery.
    /// (Tiered storage version.)
    ///
    /// Same contract as the non-tiered variant, but the record lives in the
    /// epoch arena and the version is tracked via a `HotVersionRef` in a
    /// `VersionIndex`. Keep the two variants in sync.
    #[cfg(feature = "tiered-storage")]
    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
        let epoch = self.current_epoch();
        let mut record = NodeRecord::new(id, epoch);
        // NOTE(review): silently truncates if labels.len() > u16::MAX —
        // presumably bounded well below that upstream; confirm.
        record.set_label_count(labels.len() as u16);

        // Store labels in node_labels map and label_index
        let mut node_label_set = FxHashSet::default();
        for label in labels {
            let label_id = self.get_or_create_label_id(*label);
            node_label_set.insert(label_id);

            // Update label index, growing the per-label slot vector on demand.
            let mut index = self.label_index.write();
            while index.len() <= label_id as usize {
                index.push(FxHashMap::default());
            }
            // Map used as a set: value is ().
            index[label_id as usize].insert(id, ());
        }

        // Store node's labels
        self.node_labels.write().insert(id, node_label_set);

        // Allocate record in arena and get offset (create epoch if needed).
        // The offset is what later makes the unsafe read in freeze_epoch sound.
        let arena = self.arena_allocator.arena_or_create(epoch);
        let (offset, _stored) = arena.alloc_value_with_offset(record);

        // Create HotVersionRef (using SYSTEM tx for recovery)
        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
        let mut versions = self.node_versions.write();
        versions.insert(id, VersionIndex::with_initial(hot_ref));
        self.live_node_count.fetch_add(1, Ordering::Relaxed);

        // Update next_node_id if necessary to avoid future collisions.
        // CAS loop: only bump the counter when the restored id is >= it.
        let id_val = id.as_u64();
        let _ = self
            .next_node_id
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                if id_val >= current {
                    Some(id_val + 1)
                } else {
                    None
                }
            });
    }

    /// Creates an edge with a specific ID during recovery.
    ///
    /// This is used for WAL recovery to restore edges with their original IDs.
    /// The initial version is attributed to `TxId::SYSTEM`, and both adjacency
    /// directions are updated (backward only when a backward index exists).
    #[cfg(not(feature = "tiered-storage"))]
    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
        let epoch = self.current_epoch();
        let type_id = self.get_or_create_edge_type_id(edge_type);

        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
        self.edges.write().insert(id, chain);

        // Update adjacency (backward index is optional)
        self.forward_adj.add_edge(src, dst, id);
        if let Some(ref backward) = self.backward_adj {
            backward.add_edge(dst, src, id);
        }

        self.live_edge_count.fetch_add(1, Ordering::Relaxed);
        self.increment_edge_type_count(type_id);

        // Update next_edge_id if necessary — same CAS-loop idiom as node ids.
        let id_val = id.as_u64();
        let _ = self
            .next_edge_id
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                if id_val >= current {
                    Some(id_val + 1)
                } else {
                    None
                }
            });
    }

    /// Creates an edge with a specific ID during recovery.
    /// (Tiered storage version.)
    ///
    /// Same contract as the non-tiered variant, but the record is arena-backed
    /// and tracked via a `HotVersionRef`. Keep the two variants in sync.
    #[cfg(feature = "tiered-storage")]
    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
        let epoch = self.current_epoch();
        let type_id = self.get_or_create_edge_type_id(edge_type);

        let record = EdgeRecord::new(id, src, dst, type_id, epoch);

        // Allocate record in arena and get offset (create epoch if needed)
        let arena = self.arena_allocator.arena_or_create(epoch);
        let (offset, _stored) = arena.alloc_value_with_offset(record);

        // Create HotVersionRef (using SYSTEM tx for recovery)
        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
        let mut versions = self.edge_versions.write();
        versions.insert(id, VersionIndex::with_initial(hot_ref));

        // Update adjacency (backward index is optional)
        self.forward_adj.add_edge(src, dst, id);
        if let Some(ref backward) = self.backward_adj {
            backward.add_edge(dst, src, id);
        }

        self.live_edge_count.fetch_add(1, Ordering::Relaxed);
        self.increment_edge_type_count(type_id);

        // Update next_edge_id if necessary — same CAS-loop idiom as node ids.
        let id_val = id.as_u64();
        let _ = self
            .next_edge_id
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                if id_val >= current {
                    Some(id_val + 1)
                } else {
                    None
                }
            });
    }

    /// Sets the current epoch during recovery.
    ///
    /// Unconditional `SeqCst` store — callers are expected to only move the
    /// epoch forward; no monotonicity check is performed here.
    pub fn set_epoch(&self, epoch: EpochId) {
        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
    }
}
409}