grafeo_core/graph/lpg/store/
versioning.rs1use super::LpgStore;
2use crate::graph::lpg::{EdgeRecord, NodeRecord};
3use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId};
4use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
5use std::sync::atomic::Ordering;
6
7#[cfg(not(feature = "tiered-storage"))]
8use grafeo_common::mvcc::VersionChain;
9
10#[cfg(feature = "tiered-storage")]
11use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex};
12
13impl LpgStore {
14 #[cfg(not(feature = "tiered-storage"))]
19 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
20 {
22 let mut nodes = self.nodes.write();
23 for chain in nodes.values_mut() {
24 chain.remove_versions_by(tx_id);
25 }
26 nodes.retain(|_, chain| !chain.is_empty());
28 }
29
30 {
32 let mut edges = self.edges.write();
33 for chain in edges.values_mut() {
34 chain.remove_versions_by(tx_id);
35 }
36 edges.retain(|_, chain| !chain.is_empty());
38 }
39
40 self.needs_stats_recompute.store(true, Ordering::Relaxed);
42 }
43
#[cfg(feature = "tiered-storage")]
/// Removes the versions attributed to transaction `tx_id` from the store
/// (tiered-storage build).
///
/// Calls `remove_versions_by(tx_id)` on every node and edge version index,
/// drops indexes that report empty afterwards, and finally raises the
/// `needs_stats_recompute` flag.
pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
    // One pass per map: strip the transaction's versions from an index and
    // decide on the spot whether the index is still worth keeping. The write
    // guard is a temporary, so the node lock is released before the edge
    // lock is acquired.
    self.node_versions.write().retain(|_, index| {
        index.remove_versions_by(tx_id);
        !index.is_empty()
    });

    self.edge_versions.write().retain(|_, index| {
        index.remove_versions_by(tx_id);
        !index.is_empty()
    });

    // Entities may have disappeared, so flag the statistics for a recompute.
    self.needs_stats_recompute.store(true, Ordering::Relaxed);
}
71
72 #[cfg(not(feature = "tiered-storage"))]
77 pub fn gc_versions(&self, min_epoch: EpochId) {
78 {
79 let mut nodes = self.nodes.write();
80 for chain in nodes.values_mut() {
81 chain.gc(min_epoch);
82 }
83 nodes.retain(|_, chain| !chain.is_empty());
84 }
85 {
86 let mut edges = self.edges.write();
87 for chain in edges.values_mut() {
88 chain.gc(min_epoch);
89 }
90 edges.retain(|_, chain| !chain.is_empty());
91 }
92 }
93
#[cfg(feature = "tiered-storage")]
/// Runs version garbage collection over all node and edge version indexes
/// (tiered-storage build).
///
/// Every index is asked to `gc(min_epoch)`; indexes that are empty
/// afterwards are removed from their map.
pub fn gc_versions(&self, min_epoch: EpochId) {
    // Collect and prune in a single traversal of each map. Each write guard
    // is a temporary, so the node lock is released before the edge lock is
    // taken.
    self.node_versions.write().retain(|_, index| {
        index.gc(min_epoch);
        !index.is_empty()
    });

    self.edge_versions.write().retain(|_, index| {
        index.gc(min_epoch);
        !index.is_empty()
    });
}
112
#[cfg(feature = "tiered-storage")]
/// Moves the hot versions of `epoch` into the epoch store and rewrites the
/// version indexes to point at the frozen copies.
///
/// The work happens in three phases:
///
/// 1. Under read locks, copy every node and edge record referenced by a hot
///    ref for `epoch` out of the arena.
/// 2. Hand the copies to `epoch_store.freeze_epoch`, which returns one list
///    of entries (entity id, offset, length) for nodes and one for edges.
/// 3. Under write locks, call `freeze_epoch` on each affected version index
///    with a [`ColdVersionRef`] built from the returned offset/length and the
///    `created_by` / `deleted_epoch` of the hot ref.
///
/// Returns the number of node plus edge records collected in phase 1; when
/// that number is zero the epoch store is not touched and `0` is returned.
///
/// NOTE(review): the entry maps are keyed by entity id only, so if one entity
/// has several hot refs in the same epoch they all receive the same
/// `(offset, length)`, and the index's `freeze_epoch` is called once per ref.
/// Confirm whether that situation can occur.
// `unsafe` is needed for the raw arena reads in phase 1.
#[allow(unsafe_code)]
pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
    // Phase 1a: snapshot node records while holding only a read lock.
    let mut frozen_nodes: Vec<(u64, NodeRecord)> = Vec::new();
    let mut node_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
    {
        let node_index = self.node_versions.read();
        for (node_id, index) in node_index.iter() {
            for hot in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY: relies on every hot ref stored in `node_versions`
                // carrying an offset at which a `NodeRecord` was allocated in
                // the arena of `hot.epoch` (as `create_node_with_id` does).
                // Other writers are not visible here; TODO confirm.
                let record: &NodeRecord = unsafe { arena.read_at(hot.arena_offset) };
                frozen_nodes.push((node_id.as_u64(), *record));
                node_refs.push((*node_id, *hot));
            }
        }
    }

    // Phase 1b: the same snapshot for edges.
    let mut frozen_edges: Vec<(u64, EdgeRecord)> = Vec::new();
    let mut edge_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
    {
        let edge_index = self.edge_versions.read();
        for (edge_id, index) in edge_index.iter() {
            for hot in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY: relies on every hot ref stored in `edge_versions`
                // carrying an offset at which an `EdgeRecord` was allocated in
                // the arena of `hot.epoch` (as `create_edge_with_id` does).
                // Other writers are not visible here; TODO confirm.
                let record: &EdgeRecord = unsafe { arena.read_at(hot.arena_offset) };
                frozen_edges.push((edge_id.as_u64(), *record));
                edge_refs.push((*edge_id, *hot));
            }
        }
    }

    // Nothing lives in this epoch: skip the epoch store entirely.
    let frozen_count = frozen_nodes.len() + frozen_edges.len();
    if frozen_count == 0 {
        return 0;
    }

    // Phase 2: persist the copies and learn where each one ended up.
    let (node_entries, edge_entries) =
        self.epoch_store
            .freeze_epoch(epoch, frozen_nodes, frozen_edges);

    // Entity id -> (offset, length) lookup tables for the write-back phase.
    let node_locations: FxHashMap<u64, _> = node_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();
    let edge_locations: FxHashMap<u64, _> = edge_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();

    // Phase 3a: swap node hot refs for cold refs. An index that vanished
    // since phase 1, or an entity without a returned entry, is skipped.
    {
        let mut node_index = self.node_versions.write();
        for (node_id, hot) in &node_refs {
            let Some(index) = node_index.get_mut(node_id) else {
                continue;
            };
            let Some(&(block_offset, length)) = node_locations.get(&node_id.as_u64()) else {
                continue;
            };
            let cold = ColdVersionRef {
                epoch,
                block_offset,
                length,
                created_by: hot.created_by,
                deleted_epoch: hot.deleted_epoch,
            };
            index.freeze_epoch(epoch, std::iter::once(cold));
        }
    }

    // Phase 3b: the same swap for edges.
    {
        let mut edge_index = self.edge_versions.write();
        for (edge_id, hot) in &edge_refs {
            let Some(index) = edge_index.get_mut(edge_id) else {
                continue;
            };
            let Some(&(block_offset, length)) = edge_locations.get(&edge_id.as_u64()) else {
                continue;
            };
            let cold = ColdVersionRef {
                epoch,
                block_offset,
                length,
                created_by: hot.created_by,
                deleted_epoch: hot.deleted_epoch,
            };
            index.freeze_epoch(epoch, std::iter::once(cold));
        }
    }

    frozen_count
}
228
#[cfg(feature = "tiered-storage")]
/// Borrows the [`EpochStore`](crate::storage::EpochStore) held by this store.
///
/// Only available when the `tiered-storage` feature is enabled.
#[must_use]
pub fn epoch_store(&self) -> &crate::storage::EpochStore {
    &self.epoch_store
}
235
236 #[cfg(not(feature = "tiered-storage"))]
243 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
244 let epoch = self.current_epoch();
245 let mut record = NodeRecord::new(id, epoch);
246 record.set_label_count(labels.len() as u16);
247
248 let mut node_label_set = FxHashSet::default();
250 for label in labels {
251 let label_id = self.get_or_create_label_id(*label);
252 node_label_set.insert(label_id);
253
254 let mut index = self.label_index.write();
256 while index.len() <= label_id as usize {
257 index.push(FxHashMap::default());
258 }
259 index[label_id as usize].insert(id, ());
260 }
261
262 self.node_labels.write().insert(id, node_label_set);
264
265 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
267 self.nodes.write().insert(id, chain);
268 self.live_node_count.fetch_add(1, Ordering::Relaxed);
269
270 let id_val = id.as_u64();
272 let _ = self
273 .next_node_id
274 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
275 if id_val >= current {
276 Some(id_val + 1)
277 } else {
278 None
279 }
280 });
281 }
282
#[cfg(feature = "tiered-storage")]
/// Inserts a node under the caller-supplied `id` with the given `labels`
/// (tiered-storage build).
///
/// The node record is allocated in the arena of the current epoch and
/// registered in `node_versions` through a [`HotVersionRef`] attributed to
/// [`TxId::SYSTEM`]. Each label is resolved through `get_or_create_label_id`,
/// recorded in the label index and in the node's label set.
/// `live_node_count` is incremented, and `next_node_id` is raised to at least
/// `id + 1`.
///
/// NOTE(review): if `id` is already present the existing map entries are
/// overwritten, yet `live_node_count` is still incremented. Presumably
/// callers guarantee unique ids; confirm.
pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
    let epoch = self.current_epoch();
    let mut record = NodeRecord::new(id, epoch);
    // `as u16` keeps only the low 16 bits if there are more than 65535 labels.
    record.set_label_count(labels.len() as u16);

    let mut node_label_set = FxHashSet::default();
    for label in labels {
        let label_id = self.get_or_create_label_id(*label);
        node_label_set.insert(label_id);

        // The label index is a vector addressed by label id; grow it on
        // demand so the slot for this label exists before inserting.
        let slot = label_id as usize;
        let mut index = self.label_index.write();
        if index.len() <= slot {
            index.resize_with(slot + 1, FxHashMap::default);
        }
        index[slot].insert(id, ());
    }

    self.node_labels.write().insert(id, node_label_set);

    // Place the record in the arena of the current epoch and remember where.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    // The write guard stays alive until the end of the function, exactly as
    // before, so the counter updates below happen under the lock.
    let mut versions = self.node_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));
    self.live_node_count.fetch_add(1, Ordering::Relaxed);

    // Keep the id allocator ahead of every explicitly chosen id. The
    // saturating add avoids the overflow that `id + 1` hits at `u64::MAX`
    // (a debug panic, or a wrap to 0 that would reset the counter).
    self.next_node_id
        .fetch_max(id.as_u64().saturating_add(1), Ordering::SeqCst);
}
330
331 #[cfg(not(feature = "tiered-storage"))]
335 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
336 let epoch = self.current_epoch();
337 let type_id = self.get_or_create_edge_type_id(edge_type);
338
339 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
340 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
341 self.edges.write().insert(id, chain);
342
343 self.forward_adj.add_edge(src, dst, id);
345 if let Some(ref backward) = self.backward_adj {
346 backward.add_edge(dst, src, id);
347 }
348
349 self.live_edge_count.fetch_add(1, Ordering::Relaxed);
350 self.increment_edge_type_count(type_id);
351
352 let id_val = id.as_u64();
354 let _ = self
355 .next_edge_id
356 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
357 if id_val >= current {
358 Some(id_val + 1)
359 } else {
360 None
361 }
362 });
363 }
364
#[cfg(feature = "tiered-storage")]
/// Inserts an edge `src -> dst` of type `edge_type` under the caller-supplied
/// `id` (tiered-storage build).
///
/// The edge record is allocated in the arena of the current epoch and
/// registered in `edge_versions` through a [`HotVersionRef`] attributed to
/// [`TxId::SYSTEM`]. The edge is added to the forward adjacency and, when one
/// is configured, to the backward adjacency. `live_edge_count` and the
/// per-type edge count are incremented, and `next_edge_id` is raised to at
/// least `id + 1`.
///
/// NOTE(review): if `id` is already present the existing entry is
/// overwritten, yet the counters are still incremented. Presumably callers
/// guarantee unique ids; confirm.
pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
    let epoch = self.current_epoch();
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let record = EdgeRecord::new(id, src, dst, type_id, epoch);

    // Place the record in the arena of the current epoch and remember where.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    // The write guard stays alive until the end of the function, exactly as
    // before, so the adjacency and counter updates happen under the lock.
    let mut versions = self.edge_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    self.forward_adj.add_edge(src, dst, id);
    // The backward adjacency is optional; mirror the edge only if it exists.
    if let Some(ref backward) = self.backward_adj {
        backward.add_edge(dst, src, id);
    }

    self.live_edge_count.fetch_add(1, Ordering::Relaxed);
    self.increment_edge_type_count(type_id);

    // Keep the id allocator ahead of every explicitly chosen id. The
    // saturating add avoids the overflow that `id + 1` hits at `u64::MAX`
    // (a debug panic, or a wrap to 0 that would reset the counter).
    self.next_edge_id
        .fetch_max(id.as_u64().saturating_add(1), Ordering::SeqCst);
}
404
405 pub fn set_epoch(&self, epoch: EpochId) {
407 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
408 }
409}