Skip to main content

grafeo_core/graph/lpg/store/
schema.rs

1//! Schema, label, edge-type, and property-key methods for [`LpgStore`].
2
3use super::{LpgStore, PropertyUndoEntry};
4#[cfg(feature = "temporal")]
5use grafeo_common::types::EpochId;
6use grafeo_common::types::{NodeId, TransactionId};
7use grafeo_common::utils::hash::FxHashMap;
8
9impl LpgStore {
10    /// Adds a label to a node.
11    ///
12    /// Returns true if the label was added, false if the node doesn't exist
13    /// or already has the label.
14    #[cfg(not(feature = "tiered-storage"))]
15    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
16        let epoch = self.current_epoch();
17
18        // Check if node exists
19        let nodes = self.nodes.read();
20        if let Some(chain) = nodes.get(&node_id) {
21            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
22                return false;
23            }
24        } else {
25            return false;
26        }
27        drop(nodes);
28
29        // Get or create label ID
30        let label_id = self.get_or_create_label_id(label);
31
32        // Add to node_labels map
33        let mut node_labels = self.node_labels.write();
34
35        #[cfg(not(feature = "temporal"))]
36        {
37            let label_set = node_labels.entry(node_id).or_default();
38            if label_set.contains(&label_id) {
39                return false;
40            }
41            label_set.insert(label_id);
42        }
43
44        #[cfg(feature = "temporal")]
45        {
46            let current = node_labels
47                .get(&node_id)
48                .and_then(|log| log.latest())
49                .cloned()
50                .unwrap_or_default();
51            if current.contains(&label_id) {
52                return false;
53            }
54            let mut new_set = current;
55            new_set.insert(label_id);
56            node_labels
57                .entry(node_id)
58                .or_default()
59                .append(self.current_epoch(), new_set);
60        }
61
62        drop(node_labels);
63
64        // Add to label_index
65        let mut index = self.label_index.write();
66        if (label_id as usize) >= index.len() {
67            index.resize(label_id as usize + 1, FxHashMap::default());
68        }
69        index[label_id as usize].insert(node_id, ());
70
71        // Update label count in node record
72        #[cfg(not(feature = "temporal"))]
73        if let Some(chain) = self.nodes.write().get_mut(&node_id)
74            && let Some(record) = chain.latest_mut()
75        {
76            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
77            record.set_label_count(count as u16);
78        }
79
80        true
81    }
82
83    /// Adds a label to a node.
84    /// (Tiered storage version)
85    #[cfg(feature = "tiered-storage")]
86    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
87        let epoch = self.current_epoch();
88
89        // Check if node exists
90        let versions = self.node_versions.read();
91        if let Some(index) = versions.get(&node_id) {
92            if let Some(vref) = index.visible_at(epoch) {
93                if let Some(record) = self.read_node_record(&vref) {
94                    if record.is_deleted() {
95                        return false;
96                    }
97                } else {
98                    return false;
99                }
100            } else {
101                return false;
102            }
103        } else {
104            return false;
105        }
106        drop(versions);
107
108        // Get or create label ID
109        let label_id = self.get_or_create_label_id(label);
110
111        // Add to node_labels map
112        let mut node_labels = self.node_labels.write();
113
114        #[cfg(not(feature = "temporal"))]
115        {
116            let label_set = node_labels.entry(node_id).or_default();
117            if label_set.contains(&label_id) {
118                return false;
119            }
120            label_set.insert(label_id);
121        }
122
123        #[cfg(feature = "temporal")]
124        {
125            let current = node_labels
126                .get(&node_id)
127                .and_then(|log| log.latest())
128                .cloned()
129                .unwrap_or_default();
130            if current.contains(&label_id) {
131                return false;
132            }
133            let mut new_set = current;
134            new_set.insert(label_id);
135            node_labels
136                .entry(node_id)
137                .or_default()
138                .append(self.current_epoch(), new_set);
139        }
140
141        drop(node_labels);
142
143        // Add to label_index
144        let mut index = self.label_index.write();
145        if (label_id as usize) >= index.len() {
146            index.resize(label_id as usize + 1, FxHashMap::default());
147        }
148        index[label_id as usize].insert(node_id, ());
149
150        true
151    }
152
153    /// Removes a label from a node.
154    ///
155    /// Returns true if the label was removed, false if the node doesn't exist
156    /// or doesn't have the label.
157    #[cfg(not(feature = "tiered-storage"))]
158    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
159        let epoch = self.current_epoch();
160
161        // Check if node exists
162        let nodes = self.nodes.read();
163        if let Some(chain) = nodes.get(&node_id) {
164            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
165                return false;
166            }
167        } else {
168            return false;
169        }
170        drop(nodes);
171
172        // Get label ID
173        let label_id = {
174            let reg = self.label_registry.read();
175            match reg.get_id(label) {
176                Some(id) => id,
177                None => return false, // Label doesn't exist
178            }
179        };
180
181        // Remove from node_labels map
182        let mut node_labels = self.node_labels.write();
183
184        #[cfg(not(feature = "temporal"))]
185        {
186            if let Some(label_set) = node_labels.get_mut(&node_id) {
187                if !label_set.remove(&label_id) {
188                    return false;
189                }
190            } else {
191                return false;
192            }
193        }
194
195        #[cfg(feature = "temporal")]
196        {
197            let current = node_labels
198                .get(&node_id)
199                .and_then(|log| log.latest())
200                .cloned()
201                .unwrap_or_default();
202            if !current.contains(&label_id) {
203                return false;
204            }
205            let mut new_set = current;
206            new_set.remove(&label_id);
207            node_labels
208                .entry(node_id)
209                .or_default()
210                .append(self.current_epoch(), new_set);
211        }
212
213        drop(node_labels);
214
215        // Remove from label_index
216        let mut index = self.label_index.write();
217        if (label_id as usize) < index.len() {
218            index[label_id as usize].remove(&node_id);
219        }
220
221        // Update label count in node record
222        #[cfg(not(feature = "temporal"))]
223        if let Some(chain) = self.nodes.write().get_mut(&node_id)
224            && let Some(record) = chain.latest_mut()
225        {
226            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
227            record.set_label_count(count as u16);
228        }
229
230        true
231    }
232
233    /// Removes a label from a node.
234    /// (Tiered storage version)
235    #[cfg(feature = "tiered-storage")]
236    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
237        let epoch = self.current_epoch();
238
239        // Check if node exists
240        let versions = self.node_versions.read();
241        if let Some(index) = versions.get(&node_id) {
242            if let Some(vref) = index.visible_at(epoch) {
243                if let Some(record) = self.read_node_record(&vref) {
244                    if record.is_deleted() {
245                        return false;
246                    }
247                } else {
248                    return false;
249                }
250            } else {
251                return false;
252            }
253        } else {
254            return false;
255        }
256        drop(versions);
257
258        // Get label ID
259        let label_id = {
260            let reg = self.label_registry.read();
261            match reg.get_id(label) {
262                Some(id) => id,
263                None => return false,
264            }
265        };
266
267        // Remove from node_labels map
268        let mut node_labels = self.node_labels.write();
269
270        #[cfg(not(feature = "temporal"))]
271        {
272            if let Some(label_set) = node_labels.get_mut(&node_id) {
273                if !label_set.remove(&label_id) {
274                    return false;
275                }
276            } else {
277                return false;
278            }
279        }
280
281        #[cfg(feature = "temporal")]
282        {
283            let current = node_labels
284                .get(&node_id)
285                .and_then(|log| log.latest())
286                .cloned()
287                .unwrap_or_default();
288            if !current.contains(&label_id) {
289                return false;
290            }
291            let mut new_set = current;
292            new_set.remove(&label_id);
293            node_labels
294                .entry(node_id)
295                .or_default()
296                .append(self.current_epoch(), new_set);
297        }
298
299        drop(node_labels);
300
301        // Remove from label_index
302        let mut index = self.label_index.write();
303        if (label_id as usize) < index.len() {
304            index[label_id as usize].remove(&node_id);
305        }
306
307        true
308    }
309
310    /// Returns all nodes with a specific label.
311    ///
312    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
313    /// concurrent modifications won't affect the returned vector. Results are
314    /// sorted by NodeId for deterministic iteration order.
315    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
316        let reg = self.label_registry.read();
317        if let Some(label_id) = reg.get_id(label) {
318            let index = self.label_index.read();
319            if let Some(set) = index.get(label_id as usize) {
320                let mut ids: Vec<NodeId> = set.keys().copied().collect();
321                ids.sort_unstable();
322                return ids;
323            }
324        }
325        Vec::new()
326    }
327
328    /// Returns the number of distinct labels in the store.
329    #[must_use]
330    pub fn label_count(&self) -> usize {
331        self.label_registry.read().len()
332    }
333
334    /// Returns the number of distinct property keys in the store.
335    ///
336    /// This counts unique property keys across both nodes and edges.
337    #[must_use]
338    pub fn property_key_count(&self) -> usize {
339        let node_keys = self.node_properties.column_count();
340        let edge_keys = self.edge_properties.column_count();
341        // Note: This may count some keys twice if the same key is used
342        // for both nodes and edges. A more precise count would require
343        // tracking unique keys across both storages.
344        node_keys + edge_keys
345    }
346
347    /// Returns the number of distinct edge types in the store.
348    #[must_use]
349    pub fn edge_type_count(&self) -> usize {
350        self.id_to_edge_type.read().len()
351    }
352
353    /// Returns all label names in the database.
354    pub fn all_labels(&self) -> Vec<String> {
355        self.label_registry
356            .read()
357            .names()
358            .iter()
359            .map(|s| s.to_string())
360            .collect()
361    }
362
363    /// Returns all edge type names in the database.
364    pub fn all_edge_types(&self) -> Vec<String> {
365        self.id_to_edge_type
366            .read()
367            .iter()
368            .map(|s| s.to_string())
369            .collect()
370    }
371
372    /// Returns all property keys used in the database.
373    pub fn all_property_keys(&self) -> Vec<String> {
374        let mut keys = std::collections::HashSet::new();
375        for key in self.node_properties.keys() {
376            keys.insert(key.to_string());
377        }
378        for key in self.edge_properties.keys() {
379            keys.insert(key.to_string());
380        }
381        keys.into_iter().collect()
382    }
383
384    /// Returns the next node ID that will be allocated.
385    #[must_use]
386    pub fn peek_next_node_id(&self) -> u64 {
387        self.next_node_id.load(std::sync::atomic::Ordering::Relaxed)
388    }
389
390    /// Returns the next edge ID that will be allocated.
391    #[must_use]
392    pub fn peek_next_edge_id(&self) -> u64 {
393        self.next_edge_id.load(std::sync::atomic::Ordering::Relaxed)
394    }
395
396    /// Adds a label to a node within a transaction, recording the change
397    /// in the undo log so it can be reversed on rollback.
398    #[cfg(not(feature = "temporal"))]
399    pub fn add_label_versioned(
400        &self,
401        node_id: NodeId,
402        label: &str,
403        transaction_id: TransactionId,
404    ) -> bool {
405        let added = self.add_label(node_id, label);
406        if added {
407            self.property_undo_log
408                .write()
409                .entry(transaction_id)
410                .or_default()
411                .push(PropertyUndoEntry::LabelAdded {
412                    node_id,
413                    label: label.to_string(),
414                });
415        }
416        added
417    }
418
419    /// Adds a label to a node within a transaction (temporal version).
420    ///
421    /// Uses `EpochId::PENDING` for the version log entry, finalized on commit.
422    #[cfg(feature = "temporal")]
423    pub fn add_label_versioned(
424        &self,
425        node_id: NodeId,
426        label: &str,
427        transaction_id: TransactionId,
428    ) -> bool {
429        let label_id = self.get_or_create_label_id(label);
430
431        let mut node_labels = self.node_labels.write();
432        let current = node_labels
433            .get(&node_id)
434            .and_then(|log| log.latest())
435            .cloned()
436            .unwrap_or_default();
437        if current.contains(&label_id) {
438            return false;
439        }
440        let mut new_set = current;
441        new_set.insert(label_id);
442        node_labels
443            .entry(node_id)
444            .or_default()
445            .append(EpochId::PENDING, new_set);
446        drop(node_labels);
447
448        // Update label_index
449        let mut index = self.label_index.write();
450        if (label_id as usize) >= index.len() {
451            index.resize(label_id as usize + 1, FxHashMap::default());
452        }
453        index[label_id as usize].insert(node_id, ());
454
455        // Record in undo log
456        self.property_undo_log
457            .write()
458            .entry(transaction_id)
459            .or_default()
460            .push(PropertyUndoEntry::LabelAdded {
461                node_id,
462                label: label.to_string(),
463            });
464
465        true
466    }
467
468    /// Removes a label from a node within a transaction, recording the change
469    /// in the undo log so it can be restored on rollback.
470    #[cfg(not(feature = "temporal"))]
471    pub fn remove_label_versioned(
472        &self,
473        node_id: NodeId,
474        label: &str,
475        transaction_id: TransactionId,
476    ) -> bool {
477        let removed = self.remove_label(node_id, label);
478        if removed {
479            self.property_undo_log
480                .write()
481                .entry(transaction_id)
482                .or_default()
483                .push(PropertyUndoEntry::LabelRemoved {
484                    node_id,
485                    label: label.to_string(),
486                });
487        }
488        removed
489    }
490
491    /// Removes a label from a node within a transaction (temporal version).
492    #[cfg(feature = "temporal")]
493    pub fn remove_label_versioned(
494        &self,
495        node_id: NodeId,
496        label: &str,
497        transaction_id: TransactionId,
498    ) -> bool {
499        let label_id = {
500            let reg = self.label_registry.read();
501            match reg.get_id(label) {
502                Some(id) => id,
503                None => return false,
504            }
505        };
506
507        let mut node_labels = self.node_labels.write();
508        let current = node_labels
509            .get(&node_id)
510            .and_then(|log| log.latest())
511            .cloned()
512            .unwrap_or_default();
513        if !current.contains(&label_id) {
514            return false;
515        }
516        let mut new_set = current;
517        new_set.remove(&label_id);
518        node_labels
519            .entry(node_id)
520            .or_default()
521            .append(EpochId::PENDING, new_set);
522        drop(node_labels);
523
524        // Update label_index
525        let mut index = self.label_index.write();
526        if (label_id as usize) < index.len() {
527            index[label_id as usize].remove(&node_id);
528        }
529
530        // Record in undo log
531        self.property_undo_log
532            .write()
533            .entry(transaction_id)
534            .or_default()
535            .push(PropertyUndoEntry::LabelRemoved {
536                node_id,
537                label: label.to_string(),
538            });
539
540        true
541    }
542}