Skip to main content

grafeo_engine/database/
crud.rs

1//! Node and edge CRUD operations for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5
6impl super::GrafeoDB {
7    // === Node Operations ===
8
9    /// Creates a node with the given labels and returns its ID.
10    ///
11    /// Labels categorize nodes - think of them like tags. A node can have
12    /// multiple labels (e.g., `["Person", "Employee"]`).
13    ///
14    /// # Examples
15    ///
16    /// ```
17    /// use grafeo_engine::GrafeoDB;
18    ///
19    /// let db = GrafeoDB::new_in_memory();
20    /// let alice = db.create_node(&["Person"]);
21    /// let company = db.create_node(&["Company", "Startup"]);
22    /// ```
23    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
24        let id = self.store.create_node(labels);
25
26        // Log to WAL if enabled
27        #[cfg(feature = "wal")]
28        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
29            id,
30            labels: labels.iter().map(|s| (*s).to_string()).collect(),
31        }) {
32            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
33        }
34
35        #[cfg(feature = "cdc")]
36        self.cdc_log
37            .record_create_node(id, self.store.current_epoch(), None);
38
39        id
40    }
41
42    /// Creates a new node with labels and properties.
43    ///
44    /// If WAL is enabled, the operation is logged for durability.
45    pub fn create_node_with_props(
46        &self,
47        labels: &[&str],
48        properties: impl IntoIterator<
49            Item = (
50                impl Into<grafeo_common::types::PropertyKey>,
51                impl Into<grafeo_common::types::Value>,
52            ),
53        >,
54    ) -> grafeo_common::types::NodeId {
55        // Collect properties first so we can log them to WAL
56        let props: Vec<(
57            grafeo_common::types::PropertyKey,
58            grafeo_common::types::Value,
59        )> = properties
60            .into_iter()
61            .map(|(k, v)| (k.into(), v.into()))
62            .collect();
63
64        let id = self
65            .store
66            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
67
68        // Build CDC snapshot before WAL consumes props
69        #[cfg(feature = "cdc")]
70        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
71            .iter()
72            .map(|(k, v)| (k.to_string(), v.clone()))
73            .collect();
74
75        // Log node creation to WAL
76        #[cfg(feature = "wal")]
77        {
78            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
79                id,
80                labels: labels.iter().map(|s| (*s).to_string()).collect(),
81            }) {
82                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
83            }
84
85            // Log each property to WAL for full durability
86            for (key, value) in props {
87                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
88                    id,
89                    key: key.to_string(),
90                    value,
91                }) {
92                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
93                }
94            }
95        }
96
97        #[cfg(feature = "cdc")]
98        self.cdc_log.record_create_node(
99            id,
100            self.store.current_epoch(),
101            if cdc_props.is_empty() {
102                None
103            } else {
104                Some(cdc_props)
105            },
106        );
107
108        // Auto-insert into matching text indexes for the new node
109        #[cfg(feature = "text-index")]
110        if let Some(node) = self.store.get_node(id) {
111            for label in &node.labels {
112                for (prop_key, prop_val) in &node.properties {
113                    if let grafeo_common::types::Value::String(text) = prop_val
114                        && let Some(index) =
115                            self.store.get_text_index(label.as_str(), prop_key.as_ref())
116                    {
117                        index.write().insert(id, text);
118                    }
119                }
120            }
121        }
122
123        id
124    }
125
126    /// Gets a node by ID.
127    #[must_use]
128    pub fn get_node(
129        &self,
130        id: grafeo_common::types::NodeId,
131    ) -> Option<grafeo_core::graph::lpg::Node> {
132        self.store.get_node(id)
133    }
134
135    /// Deletes a node and all its edges.
136    ///
137    /// If WAL is enabled, the operation is logged for durability.
138    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
139        // Capture properties for CDC before deletion
140        #[cfg(feature = "cdc")]
141        let cdc_props = self.store.get_node(id).map(|node| {
142            node.properties
143                .iter()
144                .map(|(k, v)| (k.to_string(), v.clone()))
145                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
146        });
147
148        // Collect matching vector indexes BEFORE deletion removes labels
149        #[cfg(feature = "vector-index")]
150        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
151            .store
152            .get_node(id)
153            .map(|node| {
154                let mut indexes = Vec::new();
155                for label in &node.labels {
156                    let prefix = format!("{}:", label.as_str());
157                    for (key, index) in self.store.vector_index_entries() {
158                        if key.starts_with(&prefix) {
159                            indexes.push(index);
160                        }
161                    }
162                }
163                indexes
164            })
165            .unwrap_or_default();
166
167        // Collect matching text indexes BEFORE deletion removes labels
168        #[cfg(feature = "text-index")]
169        let text_indexes_to_clean: Vec<
170            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
171        > = self
172            .store
173            .get_node(id)
174            .map(|node| {
175                let mut indexes = Vec::new();
176                for label in &node.labels {
177                    let prefix = format!("{}:", label.as_str());
178                    for (key, index) in self.store.text_index_entries() {
179                        if key.starts_with(&prefix) {
180                            indexes.push(index);
181                        }
182                    }
183                }
184                indexes
185            })
186            .unwrap_or_default();
187
188        let result = self.store.delete_node(id);
189
190        // Remove from vector indexes after successful deletion
191        #[cfg(feature = "vector-index")]
192        if result {
193            for index in indexes_to_clean {
194                index.remove(id);
195            }
196        }
197
198        // Remove from text indexes after successful deletion
199        #[cfg(feature = "text-index")]
200        if result {
201            for index in text_indexes_to_clean {
202                index.write().remove(id);
203            }
204        }
205
206        #[cfg(feature = "wal")]
207        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
208            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
209        }
210
211        #[cfg(feature = "cdc")]
212        if result {
213            self.cdc_log.record_delete(
214                crate::cdc::EntityId::Node(id),
215                self.store.current_epoch(),
216                cdc_props,
217            );
218        }
219
220        result
221    }
222
223    /// Sets a property on a node.
224    ///
225    /// If WAL is enabled, the operation is logged for durability.
226    pub fn set_node_property(
227        &self,
228        id: grafeo_common::types::NodeId,
229        key: &str,
230        value: grafeo_common::types::Value,
231    ) {
232        // Extract vector data before the value is moved into the store
233        #[cfg(feature = "vector-index")]
234        let vector_data = match &value {
235            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
236            _ => None,
237        };
238
239        // Log to WAL first
240        #[cfg(feature = "wal")]
241        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
242            id,
243            key: key.to_string(),
244            value: value.clone(),
245        }) {
246            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
247        }
248
249        // Capture old value for CDC before the store write
250        #[cfg(feature = "cdc")]
251        let cdc_old_value = self
252            .store
253            .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
254        #[cfg(feature = "cdc")]
255        let cdc_new_value = value.clone();
256
257        self.store.set_node_property(id, key, value);
258
259        #[cfg(feature = "cdc")]
260        self.cdc_log.record_update(
261            crate::cdc::EntityId::Node(id),
262            self.store.current_epoch(),
263            key,
264            cdc_old_value,
265            cdc_new_value,
266        );
267
268        // Auto-insert into matching vector indexes
269        #[cfg(feature = "vector-index")]
270        if let Some(vec) = vector_data
271            && let Some(node) = self.store.get_node(id)
272        {
273            for label in &node.labels {
274                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
275                    let accessor =
276                        grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, key);
277                    index.insert(id, &vec, &accessor);
278                }
279            }
280        }
281
282        // Auto-update matching text indexes
283        #[cfg(feature = "text-index")]
284        if let Some(node) = self.store.get_node(id) {
285            let text_val = node
286                .properties
287                .get(&grafeo_common::types::PropertyKey::new(key))
288                .and_then(|v| match v {
289                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
290                    _ => None,
291                });
292            for label in &node.labels {
293                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
294                    let mut idx = index.write();
295                    if let Some(ref text) = text_val {
296                        idx.insert(id, text);
297                    } else {
298                        idx.remove(id);
299                    }
300                }
301            }
302        }
303    }
304
305    /// Adds a label to an existing node.
306    ///
307    /// Returns `true` if the label was added, `false` if the node doesn't exist
308    /// or already has the label.
309    ///
310    /// # Examples
311    ///
312    /// ```
313    /// use grafeo_engine::GrafeoDB;
314    ///
315    /// let db = GrafeoDB::new_in_memory();
316    /// let alice = db.create_node(&["Person"]);
317    ///
318    /// // Promote Alice to Employee
319    /// let added = db.add_node_label(alice, "Employee");
320    /// assert!(added);
321    /// ```
322    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
323        let result = self.store.add_label(id, label);
324
325        #[cfg(feature = "wal")]
326        if result {
327            // Log to WAL if enabled
328            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
329                id,
330                label: label.to_string(),
331            }) {
332                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
333            }
334        }
335
336        // Auto-insert into vector indexes for the newly-added label
337        #[cfg(feature = "vector-index")]
338        if result {
339            let prefix = format!("{label}:");
340            for (key, index) in self.store.vector_index_entries() {
341                if let Some(property) = key.strip_prefix(&prefix)
342                    && let Some(node) = self.store.get_node(id)
343                {
344                    let prop_key = grafeo_common::types::PropertyKey::new(property);
345                    if let Some(grafeo_common::types::Value::Vector(v)) =
346                        node.properties.get(&prop_key)
347                    {
348                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
349                            &self.store,
350                            property,
351                        );
352                        index.insert(id, v, &accessor);
353                    }
354                }
355            }
356        }
357
358        // Auto-insert into text indexes for the newly-added label
359        #[cfg(feature = "text-index")]
360        if result && let Some(node) = self.store.get_node(id) {
361            for (prop_key, prop_val) in &node.properties {
362                if let grafeo_common::types::Value::String(text) = prop_val
363                    && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
364                {
365                    index.write().insert(id, text);
366                }
367            }
368        }
369
370        result
371    }
372
373    /// Removes a label from a node.
374    ///
375    /// Returns `true` if the label was removed, `false` if the node doesn't exist
376    /// or doesn't have the label.
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// use grafeo_engine::GrafeoDB;
382    ///
383    /// let db = GrafeoDB::new_in_memory();
384    /// let alice = db.create_node(&["Person", "Employee"]);
385    ///
386    /// // Remove Employee status
387    /// let removed = db.remove_node_label(alice, "Employee");
388    /// assert!(removed);
389    /// ```
390    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
391        // Collect text indexes to clean BEFORE removing the label
392        #[cfg(feature = "text-index")]
393        let text_indexes_to_clean: Vec<
394            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
395        > = {
396            let prefix = format!("{label}:");
397            self.store
398                .text_index_entries()
399                .into_iter()
400                .filter(|(key, _)| key.starts_with(&prefix))
401                .map(|(_, index)| index)
402                .collect()
403        };
404
405        let result = self.store.remove_label(id, label);
406
407        #[cfg(feature = "wal")]
408        if result {
409            // Log to WAL if enabled
410            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
411                id,
412                label: label.to_string(),
413            }) {
414                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
415            }
416        }
417
418        // Remove from text indexes for the removed label
419        #[cfg(feature = "text-index")]
420        if result {
421            for index in text_indexes_to_clean {
422                index.write().remove(id);
423            }
424        }
425
426        result
427    }
428
429    /// Gets all labels for a node.
430    ///
431    /// Returns `None` if the node doesn't exist.
432    ///
433    /// # Examples
434    ///
435    /// ```
436    /// use grafeo_engine::GrafeoDB;
437    ///
438    /// let db = GrafeoDB::new_in_memory();
439    /// let alice = db.create_node(&["Person", "Employee"]);
440    ///
441    /// let labels = db.get_node_labels(alice).unwrap();
442    /// assert!(labels.contains(&"Person".to_string()));
443    /// assert!(labels.contains(&"Employee".to_string()));
444    /// ```
445    #[must_use]
446    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
447        self.store
448            .get_node(id)
449            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
450    }
451
452    // === Edge Operations ===
453
454    /// Creates an edge (relationship) between two nodes.
455    ///
456    /// Edges connect nodes and have a type that describes the relationship.
457    /// They're directed - the order of `src` and `dst` matters.
458    ///
459    /// # Examples
460    ///
461    /// ```
462    /// use grafeo_engine::GrafeoDB;
463    ///
464    /// let db = GrafeoDB::new_in_memory();
465    /// let alice = db.create_node(&["Person"]);
466    /// let bob = db.create_node(&["Person"]);
467    ///
468    /// // Alice knows Bob (directed: Alice -> Bob)
469    /// let edge = db.create_edge(alice, bob, "KNOWS");
470    /// ```
471    pub fn create_edge(
472        &self,
473        src: grafeo_common::types::NodeId,
474        dst: grafeo_common::types::NodeId,
475        edge_type: &str,
476    ) -> grafeo_common::types::EdgeId {
477        let id = self.store.create_edge(src, dst, edge_type);
478
479        // Log to WAL if enabled
480        #[cfg(feature = "wal")]
481        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
482            id,
483            src,
484            dst,
485            edge_type: edge_type.to_string(),
486        }) {
487            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
488        }
489
490        #[cfg(feature = "cdc")]
491        self.cdc_log
492            .record_create_edge(id, self.store.current_epoch(), None);
493
494        id
495    }
496
497    /// Creates a new edge with properties.
498    ///
499    /// If WAL is enabled, the operation is logged for durability.
500    pub fn create_edge_with_props(
501        &self,
502        src: grafeo_common::types::NodeId,
503        dst: grafeo_common::types::NodeId,
504        edge_type: &str,
505        properties: impl IntoIterator<
506            Item = (
507                impl Into<grafeo_common::types::PropertyKey>,
508                impl Into<grafeo_common::types::Value>,
509            ),
510        >,
511    ) -> grafeo_common::types::EdgeId {
512        // Collect properties first so we can log them to WAL
513        let props: Vec<(
514            grafeo_common::types::PropertyKey,
515            grafeo_common::types::Value,
516        )> = properties
517            .into_iter()
518            .map(|(k, v)| (k.into(), v.into()))
519            .collect();
520
521        let id = self.store.create_edge_with_props(
522            src,
523            dst,
524            edge_type,
525            props.iter().map(|(k, v)| (k.clone(), v.clone())),
526        );
527
528        // Build CDC snapshot before WAL consumes props
529        #[cfg(feature = "cdc")]
530        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
531            .iter()
532            .map(|(k, v)| (k.to_string(), v.clone()))
533            .collect();
534
535        // Log edge creation to WAL
536        #[cfg(feature = "wal")]
537        {
538            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
539                id,
540                src,
541                dst,
542                edge_type: edge_type.to_string(),
543            }) {
544                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
545            }
546
547            // Log each property to WAL for full durability
548            for (key, value) in props {
549                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
550                    id,
551                    key: key.to_string(),
552                    value,
553                }) {
554                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
555                }
556            }
557        }
558
559        #[cfg(feature = "cdc")]
560        self.cdc_log.record_create_edge(
561            id,
562            self.store.current_epoch(),
563            if cdc_props.is_empty() {
564                None
565            } else {
566                Some(cdc_props)
567            },
568        );
569
570        id
571    }
572
573    /// Gets an edge by ID.
574    #[must_use]
575    pub fn get_edge(
576        &self,
577        id: grafeo_common::types::EdgeId,
578    ) -> Option<grafeo_core::graph::lpg::Edge> {
579        self.store.get_edge(id)
580    }
581
582    /// Deletes an edge.
583    ///
584    /// If WAL is enabled, the operation is logged for durability.
585    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
586        // Capture properties for CDC before deletion
587        #[cfg(feature = "cdc")]
588        let cdc_props = self.store.get_edge(id).map(|edge| {
589            edge.properties
590                .iter()
591                .map(|(k, v)| (k.to_string(), v.clone()))
592                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
593        });
594
595        let result = self.store.delete_edge(id);
596
597        #[cfg(feature = "wal")]
598        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
599            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
600        }
601
602        #[cfg(feature = "cdc")]
603        if result {
604            self.cdc_log.record_delete(
605                crate::cdc::EntityId::Edge(id),
606                self.store.current_epoch(),
607                cdc_props,
608            );
609        }
610
611        result
612    }
613
614    /// Sets a property on an edge.
615    ///
616    /// If WAL is enabled, the operation is logged for durability.
617    pub fn set_edge_property(
618        &self,
619        id: grafeo_common::types::EdgeId,
620        key: &str,
621        value: grafeo_common::types::Value,
622    ) {
623        // Log to WAL first
624        #[cfg(feature = "wal")]
625        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
626            id,
627            key: key.to_string(),
628            value: value.clone(),
629        }) {
630            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
631        }
632
633        // Capture old value for CDC before the store write
634        #[cfg(feature = "cdc")]
635        let cdc_old_value = self
636            .store
637            .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
638        #[cfg(feature = "cdc")]
639        let cdc_new_value = value.clone();
640
641        self.store.set_edge_property(id, key, value);
642
643        #[cfg(feature = "cdc")]
644        self.cdc_log.record_update(
645            crate::cdc::EntityId::Edge(id),
646            self.store.current_epoch(),
647            key,
648            cdc_old_value,
649            cdc_new_value,
650        );
651    }
652
653    /// Removes a property from a node.
654    ///
655    /// Returns true if the property existed and was removed, false otherwise.
656    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
657        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
658        let removed = self.store.remove_node_property(id, key).is_some();
659
660        // Remove from matching text indexes
661        #[cfg(feature = "text-index")]
662        if removed && let Some(node) = self.store.get_node(id) {
663            for label in &node.labels {
664                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
665                    index.write().remove(id);
666                }
667            }
668        }
669
670        removed
671    }
672
673    /// Removes a property from an edge.
674    ///
675    /// Returns true if the property existed and was removed, false otherwise.
676    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
677        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
678        self.store.remove_edge_property(id, key).is_some()
679    }
680
681    /// Creates multiple nodes in bulk, each with a single vector property.
682    ///
683    /// Much faster than individual `create_node_with_props` calls because it
684    /// acquires internal locks once and loops in Rust rather than crossing
685    /// the FFI boundary per vector.
686    ///
687    /// # Arguments
688    ///
689    /// * `label` - Label applied to all created nodes
690    /// * `property` - Property name for the vector data
691    /// * `vectors` - Vector data for each node
692    ///
693    /// # Returns
694    ///
695    /// Vector of created `NodeId`s in the same order as the input vectors.
696    pub fn batch_create_nodes(
697        &self,
698        label: &str,
699        property: &str,
700        vectors: Vec<Vec<f32>>,
701    ) -> Vec<grafeo_common::types::NodeId> {
702        use grafeo_common::types::{PropertyKey, Value};
703
704        let prop_key = PropertyKey::new(property);
705        let labels: &[&str] = &[label];
706
707        let ids: Vec<grafeo_common::types::NodeId> = vectors
708            .into_iter()
709            .map(|vec| {
710                let value = Value::Vector(vec.into());
711                let id = self.store.create_node_with_props(
712                    labels,
713                    std::iter::once((prop_key.clone(), value.clone())),
714                );
715
716                // Log to WAL
717                #[cfg(feature = "wal")]
718                {
719                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
720                        id,
721                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
722                    }) {
723                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
724                    }
725                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
726                        id,
727                        key: property.to_string(),
728                        value,
729                    }) {
730                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
731                    }
732                }
733
734                id
735            })
736            .collect();
737
738        // Auto-insert into matching vector index if one exists
739        #[cfg(feature = "vector-index")]
740        if let Some(index) = self.store.get_vector_index(label, property) {
741            let accessor =
742                grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
743            for &id in &ids {
744                if let Some(node) = self.store.get_node(id) {
745                    let pk = grafeo_common::types::PropertyKey::new(property);
746                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
747                        index.insert(id, v, &accessor);
748                    }
749                }
750            }
751        }
752
753        ids
754    }
755}