grafeo_core/graph/lpg/store/
versioning.rs1use super::LpgStore;
2use crate::graph::lpg::{EdgeRecord, NodeRecord};
3use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId};
4use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
5use std::sync::atomic::Ordering;
6
7#[cfg(not(feature = "tiered-storage"))]
8use grafeo_common::mvcc::VersionChain;
9
10#[cfg(feature = "tiered-storage")]
11use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex};
12
13impl LpgStore {
14 #[cfg(not(feature = "tiered-storage"))]
19 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
20 {
22 let mut nodes = self.nodes.write();
23 for chain in nodes.values_mut() {
24 chain.remove_versions_by(tx_id);
25 }
26 nodes.retain(|_, chain| !chain.is_empty());
28 }
29
30 {
32 let mut edges = self.edges.write();
33 for chain in edges.values_mut() {
34 chain.remove_versions_by(tx_id);
35 }
36 edges.retain(|_, chain| !chain.is_empty());
38 }
39 }
40
/// Drops the versions attributed to transaction `tx_id` from every node and
/// edge version index, then removes any index that is left empty
/// (tiered-storage build).
///
/// NOTE(review): presumably called when a transaction is rolled back —
/// confirm against callers.
#[cfg(feature = "tiered-storage")]
pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
    // `retain` visits every entry exactly once, so stripping the transaction's
    // versions and pruning emptied indexes can share a single pass. Each write
    // guard is a temporary and is released at the end of its statement.
    self.node_versions.write().retain(|_, node_index| {
        node_index.remove_versions_by(tx_id);
        !node_index.is_empty()
    });

    self.edge_versions.write().retain(|_, edge_index| {
        edge_index.remove_versions_by(tx_id);
        !edge_index.is_empty()
    });
}
65
66 #[cfg(not(feature = "tiered-storage"))]
71 pub fn gc_versions(&self, min_epoch: EpochId) {
72 {
73 let mut nodes = self.nodes.write();
74 for chain in nodes.values_mut() {
75 chain.gc(min_epoch);
76 }
77 nodes.retain(|_, chain| !chain.is_empty());
78 }
79 {
80 let mut edges = self.edges.write();
81 for chain in edges.values_mut() {
82 chain.gc(min_epoch);
83 }
84 edges.retain(|_, chain| !chain.is_empty());
85 }
86 }
87
/// Runs `gc(min_epoch)` on every node and edge version index and removes the
/// indexes that end up empty (tiered-storage build).
///
/// NOTE(review): `min_epoch` is presumably the oldest epoch any reader can
/// still observe — confirm against `VersionIndex::gc`.
#[cfg(feature = "tiered-storage")]
pub fn gc_versions(&self, min_epoch: EpochId) {
    // Collect and prune in one sweep per map; the write guard is dropped at
    // the end of each statement, so nodes and edges are never locked together.
    self.node_versions.write().retain(|_, node_index| {
        node_index.gc(min_epoch);
        !node_index.is_empty()
    });

    self.edge_versions.write().retain(|_, edge_index| {
        edge_index.gc(min_epoch);
        !edge_index.is_empty()
    });
}
106
/// Freezes the hot versions belonging to `epoch`.
///
/// The work happens in three steps:
/// 1. Under read locks, every hot version reference for `epoch` is resolved
///    through its arena and the record is copied out.
/// 2. The copied records are handed to the epoch store's `freeze_epoch`,
///    which reports where each entity's data was placed.
/// 3. Under write locks, each affected version index is told to freeze the
///    epoch with a `ColdVersionRef` pointing at the reported location.
///
/// Returns the number of node and edge records that were collected. When no
/// record is found the epoch store is not touched and `0` is returned.
///
/// NOTE(review): the location maps are keyed by entity id, so if one entity
/// could hold more than one hot version in the same epoch, all of them would
/// resolve to a single stored location — confirm this cannot happen.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
    // Step 1a: copy node records out of the arena.
    let mut frozen_nodes: Vec<(u64, NodeRecord)> = Vec::new();
    let mut node_sources: Vec<(NodeId, HotVersionRef)> = Vec::new();
    {
        let node_map = self.node_versions.read();
        for (node_id, version_index) in node_map.iter() {
            for hot in version_index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY (assumed — TODO confirm): `hot.arena_offset` is expected
                // to be an offset the arena handed out for a `NodeRecord`, as
                // `create_node_with_id` records via `alloc_value_with_offset`.
                let stored: &NodeRecord = unsafe { arena.read_at(hot.arena_offset) };
                frozen_nodes.push((node_id.as_u64(), *stored));
                node_sources.push((*node_id, *hot));
            }
        }
    }

    // Step 1b: copy edge records out of the arena.
    let mut frozen_edges: Vec<(u64, EdgeRecord)> = Vec::new();
    let mut edge_sources: Vec<(EdgeId, HotVersionRef)> = Vec::new();
    {
        let edge_map = self.edge_versions.read();
        for (edge_id, version_index) in edge_map.iter() {
            for hot in version_index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY (assumed — TODO confirm): `hot.arena_offset` is expected
                // to be an offset the arena handed out for an `EdgeRecord`, as
                // `create_edge_with_id` records via `alloc_value_with_offset`.
                let stored: &EdgeRecord = unsafe { arena.read_at(hot.arena_offset) };
                frozen_edges.push((edge_id.as_u64(), *stored));
                edge_sources.push((*edge_id, *hot));
            }
        }
    }

    // Count before the vectors are moved into the epoch store.
    let total_frozen = frozen_nodes.len() + frozen_edges.len();
    if total_frozen == 0 {
        return 0;
    }

    // Step 2: persist the records and learn where each entity ended up.
    let (node_entries, edge_entries) =
        self.epoch_store
            .freeze_epoch(epoch, frozen_nodes, frozen_edges);

    let node_locations: FxHashMap<u64, _> = node_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();
    let edge_locations: FxHashMap<u64, _> = edge_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();

    // Step 3a: point node indexes at the frozen data. Entities whose index has
    // disappeared in the meantime, or that have no reported location, are skipped.
    {
        let mut node_map = self.node_versions.write();
        for (node_id, hot) in &node_sources {
            let Some(version_index) = node_map.get_mut(node_id) else {
                continue;
            };
            let Some(&(offset, length)) = node_locations.get(&node_id.as_u64()) else {
                continue;
            };
            let cold = ColdVersionRef {
                epoch,
                block_offset: offset,
                length,
                created_by: hot.created_by,
                deleted_epoch: hot.deleted_epoch,
            };
            version_index.freeze_epoch(epoch, std::iter::once(cold));
        }
    }

    // Step 3b: same for edge indexes.
    {
        let mut edge_map = self.edge_versions.write();
        for (edge_id, hot) in &edge_sources {
            let Some(version_index) = edge_map.get_mut(edge_id) else {
                continue;
            };
            let Some(&(offset, length)) = edge_locations.get(&edge_id.as_u64()) else {
                continue;
            };
            let cold = ColdVersionRef {
                epoch,
                block_offset: offset,
                length,
                created_by: hot.created_by,
                deleted_epoch: hot.deleted_epoch,
            };
            version_index.freeze_epoch(epoch, std::iter::once(cold));
        }
    }

    total_frozen
}
222
/// Borrows the epoch store owned by this graph store (tiered-storage build).
#[cfg(feature = "tiered-storage")]
#[must_use]
pub fn epoch_store(&self) -> &crate::storage::EpochStore {
    let store = &self.epoch_store;
    store
}
229
230 #[cfg(not(feature = "tiered-storage"))]
237 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
238 let epoch = self.current_epoch();
239 let mut record = NodeRecord::new(id, epoch);
240 record.set_label_count(labels.len() as u16);
241
242 let mut node_label_set = FxHashSet::default();
244 for label in labels {
245 let label_id = self.get_or_create_label_id(*label);
246 node_label_set.insert(label_id);
247
248 let mut index = self.label_index.write();
250 while index.len() <= label_id as usize {
251 index.push(FxHashMap::default());
252 }
253 index[label_id as usize].insert(id, ());
254 }
255
256 self.node_labels.write().insert(id, node_label_set);
258
259 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
261 self.nodes.write().insert(id, chain);
262
263 let id_val = id.as_u64();
265 let _ = self
266 .next_node_id
267 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
268 if id_val >= current {
269 Some(id_val + 1)
270 } else {
271 None
272 }
273 });
274 }
275
/// Creates a node under the caller-supplied `id` instead of allocating one
/// (tiered-storage build).
///
/// The node record is created at the current epoch, written into that epoch's
/// arena, and registered as the initial hot version attributed to
/// `TxId::SYSTEM`; any version index or label set already stored under `id` is
/// replaced. Every label is registered (created on first use) and the node is
/// added to that label's index. Finally the node id allocator is raised so
/// that it stays above `id`.
///
/// NOTE(review): presumably used when replaying or importing data whose ids
/// are already fixed — confirm against callers.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
    let epoch = self.current_epoch();
    let mut record = NodeRecord::new(id, epoch);
    // The record keeps the count as `u16`; saturate rather than letting an
    // `as` cast silently wrap (e.g. 65_536 labels becoming 0).
    record.set_label_count(u16::try_from(labels.len()).unwrap_or(u16::MAX));

    // Resolve all label ids first so the label-index lock is taken only once
    // and is never held while `get_or_create_label_id` runs.
    let node_label_set: FxHashSet<_> = labels
        .iter()
        .map(|label| self.get_or_create_label_id(*label))
        .collect();

    {
        let mut index = self.label_index.write();
        for &label_id in &node_label_set {
            let slot = label_id as usize;
            // Grow the per-label table until `slot` is addressable.
            while index.len() <= slot {
                index.push(FxHashMap::default());
            }
            index[slot].insert(id, ());
        }
    }

    self.node_labels.write().insert(id, node_label_set);

    // Store the record in the arena of the current epoch and remember where.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    let mut versions = self.node_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    // Keep the allocator strictly above every explicitly assigned id.
    // `fetch_max` is the atomic "raise to at least" primitive; `checked_add`
    // leaves the counter untouched for `u64::MAX`, where a wrapping `+ 1`
    // would have reset it to 0.
    if let Some(next_free) = id.as_u64().checked_add(1) {
        self.next_node_id.fetch_max(next_free, Ordering::SeqCst);
    }
}
322
323 #[cfg(not(feature = "tiered-storage"))]
327 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
328 let epoch = self.current_epoch();
329 let type_id = self.get_or_create_edge_type_id(edge_type);
330
331 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
332 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
333 self.edges.write().insert(id, chain);
334
335 self.forward_adj.add_edge(src, dst, id);
337 if let Some(ref backward) = self.backward_adj {
338 backward.add_edge(dst, src, id);
339 }
340
341 let id_val = id.as_u64();
343 let _ = self
344 .next_edge_id
345 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
346 if id_val >= current {
347 Some(id_val + 1)
348 } else {
349 None
350 }
351 });
352 }
353
/// Creates an edge of type `edge_type` from `src` to `dst` under the
/// caller-supplied `id` instead of allocating one (tiered-storage build).
///
/// The edge record is created at the current epoch, written into that epoch's
/// arena, and registered as the initial hot version attributed to
/// `TxId::SYSTEM`; any version index already stored under `id` is replaced.
/// The edge is added to the forward adjacency and, when one is configured, to
/// the backward adjacency. Finally the edge id allocator is raised so that it
/// stays above `id`.
///
/// NOTE(review): presumably used when replaying or importing data whose ids
/// are already fixed — confirm against callers.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
    let epoch = self.current_epoch();
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let record = EdgeRecord::new(id, src, dst, type_id, epoch);

    // Store the record in the arena of the current epoch and remember where.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    let mut versions = self.edge_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    self.forward_adj.add_edge(src, dst, id);
    // The backward adjacency is optional; it is keyed by destination.
    if let Some(ref backward) = self.backward_adj {
        backward.add_edge(dst, src, id);
    }

    // Keep the allocator strictly above every explicitly assigned id.
    // `fetch_max` is the atomic "raise to at least" primitive; `checked_add`
    // leaves the counter untouched for `u64::MAX`, where a wrapping `+ 1`
    // would have reset it to 0.
    if let Some(next_free) = id.as_u64().checked_add(1) {
        self.next_edge_id.fetch_max(next_free, Ordering::SeqCst);
    }
}
390
391 pub fn set_epoch(&self, epoch: EpochId) {
393 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
394 }
395}