grafeo_core/graph/lpg/store/
schema.rs1use super::{LpgStore, PropertyUndoEntry};
4#[cfg(feature = "temporal")]
5use grafeo_common::types::EpochId;
6use grafeo_common::types::{NodeId, TransactionId};
7use grafeo_common::utils::hash::FxHashMap;
8
9impl LpgStore {
10 #[cfg(not(feature = "tiered-storage"))]
15 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
16 let epoch = self.current_epoch();
17
18 let nodes = self.nodes.read();
20 if let Some(chain) = nodes.get(&node_id) {
21 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
22 return false;
23 }
24 } else {
25 return false;
26 }
27 drop(nodes);
28
29 let label_id = self.get_or_create_label_id(label);
31
32 let mut node_labels = self.node_labels.write();
34
35 #[cfg(not(feature = "temporal"))]
36 {
37 let label_set = node_labels.entry(node_id).or_default();
38 if label_set.contains(&label_id) {
39 return false;
40 }
41 label_set.insert(label_id);
42 }
43
44 #[cfg(feature = "temporal")]
45 {
46 let current = node_labels
47 .get(&node_id)
48 .and_then(|log| log.latest())
49 .cloned()
50 .unwrap_or_default();
51 if current.contains(&label_id) {
52 return false;
53 }
54 let mut new_set = current;
55 new_set.insert(label_id);
56 node_labels
57 .entry(node_id)
58 .or_default()
59 .append(self.current_epoch(), new_set);
60 }
61
62 drop(node_labels);
63
64 let mut index = self.label_index.write();
66 if (label_id as usize) >= index.len() {
67 index.resize(label_id as usize + 1, FxHashMap::default());
68 }
69 index[label_id as usize].insert(node_id, ());
70
71 #[cfg(not(feature = "temporal"))]
73 if let Some(chain) = self.nodes.write().get_mut(&node_id)
74 && let Some(record) = chain.latest_mut()
75 {
76 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
77 record.set_label_count(count as u16);
78 }
79
80 true
81 }
82
/// Attaches `label` to the node `node_id` (`tiered-storage` build).
///
/// Returns `true` when the label was newly attached. Returns `false` when
/// the node has no entry in `node_versions`, when no version is visible at
/// the current epoch, when the visible record cannot be read, when it is
/// deleted, or when the node already carries the label.
///
/// Without the `temporal` feature the label is inserted into the node's
/// label set; with `temporal`, a new label set is appended to the node's
/// label log at the current epoch.
#[cfg(feature = "tiered-storage")]
pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // Guard clauses: the node must resolve to a readable, live record.
    {
        let versions = self.node_versions.read();
        let Some(version_index) = versions.get(&node_id) else {
            return false;
        };
        let Some(vref) = version_index.visible_at(epoch) else {
            return false;
        };
        let Some(record) = self.read_node_record(&vref) else {
            return false;
        };
        if record.is_deleted() {
            return false;
        }
    }

    let label_id = self.get_or_create_label_id(label);
    let slot = label_id as usize;

    {
        let mut label_store = self.node_labels.write();

        #[cfg(not(feature = "temporal"))]
        {
            let labels = label_store.entry(node_id).or_default();
            if labels.contains(&label_id) {
                return false;
            }
            labels.insert(label_id);
        }

        #[cfg(feature = "temporal")]
        {
            // Copy the latest label set, extend it, and append it as a new version.
            let mut updated = label_store
                .get(&node_id)
                .and_then(|log| log.latest())
                .cloned()
                .unwrap_or_default();
            if updated.contains(&label_id) {
                return false;
            }
            updated.insert(label_id);
            label_store
                .entry(node_id)
                .or_default()
                .append(self.current_epoch(), updated);
        }
    }

    // Register the node in the label index, growing the index if needed.
    let mut by_label = self.label_index.write();
    if slot >= by_label.len() {
        by_label.resize(slot + 1, FxHashMap::default());
    }
    by_label[slot].insert(node_id, ());

    true
}
152
153 #[cfg(not(feature = "tiered-storage"))]
158 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
159 let epoch = self.current_epoch();
160
161 let nodes = self.nodes.read();
163 if let Some(chain) = nodes.get(&node_id) {
164 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
165 return false;
166 }
167 } else {
168 return false;
169 }
170 drop(nodes);
171
172 let label_id = {
174 let label_ids = self.label_to_id.read();
175 match label_ids.get(label) {
176 Some(&id) => id,
177 None => return false, }
179 };
180
181 let mut node_labels = self.node_labels.write();
183
184 #[cfg(not(feature = "temporal"))]
185 {
186 if let Some(label_set) = node_labels.get_mut(&node_id) {
187 if !label_set.remove(&label_id) {
188 return false;
189 }
190 } else {
191 return false;
192 }
193 }
194
195 #[cfg(feature = "temporal")]
196 {
197 let current = node_labels
198 .get(&node_id)
199 .and_then(|log| log.latest())
200 .cloned()
201 .unwrap_or_default();
202 if !current.contains(&label_id) {
203 return false;
204 }
205 let mut new_set = current;
206 new_set.remove(&label_id);
207 node_labels
208 .entry(node_id)
209 .or_default()
210 .append(self.current_epoch(), new_set);
211 }
212
213 drop(node_labels);
214
215 let mut index = self.label_index.write();
217 if (label_id as usize) < index.len() {
218 index[label_id as usize].remove(&node_id);
219 }
220
221 #[cfg(not(feature = "temporal"))]
223 if let Some(chain) = self.nodes.write().get_mut(&node_id)
224 && let Some(record) = chain.latest_mut()
225 {
226 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
227 record.set_label_count(count as u16);
228 }
229
230 true
231 }
232
/// Detaches `label` from the node `node_id` (`tiered-storage` build).
///
/// Returns `true` when the label was removed. Returns `false` when the node
/// has no entry in `node_versions`, when no version is visible at the
/// current epoch, when the visible record cannot be read, when it is
/// deleted, when `label` has no id in `label_to_id`, or when the node does
/// not carry the label.
///
/// Without the `temporal` feature the label is removed from the node's label
/// set; with `temporal`, a new label set without the label is appended to
/// the node's label log at the current epoch.
#[cfg(feature = "tiered-storage")]
pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // Guard clauses: the node must resolve to a readable, live record.
    {
        let versions = self.node_versions.read();
        let Some(version_index) = versions.get(&node_id) else {
            return false;
        };
        let Some(vref) = version_index.visible_at(epoch) else {
            return false;
        };
        let Some(record) = self.read_node_record(&vref) else {
            return false;
        };
        if record.is_deleted() {
            return false;
        }
    }

    // Look the label up without creating an id for it.
    let label_id = {
        let known_labels = self.label_to_id.read();
        let Some(&id) = known_labels.get(label) else {
            return false;
        };
        id
    };
    let slot = label_id as usize;

    {
        let mut label_store = self.node_labels.write();

        #[cfg(not(feature = "temporal"))]
        {
            let Some(labels) = label_store.get_mut(&node_id) else {
                return false;
            };
            if !labels.remove(&label_id) {
                return false;
            }
        }

        #[cfg(feature = "temporal")]
        {
            // Copy the latest label set, shrink it, and append it as a new version.
            let mut updated = label_store
                .get(&node_id)
                .and_then(|log| log.latest())
                .cloned()
                .unwrap_or_default();
            if !updated.contains(&label_id) {
                return false;
            }
            updated.remove(&label_id);
            label_store
                .entry(node_id)
                .or_default()
                .append(self.current_epoch(), updated);
        }
    }

    // Unregister the node from the label index.
    let mut by_label = self.label_index.write();
    if slot < by_label.len() {
        by_label[slot].remove(&node_id);
    }

    true
}
309
310 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
316 let label_to_id = self.label_to_id.read();
317 if let Some(&label_id) = label_to_id.get(label) {
318 let index = self.label_index.read();
319 if let Some(set) = index.get(label_id as usize) {
320 let mut ids: Vec<NodeId> = set.keys().copied().collect();
321 ids.sort_unstable();
322 return ids;
323 }
324 }
325 Vec::new()
326 }
327
328 #[must_use]
330 pub fn label_count(&self) -> usize {
331 self.id_to_label.read().len()
332 }
333
334 #[must_use]
338 pub fn property_key_count(&self) -> usize {
339 let node_keys = self.node_properties.column_count();
340 let edge_keys = self.edge_properties.column_count();
341 node_keys + edge_keys
345 }
346
347 #[must_use]
349 pub fn edge_type_count(&self) -> usize {
350 self.id_to_edge_type.read().len()
351 }
352
353 pub fn all_labels(&self) -> Vec<String> {
355 self.id_to_label
356 .read()
357 .iter()
358 .map(|s| s.to_string())
359 .collect()
360 }
361
362 pub fn all_edge_types(&self) -> Vec<String> {
364 self.id_to_edge_type
365 .read()
366 .iter()
367 .map(|s| s.to_string())
368 .collect()
369 }
370
371 pub fn all_property_keys(&self) -> Vec<String> {
373 let mut keys = std::collections::HashSet::new();
374 for key in self.node_properties.keys() {
375 keys.insert(key.to_string());
376 }
377 for key in self.edge_properties.keys() {
378 keys.insert(key.to_string());
379 }
380 keys.into_iter().collect()
381 }
382
383 #[must_use]
385 pub fn peek_next_node_id(&self) -> u64 {
386 self.next_node_id.load(std::sync::atomic::Ordering::Relaxed)
387 }
388
389 #[must_use]
391 pub fn peek_next_edge_id(&self) -> u64 {
392 self.next_edge_id.load(std::sync::atomic::Ordering::Relaxed)
393 }
394
395 #[cfg(not(feature = "temporal"))]
398 pub fn add_label_versioned(
399 &self,
400 node_id: NodeId,
401 label: &str,
402 transaction_id: TransactionId,
403 ) -> bool {
404 let added = self.add_label(node_id, label);
405 if added {
406 self.property_undo_log
407 .write()
408 .entry(transaction_id)
409 .or_default()
410 .push(PropertyUndoEntry::LabelAdded {
411 node_id,
412 label: label.to_string(),
413 });
414 }
415 added
416 }
417
/// Adds `label` to `node_id` on behalf of `transaction_id` (`temporal`
/// build).
///
/// Appends the extended label set to the node's label log at
/// `EpochId::PENDING`, registers the node in the label index, and pushes a
/// `PropertyUndoEntry::LabelAdded` entry onto the transaction's list in
/// `property_undo_log`. Returns `false` without changing anything besides
/// the label-id lookup when the latest label set already contains the label.
///
/// NOTE(review): unlike `add_label`, this path performs no node
/// existence/visibility check — presumably intentional for nodes touched
/// inside the same transaction; confirm against callers.
#[cfg(feature = "temporal")]
pub fn add_label_versioned(
    &self,
    node_id: NodeId,
    label: &str,
    transaction_id: TransactionId,
) -> bool {
    let label_id = self.get_or_create_label_id(label);
    let slot = label_id as usize;

    // Append the new label set as a pending version; the guard ends with the block.
    {
        let mut label_logs = self.node_labels.write();
        let mut updated = label_logs
            .get(&node_id)
            .and_then(|log| log.latest())
            .cloned()
            .unwrap_or_default();
        if updated.contains(&label_id) {
            return false;
        }
        updated.insert(label_id);
        label_logs
            .entry(node_id)
            .or_default()
            .append(EpochId::PENDING, updated);
    }

    // Register the node in the label index, growing the index if needed.
    {
        let mut by_label = self.label_index.write();
        if slot >= by_label.len() {
            by_label.resize(slot + 1, FxHashMap::default());
        }
        by_label[slot].insert(node_id, ());
    }

    // Record the change under the owning transaction.
    let undo_entry = PropertyUndoEntry::LabelAdded {
        node_id,
        label: label.to_string(),
    };
    self.property_undo_log
        .write()
        .entry(transaction_id)
        .or_default()
        .push(undo_entry);

    true
}
466
467 #[cfg(not(feature = "temporal"))]
470 pub fn remove_label_versioned(
471 &self,
472 node_id: NodeId,
473 label: &str,
474 transaction_id: TransactionId,
475 ) -> bool {
476 let removed = self.remove_label(node_id, label);
477 if removed {
478 self.property_undo_log
479 .write()
480 .entry(transaction_id)
481 .or_default()
482 .push(PropertyUndoEntry::LabelRemoved {
483 node_id,
484 label: label.to_string(),
485 });
486 }
487 removed
488 }
489
/// Removes `label` from `node_id` on behalf of `transaction_id` (`temporal`
/// build).
///
/// Appends the reduced label set to the node's label log at
/// `EpochId::PENDING`, unregisters the node from the label index, and pushes
/// a `PropertyUndoEntry::LabelRemoved` entry onto the transaction's list in
/// `property_undo_log`. Returns `false` without changing anything when
/// `label` has no id in `label_to_id` or the latest label set does not
/// contain it.
///
/// NOTE(review): unlike `remove_label`, this path performs no node
/// existence/visibility check — presumably intentional for nodes touched
/// inside the same transaction; confirm against callers.
#[cfg(feature = "temporal")]
pub fn remove_label_versioned(
    &self,
    node_id: NodeId,
    label: &str,
    transaction_id: TransactionId,
) -> bool {
    // Look the label up without creating an id for it.
    let label_id = {
        let known_labels = self.label_to_id.read();
        let Some(&id) = known_labels.get(label) else {
            return false;
        };
        id
    };
    let slot = label_id as usize;

    // Append the reduced label set as a pending version; the guard ends with the block.
    {
        let mut label_logs = self.node_labels.write();
        let mut updated = label_logs
            .get(&node_id)
            .and_then(|log| log.latest())
            .cloned()
            .unwrap_or_default();
        if !updated.contains(&label_id) {
            return false;
        }
        updated.remove(&label_id);
        label_logs
            .entry(node_id)
            .or_default()
            .append(EpochId::PENDING, updated);
    }

    // Unregister the node from the label index.
    {
        let mut by_label = self.label_index.write();
        if slot < by_label.len() {
            by_label[slot].remove(&node_id);
        }
    }

    // Record the change under the owning transaction.
    let undo_entry = PropertyUndoEntry::LabelRemoved {
        node_id,
        label: label.to_string(),
    };
    self.property_undo_log
        .write()
        .entry(transaction_id)
        .or_default()
        .push(undo_entry);

    true
}
541}