grafeo_core/graph/lpg/store/
schema.rs1use super::{LpgStore, PropertyUndoEntry};
4#[cfg(feature = "temporal")]
5use grafeo_common::types::EpochId;
6use grafeo_common::types::{NodeId, TransactionId};
7use grafeo_common::utils::hash::FxHashMap;
8
9impl LpgStore {
10 #[cfg(not(feature = "tiered-storage"))]
15 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
16 let epoch = self.current_epoch();
17
18 let nodes = self.nodes.read();
20 if let Some(chain) = nodes.get(&node_id) {
21 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
22 return false;
23 }
24 } else {
25 return false;
26 }
27 drop(nodes);
28
29 let label_id = self.get_or_create_label_id(label);
31
32 let mut node_labels = self.node_labels.write();
34
35 #[cfg(not(feature = "temporal"))]
36 {
37 let label_set = node_labels.entry(node_id).or_default();
38 if label_set.contains(&label_id) {
39 return false;
40 }
41 label_set.insert(label_id);
42 }
43
44 #[cfg(feature = "temporal")]
45 {
46 let current = node_labels
47 .get(&node_id)
48 .and_then(|log| log.latest())
49 .cloned()
50 .unwrap_or_default();
51 if current.contains(&label_id) {
52 return false;
53 }
54 let mut new_set = current;
55 new_set.insert(label_id);
56 node_labels
57 .entry(node_id)
58 .or_default()
59 .append(self.current_epoch(), new_set);
60 }
61
62 drop(node_labels);
63
64 let mut index = self.label_index.write();
66 if (label_id as usize) >= index.len() {
67 index.resize(label_id as usize + 1, FxHashMap::default());
68 }
69 index[label_id as usize].insert(node_id, ());
70
71 #[cfg(not(feature = "temporal"))]
73 if let Some(chain) = self.nodes.write().get_mut(&node_id)
74 && let Some(record) = chain.latest_mut()
75 {
76 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
77 record.set_label_count(u16::try_from(count).unwrap_or(u16::MAX));
78 }
79
80 true
81 }
82
/// Attaches `label` to the node `node_id` (tiered-storage build).
///
/// Returns `true` when the label was newly attached. Returns `false` when the
/// node has no entry in `node_versions`, has no version visible at the
/// current epoch, its record cannot be read, the record is deleted, or the
/// node already carries the label.
#[cfg(feature = "tiered-storage")]
pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // Liveness check; the read guard is released when this scope ends.
    {
        let version_map = self.node_versions.read();
        let Some(version_index) = version_map.get(&node_id) else {
            return false;
        };
        let Some(vref) = version_index.visible_at(epoch) else {
            return false;
        };
        let Some(record) = self.read_node_record(&vref) else {
            return false;
        };
        if record.is_deleted() {
            return false;
        }
    }

    let label_id = self.get_or_create_label_id(label);
    let slot = label_id as usize;

    let mut labels_by_node = self.node_labels.write();

    // Non-temporal build: the per-node label set is mutated in place.
    #[cfg(not(feature = "temporal"))]
    {
        let labels = labels_by_node.entry(node_id).or_default();
        if labels.contains(&label_id) {
            return false;
        }
        labels.insert(label_id);
    }

    // Temporal build: copy the latest set, extend it, and append the copy
    // to the node's log at the current epoch.
    #[cfg(feature = "temporal")]
    {
        let mut updated = labels_by_node
            .get(&node_id)
            .and_then(|log| log.latest())
            .cloned()
            .unwrap_or_default();
        if updated.contains(&label_id) {
            return false;
        }
        updated.insert(label_id);
        labels_by_node
            .entry(node_id)
            .or_default()
            .append(self.current_epoch(), updated);
    }

    drop(labels_by_node);

    // `label_index` is indexed by label id; grow it so `slot` is in bounds.
    let mut by_label = self.label_index.write();
    if by_label.len() <= slot {
        by_label.resize(slot + 1, FxHashMap::default());
    }
    by_label[slot].insert(node_id, ());

    true
}
152
153 #[cfg(not(feature = "tiered-storage"))]
158 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
159 let epoch = self.current_epoch();
160
161 let nodes = self.nodes.read();
163 if let Some(chain) = nodes.get(&node_id) {
164 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
165 return false;
166 }
167 } else {
168 return false;
169 }
170 drop(nodes);
171
172 let label_id = {
174 let reg = self.label_registry.read();
175 match reg.get_id(label) {
176 Some(id) => id,
177 None => return false, }
179 };
180
181 let mut node_labels = self.node_labels.write();
183
184 #[cfg(not(feature = "temporal"))]
185 {
186 if let Some(label_set) = node_labels.get_mut(&node_id) {
187 if !label_set.remove(&label_id) {
188 return false;
189 }
190 } else {
191 return false;
192 }
193 }
194
195 #[cfg(feature = "temporal")]
196 {
197 let current = node_labels
198 .get(&node_id)
199 .and_then(|log| log.latest())
200 .cloned()
201 .unwrap_or_default();
202 if !current.contains(&label_id) {
203 return false;
204 }
205 let mut new_set = current;
206 new_set.remove(&label_id);
207 node_labels
208 .entry(node_id)
209 .or_default()
210 .append(self.current_epoch(), new_set);
211 }
212
213 drop(node_labels);
214
215 let mut index = self.label_index.write();
217 if (label_id as usize) < index.len() {
218 index[label_id as usize].remove(&node_id);
219 }
220
221 #[cfg(not(feature = "temporal"))]
223 if let Some(chain) = self.nodes.write().get_mut(&node_id)
224 && let Some(record) = chain.latest_mut()
225 {
226 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
227 record.set_label_count(u16::try_from(count).unwrap_or(u16::MAX));
228 }
229
230 true
231 }
232
/// Detaches `label` from the node `node_id` (tiered-storage build).
///
/// Returns `true` when the label was removed. Returns `false` when the node
/// has no entry in `node_versions`, has no version visible at the current
/// epoch, its record cannot be read, the record is deleted, the label is not
/// registered, or the node does not carry the label.
#[cfg(feature = "tiered-storage")]
pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // Liveness check; the read guard is released when this scope ends.
    {
        let version_map = self.node_versions.read();
        let Some(version_index) = version_map.get(&node_id) else {
            return false;
        };
        let Some(vref) = version_index.visible_at(epoch) else {
            return false;
        };
        let Some(record) = self.read_node_record(&vref) else {
            return false;
        };
        if record.is_deleted() {
            return false;
        }
    }

    // Lookup only: removing an unregistered label never creates an id.
    let Some(label_id) = self.label_registry.read().get_id(label) else {
        return false;
    };

    let mut labels_by_node = self.node_labels.write();

    // Non-temporal build: the per-node label set is mutated in place.
    #[cfg(not(feature = "temporal"))]
    {
        let removed = labels_by_node
            .get_mut(&node_id)
            .is_some_and(|labels| labels.remove(&label_id));
        if !removed {
            return false;
        }
    }

    // Temporal build: copy the latest set, shrink it, and append the copy
    // to the node's log at the current epoch.
    #[cfg(feature = "temporal")]
    {
        let mut updated = labels_by_node
            .get(&node_id)
            .and_then(|log| log.latest())
            .cloned()
            .unwrap_or_default();
        if !updated.contains(&label_id) {
            return false;
        }
        updated.remove(&label_id);
        labels_by_node
            .entry(node_id)
            .or_default()
            .append(self.current_epoch(), updated);
    }

    drop(labels_by_node);

    // Drop the node from the label's bucket, if that bucket exists.
    let mut by_label = self.label_index.write();
    if let Some(members) = by_label.get_mut(label_id as usize) {
        members.remove(&node_id);
    }

    true
}
309
310 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
316 let reg = self.label_registry.read();
317 if let Some(label_id) = reg.get_id(label) {
318 let index = self.label_index.read();
319 if let Some(set) = index.get(label_id as usize) {
320 let mut ids: Vec<NodeId> = set.keys().copied().collect();
321 ids.sort_unstable();
322 return ids;
323 }
324 }
325 Vec::new()
326 }
327
328 #[must_use]
331 pub fn nodes_by_label_count(&self, label: &str) -> usize {
332 let reg = self.label_registry.read();
333 let Some(label_id) = reg.get_id(label) else {
334 return 0;
335 };
336 self.label_index
337 .read()
338 .get(label_id as usize)
339 .map_or(0, |set| set.len())
340 }
341
342 #[must_use]
344 pub fn label_count(&self) -> usize {
345 self.label_registry.read().len()
346 }
347
348 #[must_use]
352 pub fn property_key_count(&self) -> usize {
353 let node_keys = self.node_properties.column_count();
354 let edge_keys = self.edge_properties.column_count();
355 node_keys + edge_keys
359 }
360
361 #[must_use]
363 pub fn edge_type_count(&self) -> usize {
364 self.id_to_edge_type.read().len()
365 }
366
367 pub fn all_labels(&self) -> Vec<String> {
369 self.label_registry
370 .read()
371 .names()
372 .iter()
373 .map(|s| s.to_string())
374 .collect()
375 }
376
377 pub fn all_edge_types(&self) -> Vec<String> {
379 self.id_to_edge_type
380 .read()
381 .iter()
382 .map(|s| s.to_string())
383 .collect()
384 }
385
386 pub fn all_property_keys(&self) -> Vec<String> {
388 let mut keys = std::collections::HashSet::new();
389 for key in self.node_properties.keys() {
390 keys.insert(key.to_string());
391 }
392 for key in self.edge_properties.keys() {
393 keys.insert(key.to_string());
394 }
395 keys.into_iter().collect()
396 }
397
398 #[must_use]
400 pub fn peek_next_node_id(&self) -> u64 {
401 self.next_node_id.load(std::sync::atomic::Ordering::Relaxed)
402 }
403
404 #[must_use]
406 pub fn peek_next_edge_id(&self) -> u64 {
407 self.next_edge_id.load(std::sync::atomic::Ordering::Relaxed)
408 }
409
410 #[cfg(not(feature = "temporal"))]
413 pub fn add_label_versioned(
414 &self,
415 node_id: NodeId,
416 label: &str,
417 transaction_id: TransactionId,
418 ) -> bool {
419 let added = self.add_label(node_id, label);
420 if added {
421 self.property_undo_log
422 .write()
423 .entry(transaction_id)
424 .or_default()
425 .push(PropertyUndoEntry::LabelAdded {
426 node_id,
427 label: label.to_string(),
428 });
429 }
430 added
431 }
432
/// Adds `label` to `node_id` as part of transaction `transaction_id`
/// (temporal build).
///
/// The extended label set is appended to the node's label log at
/// `EpochId::PENDING`, the node is inserted into the label index, and a
/// `LabelAdded` undo entry is recorded under `transaction_id`.
///
/// Returns `false` without logging when the latest label set already
/// contains the label. This variant performs no node existence or
/// visibility check.
#[cfg(feature = "temporal")]
pub fn add_label_versioned(
    &self,
    node_id: NodeId,
    label: &str,
    transaction_id: TransactionId,
) -> bool {
    let label_id = self.get_or_create_label_id(label);
    let slot = label_id as usize;

    let mut labels_by_node = self.node_labels.write();
    let mut updated = labels_by_node
        .get(&node_id)
        .and_then(|log| log.latest())
        .cloned()
        .unwrap_or_default();
    if updated.contains(&label_id) {
        return false;
    }
    updated.insert(label_id);
    labels_by_node
        .entry(node_id)
        .or_default()
        .append(EpochId::PENDING, updated);
    drop(labels_by_node);

    // `label_index` is indexed by label id; grow it so `slot` is in bounds.
    let mut by_label = self.label_index.write();
    if by_label.len() <= slot {
        by_label.resize(slot + 1, FxHashMap::default());
    }
    by_label[slot].insert(node_id, ());

    let undo = PropertyUndoEntry::LabelAdded {
        node_id,
        label: label.to_owned(),
    };
    self.property_undo_log
        .write()
        .entry(transaction_id)
        .or_default()
        .push(undo);

    true
}
481
482 #[cfg(not(feature = "temporal"))]
485 pub fn remove_label_versioned(
486 &self,
487 node_id: NodeId,
488 label: &str,
489 transaction_id: TransactionId,
490 ) -> bool {
491 let removed = self.remove_label(node_id, label);
492 if removed {
493 self.property_undo_log
494 .write()
495 .entry(transaction_id)
496 .or_default()
497 .push(PropertyUndoEntry::LabelRemoved {
498 node_id,
499 label: label.to_string(),
500 });
501 }
502 removed
503 }
504
/// Removes `label` from `node_id` as part of transaction `transaction_id`
/// (temporal build).
///
/// The reduced label set is appended to the node's label log at
/// `EpochId::PENDING`, the node is removed from the label's index bucket,
/// and a `LabelRemoved` undo entry is recorded under `transaction_id`.
///
/// Returns `false` without logging when the label is not registered or the
/// latest label set does not contain it. This variant performs no node
/// existence or visibility check.
#[cfg(feature = "temporal")]
pub fn remove_label_versioned(
    &self,
    node_id: NodeId,
    label: &str,
    transaction_id: TransactionId,
) -> bool {
    // Lookup only: removing an unregistered label never creates an id.
    let Some(label_id) = self.label_registry.read().get_id(label) else {
        return false;
    };

    let mut labels_by_node = self.node_labels.write();
    let mut updated = labels_by_node
        .get(&node_id)
        .and_then(|log| log.latest())
        .cloned()
        .unwrap_or_default();
    if !updated.contains(&label_id) {
        return false;
    }
    updated.remove(&label_id);
    labels_by_node
        .entry(node_id)
        .or_default()
        .append(EpochId::PENDING, updated);
    drop(labels_by_node);

    // Drop the node from the label's bucket, if that bucket exists.
    let mut by_label = self.label_index.write();
    if let Some(members) = by_label.get_mut(label_id as usize) {
        members.remove(&node_id);
    }

    let undo = PropertyUndoEntry::LabelRemoved {
        node_id,
        label: label.to_owned(),
    };
    self.property_undo_log
        .write()
        .entry(transaction_id)
        .or_default()
        .push(undo);

    true
}
556}