Skip to main content

grafeo_core/graph/lpg/store/
schema.rs

1//! Schema, label, edge-type, and property-key methods for [`LpgStore`].
2
3use super::{LpgStore, PropertyUndoEntry};
4#[cfg(feature = "temporal")]
5use grafeo_common::types::EpochId;
6use grafeo_common::types::{NodeId, TransactionId};
7use grafeo_common::utils::hash::FxHashMap;
8
9impl LpgStore {
10    /// Adds a label to a node.
11    ///
12    /// Returns true if the label was added, false if the node doesn't exist
13    /// or already has the label.
14    #[cfg(not(feature = "tiered-storage"))]
15    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
16        let epoch = self.current_epoch();
17
18        // Check if node exists
19        let nodes = self.nodes.read();
20        if let Some(chain) = nodes.get(&node_id) {
21            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
22                return false;
23            }
24        } else {
25            return false;
26        }
27        drop(nodes);
28
29        // Get or create label ID
30        let label_id = self.get_or_create_label_id(label);
31
32        // Add to node_labels map
33        let mut node_labels = self.node_labels.write();
34
35        #[cfg(not(feature = "temporal"))]
36        {
37            let label_set = node_labels.entry(node_id).or_default();
38            if label_set.contains(&label_id) {
39                return false;
40            }
41            label_set.insert(label_id);
42        }
43
44        #[cfg(feature = "temporal")]
45        {
46            let current = node_labels
47                .get(&node_id)
48                .and_then(|log| log.latest())
49                .cloned()
50                .unwrap_or_default();
51            if current.contains(&label_id) {
52                return false;
53            }
54            let mut new_set = current;
55            new_set.insert(label_id);
56            node_labels
57                .entry(node_id)
58                .or_default()
59                .append(self.current_epoch(), new_set);
60        }
61
62        drop(node_labels);
63
64        // Add to label_index
65        let mut index = self.label_index.write();
66        if (label_id as usize) >= index.len() {
67            index.resize(label_id as usize + 1, FxHashMap::default());
68        }
69        index[label_id as usize].insert(node_id, ());
70
71        // Update label count in node record
72        #[cfg(not(feature = "temporal"))]
73        if let Some(chain) = self.nodes.write().get_mut(&node_id)
74            && let Some(record) = chain.latest_mut()
75        {
76            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
77            record.set_label_count(count as u16);
78        }
79
80        true
81    }
82
83    /// Adds a label to a node.
84    /// (Tiered storage version)
85    #[cfg(feature = "tiered-storage")]
86    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
87        let epoch = self.current_epoch();
88
89        // Check if node exists
90        let versions = self.node_versions.read();
91        if let Some(index) = versions.get(&node_id) {
92            if let Some(vref) = index.visible_at(epoch) {
93                if let Some(record) = self.read_node_record(&vref) {
94                    if record.is_deleted() {
95                        return false;
96                    }
97                } else {
98                    return false;
99                }
100            } else {
101                return false;
102            }
103        } else {
104            return false;
105        }
106        drop(versions);
107
108        // Get or create label ID
109        let label_id = self.get_or_create_label_id(label);
110
111        // Add to node_labels map
112        let mut node_labels = self.node_labels.write();
113
114        #[cfg(not(feature = "temporal"))]
115        {
116            let label_set = node_labels.entry(node_id).or_default();
117            if label_set.contains(&label_id) {
118                return false;
119            }
120            label_set.insert(label_id);
121        }
122
123        #[cfg(feature = "temporal")]
124        {
125            let current = node_labels
126                .get(&node_id)
127                .and_then(|log| log.latest())
128                .cloned()
129                .unwrap_or_default();
130            if current.contains(&label_id) {
131                return false;
132            }
133            let mut new_set = current;
134            new_set.insert(label_id);
135            node_labels
136                .entry(node_id)
137                .or_default()
138                .append(self.current_epoch(), new_set);
139        }
140
141        drop(node_labels);
142
143        // Add to label_index
144        let mut index = self.label_index.write();
145        if (label_id as usize) >= index.len() {
146            index.resize(label_id as usize + 1, FxHashMap::default());
147        }
148        index[label_id as usize].insert(node_id, ());
149
150        true
151    }
152
153    /// Removes a label from a node.
154    ///
155    /// Returns true if the label was removed, false if the node doesn't exist
156    /// or doesn't have the label.
157    #[cfg(not(feature = "tiered-storage"))]
158    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
159        let epoch = self.current_epoch();
160
161        // Check if node exists
162        let nodes = self.nodes.read();
163        if let Some(chain) = nodes.get(&node_id) {
164            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
165                return false;
166            }
167        } else {
168            return false;
169        }
170        drop(nodes);
171
172        // Get label ID
173        let label_id = {
174            let label_ids = self.label_to_id.read();
175            match label_ids.get(label) {
176                Some(&id) => id,
177                None => return false, // Label doesn't exist
178            }
179        };
180
181        // Remove from node_labels map
182        let mut node_labels = self.node_labels.write();
183
184        #[cfg(not(feature = "temporal"))]
185        {
186            if let Some(label_set) = node_labels.get_mut(&node_id) {
187                if !label_set.remove(&label_id) {
188                    return false;
189                }
190            } else {
191                return false;
192            }
193        }
194
195        #[cfg(feature = "temporal")]
196        {
197            let current = node_labels
198                .get(&node_id)
199                .and_then(|log| log.latest())
200                .cloned()
201                .unwrap_or_default();
202            if !current.contains(&label_id) {
203                return false;
204            }
205            let mut new_set = current;
206            new_set.remove(&label_id);
207            node_labels
208                .entry(node_id)
209                .or_default()
210                .append(self.current_epoch(), new_set);
211        }
212
213        drop(node_labels);
214
215        // Remove from label_index
216        let mut index = self.label_index.write();
217        if (label_id as usize) < index.len() {
218            index[label_id as usize].remove(&node_id);
219        }
220
221        // Update label count in node record
222        #[cfg(not(feature = "temporal"))]
223        if let Some(chain) = self.nodes.write().get_mut(&node_id)
224            && let Some(record) = chain.latest_mut()
225        {
226            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
227            record.set_label_count(count as u16);
228        }
229
230        true
231    }
232
233    /// Removes a label from a node.
234    /// (Tiered storage version)
235    #[cfg(feature = "tiered-storage")]
236    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
237        let epoch = self.current_epoch();
238
239        // Check if node exists
240        let versions = self.node_versions.read();
241        if let Some(index) = versions.get(&node_id) {
242            if let Some(vref) = index.visible_at(epoch) {
243                if let Some(record) = self.read_node_record(&vref) {
244                    if record.is_deleted() {
245                        return false;
246                    }
247                } else {
248                    return false;
249                }
250            } else {
251                return false;
252            }
253        } else {
254            return false;
255        }
256        drop(versions);
257
258        // Get label ID
259        let label_id = {
260            let label_ids = self.label_to_id.read();
261            match label_ids.get(label) {
262                Some(&id) => id,
263                None => return false,
264            }
265        };
266
267        // Remove from node_labels map
268        let mut node_labels = self.node_labels.write();
269
270        #[cfg(not(feature = "temporal"))]
271        {
272            if let Some(label_set) = node_labels.get_mut(&node_id) {
273                if !label_set.remove(&label_id) {
274                    return false;
275                }
276            } else {
277                return false;
278            }
279        }
280
281        #[cfg(feature = "temporal")]
282        {
283            let current = node_labels
284                .get(&node_id)
285                .and_then(|log| log.latest())
286                .cloned()
287                .unwrap_or_default();
288            if !current.contains(&label_id) {
289                return false;
290            }
291            let mut new_set = current;
292            new_set.remove(&label_id);
293            node_labels
294                .entry(node_id)
295                .or_default()
296                .append(self.current_epoch(), new_set);
297        }
298
299        drop(node_labels);
300
301        // Remove from label_index
302        let mut index = self.label_index.write();
303        if (label_id as usize) < index.len() {
304            index[label_id as usize].remove(&node_id);
305        }
306
307        true
308    }
309
310    /// Returns all nodes with a specific label.
311    ///
312    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
313    /// concurrent modifications won't affect the returned vector. Results are
314    /// sorted by NodeId for deterministic iteration order.
315    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
316        let label_to_id = self.label_to_id.read();
317        if let Some(&label_id) = label_to_id.get(label) {
318            let index = self.label_index.read();
319            if let Some(set) = index.get(label_id as usize) {
320                let mut ids: Vec<NodeId> = set.keys().copied().collect();
321                ids.sort_unstable();
322                return ids;
323            }
324        }
325        Vec::new()
326    }
327
328    /// Returns the number of distinct labels in the store.
329    #[must_use]
330    pub fn label_count(&self) -> usize {
331        self.id_to_label.read().len()
332    }
333
334    /// Returns the number of distinct property keys in the store.
335    ///
336    /// This counts unique property keys across both nodes and edges.
337    #[must_use]
338    pub fn property_key_count(&self) -> usize {
339        let node_keys = self.node_properties.column_count();
340        let edge_keys = self.edge_properties.column_count();
341        // Note: This may count some keys twice if the same key is used
342        // for both nodes and edges. A more precise count would require
343        // tracking unique keys across both storages.
344        node_keys + edge_keys
345    }
346
347    /// Returns the number of distinct edge types in the store.
348    #[must_use]
349    pub fn edge_type_count(&self) -> usize {
350        self.id_to_edge_type.read().len()
351    }
352
353    /// Returns all label names in the database.
354    pub fn all_labels(&self) -> Vec<String> {
355        self.id_to_label
356            .read()
357            .iter()
358            .map(|s| s.to_string())
359            .collect()
360    }
361
362    /// Returns all edge type names in the database.
363    pub fn all_edge_types(&self) -> Vec<String> {
364        self.id_to_edge_type
365            .read()
366            .iter()
367            .map(|s| s.to_string())
368            .collect()
369    }
370
371    /// Returns all property keys used in the database.
372    pub fn all_property_keys(&self) -> Vec<String> {
373        let mut keys = std::collections::HashSet::new();
374        for key in self.node_properties.keys() {
375            keys.insert(key.to_string());
376        }
377        for key in self.edge_properties.keys() {
378            keys.insert(key.to_string());
379        }
380        keys.into_iter().collect()
381    }
382
383    /// Returns the next node ID that will be allocated.
384    #[must_use]
385    pub fn peek_next_node_id(&self) -> u64 {
386        self.next_node_id.load(std::sync::atomic::Ordering::Relaxed)
387    }
388
389    /// Returns the next edge ID that will be allocated.
390    #[must_use]
391    pub fn peek_next_edge_id(&self) -> u64 {
392        self.next_edge_id.load(std::sync::atomic::Ordering::Relaxed)
393    }
394
395    /// Adds a label to a node within a transaction, recording the change
396    /// in the undo log so it can be reversed on rollback.
397    #[cfg(not(feature = "temporal"))]
398    pub fn add_label_versioned(
399        &self,
400        node_id: NodeId,
401        label: &str,
402        transaction_id: TransactionId,
403    ) -> bool {
404        let added = self.add_label(node_id, label);
405        if added {
406            self.property_undo_log
407                .write()
408                .entry(transaction_id)
409                .or_default()
410                .push(PropertyUndoEntry::LabelAdded {
411                    node_id,
412                    label: label.to_string(),
413                });
414        }
415        added
416    }
417
418    /// Adds a label to a node within a transaction (temporal version).
419    ///
420    /// Uses `EpochId::PENDING` for the version log entry, finalized on commit.
421    #[cfg(feature = "temporal")]
422    pub fn add_label_versioned(
423        &self,
424        node_id: NodeId,
425        label: &str,
426        transaction_id: TransactionId,
427    ) -> bool {
428        let label_id = self.get_or_create_label_id(label);
429
430        let mut node_labels = self.node_labels.write();
431        let current = node_labels
432            .get(&node_id)
433            .and_then(|log| log.latest())
434            .cloned()
435            .unwrap_or_default();
436        if current.contains(&label_id) {
437            return false;
438        }
439        let mut new_set = current;
440        new_set.insert(label_id);
441        node_labels
442            .entry(node_id)
443            .or_default()
444            .append(EpochId::PENDING, new_set);
445        drop(node_labels);
446
447        // Update label_index
448        let mut index = self.label_index.write();
449        if (label_id as usize) >= index.len() {
450            index.resize(label_id as usize + 1, FxHashMap::default());
451        }
452        index[label_id as usize].insert(node_id, ());
453
454        // Record in undo log
455        self.property_undo_log
456            .write()
457            .entry(transaction_id)
458            .or_default()
459            .push(PropertyUndoEntry::LabelAdded {
460                node_id,
461                label: label.to_string(),
462            });
463
464        true
465    }
466
467    /// Removes a label from a node within a transaction, recording the change
468    /// in the undo log so it can be restored on rollback.
469    #[cfg(not(feature = "temporal"))]
470    pub fn remove_label_versioned(
471        &self,
472        node_id: NodeId,
473        label: &str,
474        transaction_id: TransactionId,
475    ) -> bool {
476        let removed = self.remove_label(node_id, label);
477        if removed {
478            self.property_undo_log
479                .write()
480                .entry(transaction_id)
481                .or_default()
482                .push(PropertyUndoEntry::LabelRemoved {
483                    node_id,
484                    label: label.to_string(),
485                });
486        }
487        removed
488    }
489
490    /// Removes a label from a node within a transaction (temporal version).
491    #[cfg(feature = "temporal")]
492    pub fn remove_label_versioned(
493        &self,
494        node_id: NodeId,
495        label: &str,
496        transaction_id: TransactionId,
497    ) -> bool {
498        let label_id = {
499            let label_ids = self.label_to_id.read();
500            match label_ids.get(label) {
501                Some(&id) => id,
502                None => return false,
503            }
504        };
505
506        let mut node_labels = self.node_labels.write();
507        let current = node_labels
508            .get(&node_id)
509            .and_then(|log| log.latest())
510            .cloned()
511            .unwrap_or_default();
512        if !current.contains(&label_id) {
513            return false;
514        }
515        let mut new_set = current;
516        new_set.remove(&label_id);
517        node_labels
518            .entry(node_id)
519            .or_default()
520            .append(EpochId::PENDING, new_set);
521        drop(node_labels);
522
523        // Update label_index
524        let mut index = self.label_index.write();
525        if (label_id as usize) < index.len() {
526            index[label_id as usize].remove(&node_id);
527        }
528
529        // Record in undo log
530        self.property_undo_log
531            .write()
532            .entry(transaction_id)
533            .or_default()
534            .push(PropertyUndoEntry::LabelRemoved {
535                node_id,
536                label: label.to_string(),
537            });
538
539        true
540    }
541}