grafeo_core/graph/lpg/store/
schema.rs1use super::{LpgStore, PropertyUndoEntry};
4#[cfg(feature = "temporal")]
5use grafeo_common::types::EpochId;
6use grafeo_common::types::{NodeId, TransactionId};
7use grafeo_common::utils::hash::FxHashMap;
8
9impl LpgStore {
10 #[cfg(not(feature = "tiered-storage"))]
15 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
16 let epoch = self.current_epoch();
17
18 let nodes = self.nodes.read();
20 if let Some(chain) = nodes.get(&node_id) {
21 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
22 return false;
23 }
24 } else {
25 return false;
26 }
27 drop(nodes);
28
29 let label_id = self.get_or_create_label_id(label);
31
32 let mut node_labels = self.node_labels.write();
34
35 #[cfg(not(feature = "temporal"))]
36 {
37 let label_set = node_labels.entry(node_id).or_default();
38 if label_set.contains(&label_id) {
39 return false;
40 }
41 label_set.insert(label_id);
42 }
43
44 #[cfg(feature = "temporal")]
45 {
46 let current = node_labels
47 .get(&node_id)
48 .and_then(|log| log.latest())
49 .cloned()
50 .unwrap_or_default();
51 if current.contains(&label_id) {
52 return false;
53 }
54 let mut new_set = current;
55 new_set.insert(label_id);
56 node_labels
57 .entry(node_id)
58 .or_default()
59 .append(self.current_epoch(), new_set);
60 }
61
62 drop(node_labels);
63
64 let mut index = self.label_index.write();
66 if (label_id as usize) >= index.len() {
67 index.resize(label_id as usize + 1, FxHashMap::default());
68 }
69 index[label_id as usize].insert(node_id, ());
70
71 #[cfg(not(feature = "temporal"))]
73 if let Some(chain) = self.nodes.write().get_mut(&node_id)
74 && let Some(record) = chain.latest_mut()
75 {
76 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
77 record.set_label_count(count as u16);
78 }
79
80 true
81 }
82
83 #[cfg(feature = "tiered-storage")]
86 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
87 let epoch = self.current_epoch();
88
89 let versions = self.node_versions.read();
91 if let Some(index) = versions.get(&node_id) {
92 if let Some(vref) = index.visible_at(epoch) {
93 if let Some(record) = self.read_node_record(&vref) {
94 if record.is_deleted() {
95 return false;
96 }
97 } else {
98 return false;
99 }
100 } else {
101 return false;
102 }
103 } else {
104 return false;
105 }
106 drop(versions);
107
108 let label_id = self.get_or_create_label_id(label);
110
111 let mut node_labels = self.node_labels.write();
113
114 #[cfg(not(feature = "temporal"))]
115 {
116 let label_set = node_labels.entry(node_id).or_default();
117 if label_set.contains(&label_id) {
118 return false;
119 }
120 label_set.insert(label_id);
121 }
122
123 #[cfg(feature = "temporal")]
124 {
125 let current = node_labels
126 .get(&node_id)
127 .and_then(|log| log.latest())
128 .cloned()
129 .unwrap_or_default();
130 if current.contains(&label_id) {
131 return false;
132 }
133 let mut new_set = current;
134 new_set.insert(label_id);
135 node_labels
136 .entry(node_id)
137 .or_default()
138 .append(self.current_epoch(), new_set);
139 }
140
141 drop(node_labels);
142
143 let mut index = self.label_index.write();
145 if (label_id as usize) >= index.len() {
146 index.resize(label_id as usize + 1, FxHashMap::default());
147 }
148 index[label_id as usize].insert(node_id, ());
149
150 true
151 }
152
153 #[cfg(not(feature = "tiered-storage"))]
158 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
159 let epoch = self.current_epoch();
160
161 let nodes = self.nodes.read();
163 if let Some(chain) = nodes.get(&node_id) {
164 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
165 return false;
166 }
167 } else {
168 return false;
169 }
170 drop(nodes);
171
172 let label_id = {
174 let reg = self.label_registry.read();
175 match reg.get_id(label) {
176 Some(id) => id,
177 None => return false, }
179 };
180
181 let mut node_labels = self.node_labels.write();
183
184 #[cfg(not(feature = "temporal"))]
185 {
186 if let Some(label_set) = node_labels.get_mut(&node_id) {
187 if !label_set.remove(&label_id) {
188 return false;
189 }
190 } else {
191 return false;
192 }
193 }
194
195 #[cfg(feature = "temporal")]
196 {
197 let current = node_labels
198 .get(&node_id)
199 .and_then(|log| log.latest())
200 .cloned()
201 .unwrap_or_default();
202 if !current.contains(&label_id) {
203 return false;
204 }
205 let mut new_set = current;
206 new_set.remove(&label_id);
207 node_labels
208 .entry(node_id)
209 .or_default()
210 .append(self.current_epoch(), new_set);
211 }
212
213 drop(node_labels);
214
215 let mut index = self.label_index.write();
217 if (label_id as usize) < index.len() {
218 index[label_id as usize].remove(&node_id);
219 }
220
221 #[cfg(not(feature = "temporal"))]
223 if let Some(chain) = self.nodes.write().get_mut(&node_id)
224 && let Some(record) = chain.latest_mut()
225 {
226 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
227 record.set_label_count(count as u16);
228 }
229
230 true
231 }
232
233 #[cfg(feature = "tiered-storage")]
236 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
237 let epoch = self.current_epoch();
238
239 let versions = self.node_versions.read();
241 if let Some(index) = versions.get(&node_id) {
242 if let Some(vref) = index.visible_at(epoch) {
243 if let Some(record) = self.read_node_record(&vref) {
244 if record.is_deleted() {
245 return false;
246 }
247 } else {
248 return false;
249 }
250 } else {
251 return false;
252 }
253 } else {
254 return false;
255 }
256 drop(versions);
257
258 let label_id = {
260 let reg = self.label_registry.read();
261 match reg.get_id(label) {
262 Some(id) => id,
263 None => return false,
264 }
265 };
266
267 let mut node_labels = self.node_labels.write();
269
270 #[cfg(not(feature = "temporal"))]
271 {
272 if let Some(label_set) = node_labels.get_mut(&node_id) {
273 if !label_set.remove(&label_id) {
274 return false;
275 }
276 } else {
277 return false;
278 }
279 }
280
281 #[cfg(feature = "temporal")]
282 {
283 let current = node_labels
284 .get(&node_id)
285 .and_then(|log| log.latest())
286 .cloned()
287 .unwrap_or_default();
288 if !current.contains(&label_id) {
289 return false;
290 }
291 let mut new_set = current;
292 new_set.remove(&label_id);
293 node_labels
294 .entry(node_id)
295 .or_default()
296 .append(self.current_epoch(), new_set);
297 }
298
299 drop(node_labels);
300
301 let mut index = self.label_index.write();
303 if (label_id as usize) < index.len() {
304 index[label_id as usize].remove(&node_id);
305 }
306
307 true
308 }
309
310 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
316 let reg = self.label_registry.read();
317 if let Some(label_id) = reg.get_id(label) {
318 let index = self.label_index.read();
319 if let Some(set) = index.get(label_id as usize) {
320 let mut ids: Vec<NodeId> = set.keys().copied().collect();
321 ids.sort_unstable();
322 return ids;
323 }
324 }
325 Vec::new()
326 }
327
328 #[must_use]
330 pub fn label_count(&self) -> usize {
331 self.label_registry.read().len()
332 }
333
334 #[must_use]
338 pub fn property_key_count(&self) -> usize {
339 let node_keys = self.node_properties.column_count();
340 let edge_keys = self.edge_properties.column_count();
341 node_keys + edge_keys
345 }
346
347 #[must_use]
349 pub fn edge_type_count(&self) -> usize {
350 self.id_to_edge_type.read().len()
351 }
352
353 pub fn all_labels(&self) -> Vec<String> {
355 self.label_registry
356 .read()
357 .names()
358 .iter()
359 .map(|s| s.to_string())
360 .collect()
361 }
362
363 pub fn all_edge_types(&self) -> Vec<String> {
365 self.id_to_edge_type
366 .read()
367 .iter()
368 .map(|s| s.to_string())
369 .collect()
370 }
371
372 pub fn all_property_keys(&self) -> Vec<String> {
374 let mut keys = std::collections::HashSet::new();
375 for key in self.node_properties.keys() {
376 keys.insert(key.to_string());
377 }
378 for key in self.edge_properties.keys() {
379 keys.insert(key.to_string());
380 }
381 keys.into_iter().collect()
382 }
383
384 #[must_use]
386 pub fn peek_next_node_id(&self) -> u64 {
387 self.next_node_id.load(std::sync::atomic::Ordering::Relaxed)
388 }
389
390 #[must_use]
392 pub fn peek_next_edge_id(&self) -> u64 {
393 self.next_edge_id.load(std::sync::atomic::Ordering::Relaxed)
394 }
395
396 #[cfg(not(feature = "temporal"))]
399 pub fn add_label_versioned(
400 &self,
401 node_id: NodeId,
402 label: &str,
403 transaction_id: TransactionId,
404 ) -> bool {
405 let added = self.add_label(node_id, label);
406 if added {
407 self.property_undo_log
408 .write()
409 .entry(transaction_id)
410 .or_default()
411 .push(PropertyUndoEntry::LabelAdded {
412 node_id,
413 label: label.to_string(),
414 });
415 }
416 added
417 }
418
419 #[cfg(feature = "temporal")]
423 pub fn add_label_versioned(
424 &self,
425 node_id: NodeId,
426 label: &str,
427 transaction_id: TransactionId,
428 ) -> bool {
429 let label_id = self.get_or_create_label_id(label);
430
431 let mut node_labels = self.node_labels.write();
432 let current = node_labels
433 .get(&node_id)
434 .and_then(|log| log.latest())
435 .cloned()
436 .unwrap_or_default();
437 if current.contains(&label_id) {
438 return false;
439 }
440 let mut new_set = current;
441 new_set.insert(label_id);
442 node_labels
443 .entry(node_id)
444 .or_default()
445 .append(EpochId::PENDING, new_set);
446 drop(node_labels);
447
448 let mut index = self.label_index.write();
450 if (label_id as usize) >= index.len() {
451 index.resize(label_id as usize + 1, FxHashMap::default());
452 }
453 index[label_id as usize].insert(node_id, ());
454
455 self.property_undo_log
457 .write()
458 .entry(transaction_id)
459 .or_default()
460 .push(PropertyUndoEntry::LabelAdded {
461 node_id,
462 label: label.to_string(),
463 });
464
465 true
466 }
467
468 #[cfg(not(feature = "temporal"))]
471 pub fn remove_label_versioned(
472 &self,
473 node_id: NodeId,
474 label: &str,
475 transaction_id: TransactionId,
476 ) -> bool {
477 let removed = self.remove_label(node_id, label);
478 if removed {
479 self.property_undo_log
480 .write()
481 .entry(transaction_id)
482 .or_default()
483 .push(PropertyUndoEntry::LabelRemoved {
484 node_id,
485 label: label.to_string(),
486 });
487 }
488 removed
489 }
490
491 #[cfg(feature = "temporal")]
493 pub fn remove_label_versioned(
494 &self,
495 node_id: NodeId,
496 label: &str,
497 transaction_id: TransactionId,
498 ) -> bool {
499 let label_id = {
500 let reg = self.label_registry.read();
501 match reg.get_id(label) {
502 Some(id) => id,
503 None => return false,
504 }
505 };
506
507 let mut node_labels = self.node_labels.write();
508 let current = node_labels
509 .get(&node_id)
510 .and_then(|log| log.latest())
511 .cloned()
512 .unwrap_or_default();
513 if !current.contains(&label_id) {
514 return false;
515 }
516 let mut new_set = current;
517 new_set.remove(&label_id);
518 node_labels
519 .entry(node_id)
520 .or_default()
521 .append(EpochId::PENDING, new_set);
522 drop(node_labels);
523
524 let mut index = self.label_index.write();
526 if (label_id as usize) < index.len() {
527 index[label_id as usize].remove(&node_id);
528 }
529
530 self.property_undo_log
532 .write()
533 .entry(transaction_id)
534 .or_default()
535 .push(PropertyUndoEntry::LabelRemoved {
536 node_id,
537 label: label.to_string(),
538 });
539
540 true
541 }
542}