Skip to main content

grafeo_core/graph/lpg/store/
schema.rs

1//! Schema, label, edge-type, and property-key methods for [`LpgStore`].
2
3use super::{LpgStore, PropertyUndoEntry};
4#[cfg(feature = "temporal")]
5use grafeo_common::types::EpochId;
6use grafeo_common::types::{NodeId, TransactionId};
7use grafeo_common::utils::hash::FxHashMap;
8
9impl LpgStore {
10    /// Adds a label to a node.
11    ///
12    /// Returns true if the label was added, false if the node doesn't exist
13    /// or already has the label.
14    #[cfg(not(feature = "tiered-storage"))]
15    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
16        let epoch = self.current_epoch();
17
18        // Check if node exists
19        let nodes = self.nodes.read();
20        if let Some(chain) = nodes.get(&node_id) {
21            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
22                return false;
23            }
24        } else {
25            return false;
26        }
27        drop(nodes);
28
29        // Get or create label ID
30        let label_id = self.get_or_create_label_id(label);
31
32        // Add to node_labels map
33        let mut node_labels = self.node_labels.write();
34
35        #[cfg(not(feature = "temporal"))]
36        {
37            let label_set = node_labels.entry(node_id).or_default();
38            if label_set.contains(&label_id) {
39                return false;
40            }
41            label_set.insert(label_id);
42        }
43
44        #[cfg(feature = "temporal")]
45        {
46            let current = node_labels
47                .get(&node_id)
48                .and_then(|log| log.latest())
49                .cloned()
50                .unwrap_or_default();
51            if current.contains(&label_id) {
52                return false;
53            }
54            let mut new_set = current;
55            new_set.insert(label_id);
56            node_labels
57                .entry(node_id)
58                .or_default()
59                .append(self.current_epoch(), new_set);
60        }
61
62        drop(node_labels);
63
64        // Add to label_index
65        let mut index = self.label_index.write();
66        if (label_id as usize) >= index.len() {
67            index.resize(label_id as usize + 1, FxHashMap::default());
68        }
69        index[label_id as usize].insert(node_id, ());
70
71        // Update label count in node record
72        #[cfg(not(feature = "temporal"))]
73        if let Some(chain) = self.nodes.write().get_mut(&node_id)
74            && let Some(record) = chain.latest_mut()
75        {
76            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
77            record.set_label_count(u16::try_from(count).unwrap_or(u16::MAX));
78        }
79
80        true
81    }
82
83    /// Adds a label to a node.
84    /// (Tiered storage version)
85    #[cfg(feature = "tiered-storage")]
86    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
87        let epoch = self.current_epoch();
88
89        // Check if node exists
90        let versions = self.node_versions.read();
91        if let Some(index) = versions.get(&node_id) {
92            if let Some(vref) = index.visible_at(epoch) {
93                if let Some(record) = self.read_node_record(&vref) {
94                    if record.is_deleted() {
95                        return false;
96                    }
97                } else {
98                    return false;
99                }
100            } else {
101                return false;
102            }
103        } else {
104            return false;
105        }
106        drop(versions);
107
108        // Get or create label ID
109        let label_id = self.get_or_create_label_id(label);
110
111        // Add to node_labels map
112        let mut node_labels = self.node_labels.write();
113
114        #[cfg(not(feature = "temporal"))]
115        {
116            let label_set = node_labels.entry(node_id).or_default();
117            if label_set.contains(&label_id) {
118                return false;
119            }
120            label_set.insert(label_id);
121        }
122
123        #[cfg(feature = "temporal")]
124        {
125            let current = node_labels
126                .get(&node_id)
127                .and_then(|log| log.latest())
128                .cloned()
129                .unwrap_or_default();
130            if current.contains(&label_id) {
131                return false;
132            }
133            let mut new_set = current;
134            new_set.insert(label_id);
135            node_labels
136                .entry(node_id)
137                .or_default()
138                .append(self.current_epoch(), new_set);
139        }
140
141        drop(node_labels);
142
143        // Add to label_index
144        let mut index = self.label_index.write();
145        if (label_id as usize) >= index.len() {
146            index.resize(label_id as usize + 1, FxHashMap::default());
147        }
148        index[label_id as usize].insert(node_id, ());
149
150        true
151    }
152
153    /// Removes a label from a node.
154    ///
155    /// Returns true if the label was removed, false if the node doesn't exist
156    /// or doesn't have the label.
157    #[cfg(not(feature = "tiered-storage"))]
158    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
159        let epoch = self.current_epoch();
160
161        // Check if node exists
162        let nodes = self.nodes.read();
163        if let Some(chain) = nodes.get(&node_id) {
164            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
165                return false;
166            }
167        } else {
168            return false;
169        }
170        drop(nodes);
171
172        // Get label ID
173        let label_id = {
174            let reg = self.label_registry.read();
175            match reg.get_id(label) {
176                Some(id) => id,
177                None => return false, // Label doesn't exist
178            }
179        };
180
181        // Remove from node_labels map
182        let mut node_labels = self.node_labels.write();
183
184        #[cfg(not(feature = "temporal"))]
185        {
186            if let Some(label_set) = node_labels.get_mut(&node_id) {
187                if !label_set.remove(&label_id) {
188                    return false;
189                }
190            } else {
191                return false;
192            }
193        }
194
195        #[cfg(feature = "temporal")]
196        {
197            let current = node_labels
198                .get(&node_id)
199                .and_then(|log| log.latest())
200                .cloned()
201                .unwrap_or_default();
202            if !current.contains(&label_id) {
203                return false;
204            }
205            let mut new_set = current;
206            new_set.remove(&label_id);
207            node_labels
208                .entry(node_id)
209                .or_default()
210                .append(self.current_epoch(), new_set);
211        }
212
213        drop(node_labels);
214
215        // Remove from label_index
216        let mut index = self.label_index.write();
217        if (label_id as usize) < index.len() {
218            index[label_id as usize].remove(&node_id);
219        }
220
221        // Update label count in node record
222        #[cfg(not(feature = "temporal"))]
223        if let Some(chain) = self.nodes.write().get_mut(&node_id)
224            && let Some(record) = chain.latest_mut()
225        {
226            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
227            record.set_label_count(u16::try_from(count).unwrap_or(u16::MAX));
228        }
229
230        true
231    }
232
233    /// Removes a label from a node.
234    /// (Tiered storage version)
235    #[cfg(feature = "tiered-storage")]
236    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
237        let epoch = self.current_epoch();
238
239        // Check if node exists
240        let versions = self.node_versions.read();
241        if let Some(index) = versions.get(&node_id) {
242            if let Some(vref) = index.visible_at(epoch) {
243                if let Some(record) = self.read_node_record(&vref) {
244                    if record.is_deleted() {
245                        return false;
246                    }
247                } else {
248                    return false;
249                }
250            } else {
251                return false;
252            }
253        } else {
254            return false;
255        }
256        drop(versions);
257
258        // Get label ID
259        let label_id = {
260            let reg = self.label_registry.read();
261            match reg.get_id(label) {
262                Some(id) => id,
263                None => return false,
264            }
265        };
266
267        // Remove from node_labels map
268        let mut node_labels = self.node_labels.write();
269
270        #[cfg(not(feature = "temporal"))]
271        {
272            if let Some(label_set) = node_labels.get_mut(&node_id) {
273                if !label_set.remove(&label_id) {
274                    return false;
275                }
276            } else {
277                return false;
278            }
279        }
280
281        #[cfg(feature = "temporal")]
282        {
283            let current = node_labels
284                .get(&node_id)
285                .and_then(|log| log.latest())
286                .cloned()
287                .unwrap_or_default();
288            if !current.contains(&label_id) {
289                return false;
290            }
291            let mut new_set = current;
292            new_set.remove(&label_id);
293            node_labels
294                .entry(node_id)
295                .or_default()
296                .append(self.current_epoch(), new_set);
297        }
298
299        drop(node_labels);
300
301        // Remove from label_index
302        let mut index = self.label_index.write();
303        if (label_id as usize) < index.len() {
304            index[label_id as usize].remove(&node_id);
305        }
306
307        true
308    }
309
310    /// Returns all nodes with a specific label.
311    ///
312    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
313    /// concurrent modifications won't affect the returned vector. Results are
314    /// sorted by NodeId for deterministic iteration order.
315    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
316        let reg = self.label_registry.read();
317        if let Some(label_id) = reg.get_id(label) {
318            let index = self.label_index.read();
319            if let Some(set) = index.get(label_id as usize) {
320                let mut ids: Vec<NodeId> = set.keys().copied().collect();
321                ids.sort_unstable();
322                return ids;
323            }
324        }
325        Vec::new()
326    }
327
328    /// Returns the number of nodes with a specific label without allocating
329    /// the full ID list. O(1) via the label index.
330    #[must_use]
331    pub fn nodes_by_label_count(&self, label: &str) -> usize {
332        let reg = self.label_registry.read();
333        let Some(label_id) = reg.get_id(label) else {
334            return 0;
335        };
336        self.label_index
337            .read()
338            .get(label_id as usize)
339            .map_or(0, |set| set.len())
340    }
341
342    /// Returns the number of distinct labels in the store.
343    #[must_use]
344    pub fn label_count(&self) -> usize {
345        self.label_registry.read().len()
346    }
347
348    /// Returns the number of distinct property keys in the store.
349    ///
350    /// This counts unique property keys across both nodes and edges.
351    #[must_use]
352    pub fn property_key_count(&self) -> usize {
353        let node_keys = self.node_properties.column_count();
354        let edge_keys = self.edge_properties.column_count();
355        // Note: This may count some keys twice if the same key is used
356        // for both nodes and edges. A more precise count would require
357        // tracking unique keys across both storages.
358        node_keys + edge_keys
359    }
360
361    /// Returns the number of distinct edge types in the store.
362    #[must_use]
363    pub fn edge_type_count(&self) -> usize {
364        self.id_to_edge_type.read().len()
365    }
366
367    /// Returns all label names in the database.
368    pub fn all_labels(&self) -> Vec<String> {
369        self.label_registry
370            .read()
371            .names()
372            .iter()
373            .map(|s| s.to_string())
374            .collect()
375    }
376
377    /// Returns all edge type names in the database.
378    pub fn all_edge_types(&self) -> Vec<String> {
379        self.id_to_edge_type
380            .read()
381            .iter()
382            .map(|s| s.to_string())
383            .collect()
384    }
385
386    /// Returns all property keys used in the database.
387    pub fn all_property_keys(&self) -> Vec<String> {
388        let mut keys = std::collections::HashSet::new();
389        for key in self.node_properties.keys() {
390            keys.insert(key.to_string());
391        }
392        for key in self.edge_properties.keys() {
393            keys.insert(key.to_string());
394        }
395        keys.into_iter().collect()
396    }
397
398    /// Returns the next node ID that will be allocated.
399    #[must_use]
400    pub fn peek_next_node_id(&self) -> u64 {
401        self.next_node_id.load(std::sync::atomic::Ordering::Relaxed)
402    }
403
404    /// Returns the next edge ID that will be allocated.
405    #[must_use]
406    pub fn peek_next_edge_id(&self) -> u64 {
407        self.next_edge_id.load(std::sync::atomic::Ordering::Relaxed)
408    }
409
410    /// Adds a label to a node within a transaction, recording the change
411    /// in the undo log so it can be reversed on rollback.
412    #[cfg(not(feature = "temporal"))]
413    pub fn add_label_versioned(
414        &self,
415        node_id: NodeId,
416        label: &str,
417        transaction_id: TransactionId,
418    ) -> bool {
419        let added = self.add_label(node_id, label);
420        if added {
421            self.property_undo_log
422                .write()
423                .entry(transaction_id)
424                .or_default()
425                .push(PropertyUndoEntry::LabelAdded {
426                    node_id,
427                    label: label.to_string(),
428                });
429        }
430        added
431    }
432
433    /// Adds a label to a node within a transaction (temporal version).
434    ///
435    /// Uses `EpochId::PENDING` for the version log entry, finalized on commit.
436    #[cfg(feature = "temporal")]
437    pub fn add_label_versioned(
438        &self,
439        node_id: NodeId,
440        label: &str,
441        transaction_id: TransactionId,
442    ) -> bool {
443        let label_id = self.get_or_create_label_id(label);
444
445        let mut node_labels = self.node_labels.write();
446        let current = node_labels
447            .get(&node_id)
448            .and_then(|log| log.latest())
449            .cloned()
450            .unwrap_or_default();
451        if current.contains(&label_id) {
452            return false;
453        }
454        let mut new_set = current;
455        new_set.insert(label_id);
456        node_labels
457            .entry(node_id)
458            .or_default()
459            .append(EpochId::PENDING, new_set);
460        drop(node_labels);
461
462        // Update label_index
463        let mut index = self.label_index.write();
464        if (label_id as usize) >= index.len() {
465            index.resize(label_id as usize + 1, FxHashMap::default());
466        }
467        index[label_id as usize].insert(node_id, ());
468
469        // Record in undo log
470        self.property_undo_log
471            .write()
472            .entry(transaction_id)
473            .or_default()
474            .push(PropertyUndoEntry::LabelAdded {
475                node_id,
476                label: label.to_string(),
477            });
478
479        true
480    }
481
482    /// Removes a label from a node within a transaction, recording the change
483    /// in the undo log so it can be restored on rollback.
484    #[cfg(not(feature = "temporal"))]
485    pub fn remove_label_versioned(
486        &self,
487        node_id: NodeId,
488        label: &str,
489        transaction_id: TransactionId,
490    ) -> bool {
491        let removed = self.remove_label(node_id, label);
492        if removed {
493            self.property_undo_log
494                .write()
495                .entry(transaction_id)
496                .or_default()
497                .push(PropertyUndoEntry::LabelRemoved {
498                    node_id,
499                    label: label.to_string(),
500                });
501        }
502        removed
503    }
504
505    /// Removes a label from a node within a transaction (temporal version).
506    #[cfg(feature = "temporal")]
507    pub fn remove_label_versioned(
508        &self,
509        node_id: NodeId,
510        label: &str,
511        transaction_id: TransactionId,
512    ) -> bool {
513        let label_id = {
514            let reg = self.label_registry.read();
515            match reg.get_id(label) {
516                Some(id) => id,
517                None => return false,
518            }
519        };
520
521        let mut node_labels = self.node_labels.write();
522        let current = node_labels
523            .get(&node_id)
524            .and_then(|log| log.latest())
525            .cloned()
526            .unwrap_or_default();
527        if !current.contains(&label_id) {
528            return false;
529        }
530        let mut new_set = current;
531        new_set.remove(&label_id);
532        node_labels
533            .entry(node_id)
534            .or_default()
535            .append(EpochId::PENDING, new_set);
536        drop(node_labels);
537
538        // Update label_index
539        let mut index = self.label_index.write();
540        if (label_id as usize) < index.len() {
541            index[label_id as usize].remove(&node_id);
542        }
543
544        // Record in undo log
545        self.property_undo_log
546            .write()
547            .entry(transaction_id)
548            .or_default()
549            .push(PropertyUndoEntry::LabelRemoved {
550                node_id,
551                label: label.to_string(),
552            });
553
554        true
555    }
556}