Skip to main content

grafeo_engine/database/
crud.rs

1//! Node and edge CRUD operations for GrafeoDB.
2
3use grafeo_common::grafeo_warn;
4#[cfg(feature = "wal")]
5use grafeo_storage::wal::WalRecord;
6
7impl super::GrafeoDB {
8    // === Node Operations ===
9
10    /// Creates a node with the given labels and returns its ID.
11    ///
12    /// Labels categorize nodes - think of them like tags. A node can have
13    /// multiple labels (e.g., `["Person", "Employee"]`).
14    ///
15    /// # Examples
16    ///
17    /// ```
18    /// use grafeo_engine::GrafeoDB;
19    ///
20    /// let db = GrafeoDB::new_in_memory();
21    /// let alix = db.create_node(&["Person"]);
22    /// let company = db.create_node(&["Company", "Startup"]);
23    /// ```
24    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
25        let id = self.lpg_store().create_node(labels);
26
27        // Log to WAL if enabled
28        #[cfg(feature = "wal")]
29        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
30            id,
31            labels: labels.iter().map(|s| (*s).to_string()).collect(),
32        }) {
33            grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
34        }
35
36        #[cfg(feature = "cdc")]
37        if self.cdc_active() {
38            self.cdc_log.record_create_node(
39                id,
40                self.lpg_store().current_epoch(),
41                None,
42                Some(labels.iter().map(|s| (*s).to_string()).collect()),
43            );
44        }
45
46        id
47    }
48
49    /// Creates a new node with labels and properties.
50    ///
51    /// If WAL is enabled, the operation is logged for durability.
52    pub fn create_node_with_props(
53        &self,
54        labels: &[&str],
55        properties: impl IntoIterator<
56            Item = (
57                impl Into<grafeo_common::types::PropertyKey>,
58                impl Into<grafeo_common::types::Value>,
59            ),
60        >,
61    ) -> grafeo_common::types::NodeId {
62        // Collect properties first so we can log them to WAL
63        let props: Vec<(
64            grafeo_common::types::PropertyKey,
65            grafeo_common::types::Value,
66        )> = properties
67            .into_iter()
68            .map(|(k, v)| (k.into(), v.into()))
69            .collect();
70
71        let id = self
72            .lpg_store()
73            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
74
75        // Build CDC snapshot before WAL consumes props
76        #[cfg(feature = "cdc")]
77        let cdc_props: Option<
78            std::collections::HashMap<String, grafeo_common::types::Value>,
79        > = if self.cdc_active() {
80            Some(
81                props
82                    .iter()
83                    .map(|(k, v)| (k.to_string(), v.clone()))
84                    .collect(),
85            )
86        } else {
87            None
88        };
89
90        // Log node creation to WAL
91        #[cfg(feature = "wal")]
92        {
93            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
94                id,
95                labels: labels.iter().map(|s| (*s).to_string()).collect(),
96            }) {
97                grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
98            }
99
100            // Log each property to WAL for full durability
101            for (key, value) in props {
102                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
103                    id,
104                    key: key.to_string(),
105                    value,
106                }) {
107                    grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
108                }
109            }
110        }
111
112        #[cfg(feature = "cdc")]
113        if let Some(cdc_props) = cdc_props {
114            self.cdc_log.record_create_node(
115                id,
116                self.lpg_store().current_epoch(),
117                if cdc_props.is_empty() {
118                    None
119                } else {
120                    Some(cdc_props)
121                },
122                Some(labels.iter().map(|s| (*s).to_string()).collect()),
123            );
124        }
125
126        // Auto-insert into matching text indexes for the new node
127        #[cfg(feature = "text-index")]
128        if let Some(node) = self.lpg_store().get_node(id) {
129            for label in &node.labels {
130                for (prop_key, prop_val) in &node.properties {
131                    if let grafeo_common::types::Value::String(text) = prop_val
132                        && let Some(index) = self
133                            .lpg_store()
134                            .get_text_index(label.as_str(), prop_key.as_ref())
135                    {
136                        index.write().insert(id, text);
137                    }
138                }
139            }
140        }
141
142        id
143    }
144
145    /// Gets a node by ID.
146    #[must_use]
147    pub fn get_node(
148        &self,
149        id: grafeo_common::types::NodeId,
150    ) -> Option<grafeo_core::graph::lpg::Node> {
151        self.lpg_store().get_node(id)
152    }
153
154    /// Gets a node as it existed at a specific epoch.
155    ///
156    /// Uses pure epoch-based visibility (not transaction-aware), so the node
157    /// is visible if and only if `created_epoch <= epoch` and it was not
158    /// deleted at or before `epoch`.
159    #[must_use]
160    pub fn get_node_at_epoch(
161        &self,
162        id: grafeo_common::types::NodeId,
163        epoch: grafeo_common::types::EpochId,
164    ) -> Option<grafeo_core::graph::lpg::Node> {
165        self.lpg_store().get_node_at_epoch(id, epoch)
166    }
167
168    /// Gets an edge as it existed at a specific epoch.
169    ///
170    /// Uses pure epoch-based visibility (not transaction-aware).
171    #[must_use]
172    pub fn get_edge_at_epoch(
173        &self,
174        id: grafeo_common::types::EdgeId,
175        epoch: grafeo_common::types::EpochId,
176    ) -> Option<grafeo_core::graph::lpg::Edge> {
177        self.lpg_store().get_edge_at_epoch(id, epoch)
178    }
179
180    /// Returns all versions of a node with their creation/deletion epochs.
181    ///
182    /// Properties and labels reflect the current state (not versioned per-epoch).
183    #[must_use]
184    pub fn get_node_history(
185        &self,
186        id: grafeo_common::types::NodeId,
187    ) -> Vec<(
188        grafeo_common::types::EpochId,
189        Option<grafeo_common::types::EpochId>,
190        grafeo_core::graph::lpg::Node,
191    )> {
192        self.lpg_store().get_node_history(id)
193    }
194
195    /// Returns all versions of an edge with their creation/deletion epochs.
196    ///
197    /// Properties reflect the current state (not versioned per-epoch).
198    #[must_use]
199    pub fn get_edge_history(
200        &self,
201        id: grafeo_common::types::EdgeId,
202    ) -> Vec<(
203        grafeo_common::types::EpochId,
204        Option<grafeo_common::types::EpochId>,
205        grafeo_core::graph::lpg::Edge,
206    )> {
207        self.lpg_store().get_edge_history(id)
208    }
209
210    /// Returns a property value as it existed at a specific epoch.
211    ///
212    /// Uses the internal `VersionLog` to do a point-in-time read. Returns
213    /// `None` if the property didn't exist or was deleted at that epoch.
214    #[cfg(feature = "temporal")]
215    #[must_use]
216    pub fn get_node_property_at_epoch(
217        &self,
218        id: grafeo_common::types::NodeId,
219        key: &str,
220        epoch: grafeo_common::types::EpochId,
221    ) -> Option<grafeo_common::types::Value> {
222        let prop_key = grafeo_common::types::PropertyKey::new(key);
223        self.lpg_store()
224            .get_node_property_at_epoch(id, &prop_key, epoch)
225    }
226
227    /// Returns the full version timeline for a single property of a node.
228    ///
229    /// Each entry is `(epoch, value)` in ascending epoch order. Tombstones
230    /// (deletions) appear as `Value::Null`.
231    #[cfg(feature = "temporal")]
232    #[must_use]
233    pub fn get_node_property_history(
234        &self,
235        id: grafeo_common::types::NodeId,
236        key: &str,
237    ) -> Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)> {
238        self.lpg_store().node_property_history_for_key(id, key)
239    }
240
241    /// Returns the full version history for ALL properties of a node.
242    ///
243    /// Each entry is `(property_key, Vec<(epoch, value)>)`.
244    #[cfg(feature = "temporal")]
245    #[must_use]
246    pub fn get_all_node_property_history(
247        &self,
248        id: grafeo_common::types::NodeId,
249    ) -> Vec<(
250        grafeo_common::types::PropertyKey,
251        Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)>,
252    )> {
253        self.lpg_store().node_property_history(id)
254    }
255
256    /// Returns the current epoch of the database.
257    #[must_use]
258    pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
259        self.lpg_store().current_epoch()
260    }
261
262    /// Deletes a node and all its edges.
263    ///
264    /// If WAL is enabled, the operation is logged for durability.
265    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
266        // Capture properties for CDC before deletion
267        #[cfg(feature = "cdc")]
268        let cdc_props = if self.cdc_active() {
269            self.lpg_store().get_node(id).map(|node| {
270                node.properties
271                    .iter()
272                    .map(|(k, v)| (k.to_string(), v.clone()))
273                    .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
274            })
275        } else {
276            None
277        };
278
279        // Collect matching vector indexes BEFORE deletion removes labels
280        #[cfg(feature = "vector-index")]
281        let indexes_to_clean: Vec<
282            std::sync::Arc<grafeo_core::index::vector::VectorIndexKind>,
283        > = self
284            .lpg_store()
285            .get_node(id)
286            .map(|node| {
287                let mut indexes = Vec::new();
288                for label in &node.labels {
289                    let prefix = format!("{}:", label.as_str());
290                    for (key, index) in self.lpg_store().vector_index_entries() {
291                        if key.starts_with(&prefix) {
292                            indexes.push(index);
293                        }
294                    }
295                }
296                indexes
297            })
298            .unwrap_or_default();
299
300        // Collect matching text indexes BEFORE deletion removes labels
301        #[cfg(feature = "text-index")]
302        let text_indexes_to_clean: Vec<
303            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
304        > = self
305            .lpg_store()
306            .get_node(id)
307            .map(|node| {
308                let mut indexes = Vec::new();
309                for label in &node.labels {
310                    let prefix = format!("{}:", label.as_str());
311                    for (key, index) in self.lpg_store().text_index_entries() {
312                        if key.starts_with(&prefix) {
313                            indexes.push(index);
314                        }
315                    }
316                }
317                indexes
318            })
319            .unwrap_or_default();
320
321        let result = self.lpg_store().delete_node(id);
322
323        // Remove from vector indexes after successful deletion
324        #[cfg(feature = "vector-index")]
325        if result {
326            for index in indexes_to_clean {
327                index.remove(id);
328            }
329        }
330
331        // Remove from text indexes after successful deletion
332        #[cfg(feature = "text-index")]
333        if result {
334            for index in text_indexes_to_clean {
335                index.write().remove(id);
336            }
337        }
338
339        #[cfg(feature = "wal")]
340        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
341            grafeo_warn!("Failed to log DeleteNode to WAL: {}", e);
342        }
343
344        #[cfg(feature = "cdc")]
345        if result && self.cdc_active() {
346            self.cdc_log.record_delete(
347                crate::cdc::EntityId::Node(id),
348                self.lpg_store().current_epoch(),
349                cdc_props,
350            );
351        }
352
353        result
354    }
355
356    /// Sets a property on a node.
357    ///
358    /// If WAL is enabled, the operation is logged for durability.
359    pub fn set_node_property(
360        &self,
361        id: grafeo_common::types::NodeId,
362        key: &str,
363        value: grafeo_common::types::Value,
364    ) {
365        // Extract vector data before the value is moved into the store
366        #[cfg(feature = "vector-index")]
367        let vector_data = match &value {
368            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
369            _ => None,
370        };
371
372        // Log to WAL first
373        #[cfg(feature = "wal")]
374        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
375            id,
376            key: key.to_string(),
377            value: value.clone(),
378        }) {
379            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
380        }
381
382        // Capture old value for CDC before the store write
383        #[cfg(feature = "cdc")]
384        let cdc_active = self.cdc_active();
385        #[cfg(feature = "cdc")]
386        let cdc_old_value = if cdc_active {
387            self.lpg_store()
388                .get_node_property(id, &grafeo_common::types::PropertyKey::new(key))
389        } else {
390            None
391        };
392        #[cfg(feature = "cdc")]
393        let cdc_new_value = if cdc_active {
394            Some(value.clone())
395        } else {
396            None
397        };
398
399        self.lpg_store().set_node_property(id, key, value);
400
401        #[cfg(feature = "cdc")]
402        if let Some(cdc_new_value) = cdc_new_value {
403            self.cdc_log.record_update(
404                crate::cdc::EntityId::Node(id),
405                self.lpg_store().current_epoch(),
406                key,
407                cdc_old_value,
408                cdc_new_value,
409            );
410        }
411
412        // Auto-insert into matching vector indexes
413        #[cfg(feature = "vector-index")]
414        if let Some(vec) = vector_data
415            && let Some(node) = self.lpg_store().get_node(id)
416        {
417            for label in &node.labels {
418                if let Some(index) = self.lpg_store().get_vector_index(label.as_str(), key) {
419                    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
420                        &**self.lpg_store(),
421                        key,
422                    );
423                    index.insert(id, &vec, &accessor);
424                }
425            }
426        }
427
428        // Auto-update matching text indexes
429        #[cfg(feature = "text-index")]
430        if let Some(node) = self.lpg_store().get_node(id) {
431            let text_val = node
432                .properties
433                .get(&grafeo_common::types::PropertyKey::new(key))
434                .and_then(|v| match v {
435                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
436                    _ => None,
437                });
438            for label in &node.labels {
439                if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
440                    let mut idx = index.write();
441                    if let Some(ref text) = text_val {
442                        idx.insert(id, text);
443                    } else {
444                        idx.remove(id);
445                    }
446                }
447            }
448        }
449    }
450
451    /// Adds a label to an existing node.
452    ///
453    /// Returns `true` if the label was added, `false` if the node doesn't exist
454    /// or already has the label.
455    ///
456    /// # Examples
457    ///
458    /// ```
459    /// use grafeo_engine::GrafeoDB;
460    ///
461    /// let db = GrafeoDB::new_in_memory();
462    /// let alix = db.create_node(&["Person"]);
463    ///
464    /// // Promote Alix to Employee
465    /// let added = db.add_node_label(alix, "Employee");
466    /// assert!(added);
467    /// ```
468    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
469        let result = self.lpg_store().add_label(id, label);
470
471        #[cfg(feature = "wal")]
472        if result {
473            // Log to WAL if enabled
474            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
475                id,
476                label: label.to_string(),
477            }) {
478                grafeo_warn!("Failed to log AddNodeLabel to WAL: {}", e);
479            }
480        }
481
482        // Auto-insert into vector indexes for the newly-added label
483        #[cfg(feature = "vector-index")]
484        if result {
485            let prefix = format!("{label}:");
486            for (key, index) in self.lpg_store().vector_index_entries() {
487                if let Some(property) = key.strip_prefix(&prefix)
488                    && let Some(node) = self.lpg_store().get_node(id)
489                {
490                    let prop_key = grafeo_common::types::PropertyKey::new(property);
491                    if let Some(grafeo_common::types::Value::Vector(v)) =
492                        node.properties.get(&prop_key)
493                    {
494                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
495                            &**self.lpg_store(),
496                            property,
497                        );
498                        index.insert(id, v, &accessor);
499                    }
500                }
501            }
502        }
503
504        // Auto-insert into text indexes for the newly-added label
505        #[cfg(feature = "text-index")]
506        if result && let Some(node) = self.lpg_store().get_node(id) {
507            for (prop_key, prop_val) in &node.properties {
508                if let grafeo_common::types::Value::String(text) = prop_val
509                    && let Some(index) = self.lpg_store().get_text_index(label, prop_key.as_ref())
510                {
511                    index.write().insert(id, text);
512                }
513            }
514        }
515
516        result
517    }
518
519    /// Removes a label from a node.
520    ///
521    /// Returns `true` if the label was removed, `false` if the node doesn't exist
522    /// or doesn't have the label.
523    ///
524    /// # Examples
525    ///
526    /// ```
527    /// use grafeo_engine::GrafeoDB;
528    ///
529    /// let db = GrafeoDB::new_in_memory();
530    /// let alix = db.create_node(&["Person", "Employee"]);
531    ///
532    /// // Remove Employee status
533    /// let removed = db.remove_node_label(alix, "Employee");
534    /// assert!(removed);
535    /// ```
536    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
537        // Collect text indexes to clean BEFORE removing the label
538        #[cfg(feature = "text-index")]
539        let text_indexes_to_clean: Vec<
540            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
541        > = {
542            let prefix = format!("{label}:");
543            self.lpg_store()
544                .text_index_entries()
545                .into_iter()
546                .filter(|(key, _)| key.starts_with(&prefix))
547                .map(|(_, index)| index)
548                .collect()
549        };
550
551        let result = self.lpg_store().remove_label(id, label);
552
553        #[cfg(feature = "wal")]
554        if result {
555            // Log to WAL if enabled
556            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
557                id,
558                label: label.to_string(),
559            }) {
560                grafeo_warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
561            }
562        }
563
564        // Remove from text indexes for the removed label
565        #[cfg(feature = "text-index")]
566        if result {
567            for index in text_indexes_to_clean {
568                index.write().remove(id);
569            }
570        }
571
572        result
573    }
574
575    /// Gets all labels for a node.
576    ///
577    /// Returns `None` if the node doesn't exist.
578    ///
579    /// # Examples
580    ///
581    /// ```
582    /// use grafeo_engine::GrafeoDB;
583    ///
584    /// let db = GrafeoDB::new_in_memory();
585    /// let alix = db.create_node(&["Person", "Employee"]);
586    ///
587    /// let labels = db.get_node_labels(alix).unwrap();
588    /// assert!(labels.contains(&"Person".to_string()));
589    /// assert!(labels.contains(&"Employee".to_string()));
590    /// ```
591    #[must_use]
592    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
593        self.lpg_store()
594            .get_node(id)
595            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
596    }
597
598    // === Edge Operations ===
599
600    /// Creates an edge (relationship) between two nodes.
601    ///
602    /// Edges connect nodes and have a type that describes the relationship.
603    /// They're directed - the order of `src` and `dst` matters.
604    ///
605    /// # Examples
606    ///
607    /// ```
608    /// use grafeo_engine::GrafeoDB;
609    ///
610    /// let db = GrafeoDB::new_in_memory();
611    /// let alix = db.create_node(&["Person"]);
612    /// let gus = db.create_node(&["Person"]);
613    ///
614    /// // Alix knows Gus (directed: Alix -> Gus)
615    /// let edge = db.create_edge(alix, gus, "KNOWS");
616    /// ```
617    pub fn create_edge(
618        &self,
619        src: grafeo_common::types::NodeId,
620        dst: grafeo_common::types::NodeId,
621        edge_type: &str,
622    ) -> grafeo_common::types::EdgeId {
623        let id = self.lpg_store().create_edge(src, dst, edge_type);
624
625        // Log to WAL if enabled
626        #[cfg(feature = "wal")]
627        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
628            id,
629            src,
630            dst,
631            edge_type: edge_type.to_string(),
632        }) {
633            grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
634        }
635
636        #[cfg(feature = "cdc")]
637        if self.cdc_active() {
638            self.cdc_log.record_create_edge(
639                id,
640                self.lpg_store().current_epoch(),
641                None,
642                src.as_u64(),
643                dst.as_u64(),
644                edge_type.to_string(),
645            );
646        }
647
648        id
649    }
650
651    /// Creates a new edge with properties.
652    ///
653    /// If WAL is enabled, the operation is logged for durability.
654    pub fn create_edge_with_props(
655        &self,
656        src: grafeo_common::types::NodeId,
657        dst: grafeo_common::types::NodeId,
658        edge_type: &str,
659        properties: impl IntoIterator<
660            Item = (
661                impl Into<grafeo_common::types::PropertyKey>,
662                impl Into<grafeo_common::types::Value>,
663            ),
664        >,
665    ) -> grafeo_common::types::EdgeId {
666        // Collect properties first so we can log them to WAL
667        let props: Vec<(
668            grafeo_common::types::PropertyKey,
669            grafeo_common::types::Value,
670        )> = properties
671            .into_iter()
672            .map(|(k, v)| (k.into(), v.into()))
673            .collect();
674
675        let id = self.lpg_store().create_edge_with_props(
676            src,
677            dst,
678            edge_type,
679            props.iter().map(|(k, v)| (k.clone(), v.clone())),
680        );
681
682        // Build CDC snapshot before WAL consumes props
683        #[cfg(feature = "cdc")]
684        let cdc_props: Option<
685            std::collections::HashMap<String, grafeo_common::types::Value>,
686        > = if self.cdc_active() {
687            Some(
688                props
689                    .iter()
690                    .map(|(k, v)| (k.to_string(), v.clone()))
691                    .collect(),
692            )
693        } else {
694            None
695        };
696
697        // Log edge creation to WAL
698        #[cfg(feature = "wal")]
699        {
700            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
701                id,
702                src,
703                dst,
704                edge_type: edge_type.to_string(),
705            }) {
706                grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
707            }
708
709            // Log each property to WAL for full durability
710            for (key, value) in props {
711                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
712                    id,
713                    key: key.to_string(),
714                    value,
715                }) {
716                    grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
717                }
718            }
719        }
720
721        #[cfg(feature = "cdc")]
722        if let Some(cdc_props) = cdc_props {
723            self.cdc_log.record_create_edge(
724                id,
725                self.lpg_store().current_epoch(),
726                if cdc_props.is_empty() {
727                    None
728                } else {
729                    Some(cdc_props)
730                },
731                src.as_u64(),
732                dst.as_u64(),
733                edge_type.to_string(),
734            );
735        }
736
737        id
738    }
739
740    /// Gets an edge by ID.
741    #[must_use]
742    pub fn get_edge(
743        &self,
744        id: grafeo_common::types::EdgeId,
745    ) -> Option<grafeo_core::graph::lpg::Edge> {
746        self.lpg_store().get_edge(id)
747    }
748
749    /// Deletes an edge.
750    ///
751    /// If WAL is enabled, the operation is logged for durability.
752    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
753        // Capture properties for CDC before deletion
754        #[cfg(feature = "cdc")]
755        let cdc_props = if self.cdc_active() {
756            self.lpg_store().get_edge(id).map(|edge| {
757                edge.properties
758                    .iter()
759                    .map(|(k, v)| (k.to_string(), v.clone()))
760                    .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
761            })
762        } else {
763            None
764        };
765
766        let result = self.lpg_store().delete_edge(id);
767
768        #[cfg(feature = "wal")]
769        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
770            grafeo_warn!("Failed to log DeleteEdge to WAL: {}", e);
771        }
772
773        #[cfg(feature = "cdc")]
774        if result && self.cdc_active() {
775            self.cdc_log.record_delete(
776                crate::cdc::EntityId::Edge(id),
777                self.lpg_store().current_epoch(),
778                cdc_props,
779            );
780        }
781
782        result
783    }
784
785    /// Sets a property on an edge.
786    ///
787    /// If WAL is enabled, the operation is logged for durability.
788    pub fn set_edge_property(
789        &self,
790        id: grafeo_common::types::EdgeId,
791        key: &str,
792        value: grafeo_common::types::Value,
793    ) {
794        // Log to WAL first
795        #[cfg(feature = "wal")]
796        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
797            id,
798            key: key.to_string(),
799            value: value.clone(),
800        }) {
801            grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
802        }
803
804        // Capture old value for CDC before the store write
805        #[cfg(feature = "cdc")]
806        let cdc_active = self.cdc_active();
807        #[cfg(feature = "cdc")]
808        let cdc_old_value = if cdc_active {
809            self.lpg_store()
810                .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key))
811        } else {
812            None
813        };
814        #[cfg(feature = "cdc")]
815        let cdc_new_value = if cdc_active {
816            Some(value.clone())
817        } else {
818            None
819        };
820
821        self.lpg_store().set_edge_property(id, key, value);
822
823        #[cfg(feature = "cdc")]
824        if let Some(cdc_new_value) = cdc_new_value {
825            self.cdc_log.record_update(
826                crate::cdc::EntityId::Edge(id),
827                self.lpg_store().current_epoch(),
828                key,
829                cdc_old_value,
830                cdc_new_value,
831            );
832        }
833    }
834
835    /// Removes a property from a node.
836    ///
837    /// Returns true if the property existed and was removed, false otherwise.
838    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
839        let removed = self.lpg_store().remove_node_property(id, key).is_some();
840
841        #[cfg(feature = "wal")]
842        if removed
843            && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
844                id,
845                key: key.to_string(),
846            })
847        {
848            grafeo_warn!("WAL log for RemoveNodeProperty failed: {e}");
849        }
850
851        // Remove from matching text indexes
852        #[cfg(feature = "text-index")]
853        if removed && let Some(node) = self.lpg_store().get_node(id) {
854            for label in &node.labels {
855                if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
856                    index.write().remove(id);
857                }
858            }
859        }
860
861        removed
862    }
863
864    /// Removes a property from an edge.
865    ///
866    /// Returns true if the property existed and was removed, false otherwise.
867    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
868        let removed = self.lpg_store().remove_edge_property(id, key).is_some();
869
870        #[cfg(feature = "wal")]
871        if removed
872            && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
873                id,
874                key: key.to_string(),
875            })
876        {
877            grafeo_warn!("WAL log for RemoveEdgeProperty failed: {e}");
878        }
879
880        removed
881    }
882
883    /// Creates multiple nodes in bulk, each with a single vector property.
884    ///
885    /// Much faster than individual `create_node_with_props` calls because it
886    /// acquires internal locks once and loops in Rust rather than crossing
887    /// the FFI boundary per vector.
888    ///
889    /// **Atomicity note:** Individual node creations within the batch are NOT
890    /// atomic as a group. If a failure occurs mid-batch (e.g. WAL write error),
891    /// nodes created before the failure will persist while later nodes may not.
892    /// If you need all-or-nothing semantics, wrap the call in an explicit
893    /// transaction.
894    ///
895    /// # Arguments
896    ///
897    /// * `label` - Label applied to all created nodes
898    /// * `property` - Property name for the vector data
899    /// * `vectors` - Vector data for each node
900    ///
901    /// # Returns
902    ///
903    /// Vector of created `NodeId`s in the same order as the input vectors.
904    pub fn batch_create_nodes(
905        &self,
906        label: &str,
907        property: &str,
908        vectors: Vec<Vec<f32>>,
909    ) -> Vec<grafeo_common::types::NodeId> {
910        use grafeo_common::types::{PropertyKey, Value};
911
912        let prop_key = PropertyKey::new(property);
913        let labels: &[&str] = &[label];
914
915        let ids: Vec<grafeo_common::types::NodeId> = vectors
916            .into_iter()
917            .map(|vec| {
918                let value = Value::Vector(vec.into());
919                let id = self.lpg_store().create_node_with_props(
920                    labels,
921                    std::iter::once((prop_key.clone(), value.clone())),
922                );
923
924                // Log to WAL
925                #[cfg(feature = "wal")]
926                {
927                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
928                        id,
929                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
930                    }) {
931                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
932                    }
933                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
934                        id,
935                        key: property.to_string(),
936                        value,
937                    }) {
938                        grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
939                    }
940                }
941
942                id
943            })
944            .collect();
945
946        // Auto-insert into matching vector index if one exists
947        #[cfg(feature = "vector-index")]
948        if let Some(index) = self.lpg_store().get_vector_index(label, property) {
949            let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
950                &**self.lpg_store(),
951                property,
952            );
953            for &id in &ids {
954                if let Some(node) = self.lpg_store().get_node(id) {
955                    let pk = grafeo_common::types::PropertyKey::new(property);
956                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk)
957                        && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
958                            index.insert(id, v, &accessor);
959                        }))
960                        .is_err()
961                    {
962                        grafeo_warn!("Vector index insert panicked for node {}", id.as_u64());
963                    }
964                }
965            }
966        }
967
968        ids
969    }
970
971    /// Batch-creates nodes with full property maps.
972    ///
973    /// Each entry in `properties_list` is a complete property map for one node.
974    /// Vector values (`Value::Vector`) are automatically inserted into matching
975    /// vector indexes. Text values are automatically inserted into matching text
976    /// indexes.
977    ///
978    /// **Atomicity note:** Individual node creations within the batch are NOT
979    /// atomic as a group. If a failure occurs mid-batch (e.g. WAL write error),
980    /// nodes created before the failure will persist while later nodes may not.
981    /// If you need all-or-nothing semantics, wrap the call in an explicit
982    /// transaction.
983    ///
984    /// # Arguments
985    ///
986    /// * `label` - Label for all created nodes.
987    /// * `properties_list` - One property map per node to create.
988    ///
989    /// # Returns
990    ///
991    /// Vector of created `NodeId`s in the same order as the input.
992    pub fn batch_create_nodes_with_props(
993        &self,
994        label: &str,
995        properties_list: Vec<
996            std::collections::HashMap<
997                grafeo_common::types::PropertyKey,
998                grafeo_common::types::Value,
999            >,
1000        >,
1001    ) -> Vec<grafeo_common::types::NodeId> {
1002        #[cfg(any(feature = "vector-index", feature = "text-index"))]
1003        use grafeo_common::types::Value;
1004
1005        let labels: &[&str] = &[label];
1006
1007        let ids: Vec<grafeo_common::types::NodeId> = properties_list
1008            .into_iter()
1009            .map(|props| {
1010                let id = self.lpg_store().create_node_with_props(
1011                    labels,
1012                    props.iter().map(|(k, v)| (k.clone(), v.clone())),
1013                );
1014
1015                // Build CDC snapshot before WAL consumes props
1016                #[cfg(feature = "cdc")]
1017                let cdc_props: Option<
1018                    std::collections::HashMap<String, grafeo_common::types::Value>,
1019                > = if self.cdc_active() {
1020                    Some(
1021                        props
1022                            .iter()
1023                            .map(|(k, v)| (k.to_string(), v.clone()))
1024                            .collect(),
1025                    )
1026                } else {
1027                    None
1028                };
1029
1030                // Log to WAL
1031                #[cfg(feature = "wal")]
1032                {
1033                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1034                        id,
1035                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
1036                    }) {
1037                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
1038                    }
1039                    for (key, value) in props {
1040                        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1041                            id,
1042                            key: key.to_string(),
1043                            value,
1044                        }) {
1045                            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
1046                        }
1047                    }
1048                }
1049
1050                #[cfg(feature = "cdc")]
1051                if let Some(cdc_props) = cdc_props {
1052                    self.cdc_log.record_create_node(
1053                        id,
1054                        self.lpg_store().current_epoch(),
1055                        if cdc_props.is_empty() {
1056                            None
1057                        } else {
1058                            Some(cdc_props)
1059                        },
1060                        Some(labels.iter().map(|s| (*s).to_string()).collect()),
1061                    );
1062                }
1063
1064                id
1065            })
1066            .collect();
1067
1068        // Auto-insert into matching vector indexes for any vector properties
1069        #[cfg(feature = "vector-index")]
1070        {
1071            for (key, index) in self.lpg_store().vector_index_entries() {
1072                // key is "label:property"
1073                if !key.starts_with(label) || !key[label.len()..].starts_with(':') {
1074                    continue;
1075                }
1076                let property = &key[label.len() + 1..];
1077                let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1078                    &**self.lpg_store(),
1079                    property,
1080                );
1081                let pk = grafeo_common::types::PropertyKey::new(property);
1082                for &id in &ids {
1083                    if let Some(node) = self.lpg_store().get_node(id) {
1084                        // reason: guard would be side-effecting; keep insert in arm body
1085                        #[allow(clippy::collapsible_match)]
1086                        match node.properties.get(&pk) {
1087                            Some(Value::Vector(v)) => {
1088                                if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1089                                    index.insert(id, v, &accessor);
1090                                }))
1091                                .is_err()
1092                                {
1093                                    grafeo_warn!(
1094                                        "Vector index insert panicked for node {}",
1095                                        id.as_u64()
1096                                    );
1097                                }
1098                            }
1099                            Some(_other) => {
1100                                grafeo_warn!(
1101                                    "Node {} property '{}' expected Vector, skipping vector index insert",
1102                                    id.as_u64(),
1103                                    property
1104                                );
1105                            }
1106                            None => {} // No property, nothing to index
1107                        }
1108                    }
1109                }
1110            }
1111        }
1112
1113        // Auto-insert into matching text indexes for any string properties
1114        #[cfg(feature = "text-index")]
1115        for &id in &ids {
1116            if let Some(node) = self.lpg_store().get_node(id) {
1117                for (prop_key, prop_val) in &node.properties {
1118                    if let Value::String(text) = prop_val
1119                        && let Some(index) =
1120                            self.lpg_store().get_text_index(label, prop_key.as_ref())
1121                    {
1122                        index.write().insert(id, text);
1123                    }
1124                }
1125            }
1126        }
1127
1128        ids
1129    }
1130}