Skip to main content

grafeo_engine/database/
crud.rs

1//! Node and edge CRUD operations for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5use grafeo_common::grafeo_warn;
6
7impl super::GrafeoDB {
8    // === Node Operations ===
9
10    /// Creates a node with the given labels and returns its ID.
11    ///
12    /// Labels categorize nodes - think of them like tags. A node can have
13    /// multiple labels (e.g., `["Person", "Employee"]`).
14    ///
15    /// # Examples
16    ///
17    /// ```
18    /// use grafeo_engine::GrafeoDB;
19    ///
20    /// let db = GrafeoDB::new_in_memory();
21    /// let alix = db.create_node(&["Person"]);
22    /// let company = db.create_node(&["Company", "Startup"]);
23    /// ```
24    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
25        let id = self.lpg_store().create_node(labels);
26
27        // Log to WAL if enabled
28        #[cfg(feature = "wal")]
29        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
30            id,
31            labels: labels.iter().map(|s| (*s).to_string()).collect(),
32        }) {
33            grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
34        }
35
36        #[cfg(feature = "cdc")]
37        self.cdc_log.record_create_node(
38            id,
39            self.lpg_store().current_epoch(),
40            None,
41            Some(labels.iter().map(|s| (*s).to_string()).collect()),
42        );
43
44        id
45    }
46
47    /// Creates a new node with labels and properties.
48    ///
49    /// If WAL is enabled, the operation is logged for durability.
50    pub fn create_node_with_props(
51        &self,
52        labels: &[&str],
53        properties: impl IntoIterator<
54            Item = (
55                impl Into<grafeo_common::types::PropertyKey>,
56                impl Into<grafeo_common::types::Value>,
57            ),
58        >,
59    ) -> grafeo_common::types::NodeId {
60        // Collect properties first so we can log them to WAL
61        let props: Vec<(
62            grafeo_common::types::PropertyKey,
63            grafeo_common::types::Value,
64        )> = properties
65            .into_iter()
66            .map(|(k, v)| (k.into(), v.into()))
67            .collect();
68
69        let id = self
70            .lpg_store()
71            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
72
73        // Build CDC snapshot before WAL consumes props
74        #[cfg(feature = "cdc")]
75        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
76            .iter()
77            .map(|(k, v)| (k.to_string(), v.clone()))
78            .collect();
79
80        // Log node creation to WAL
81        #[cfg(feature = "wal")]
82        {
83            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
84                id,
85                labels: labels.iter().map(|s| (*s).to_string()).collect(),
86            }) {
87                grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
88            }
89
90            // Log each property to WAL for full durability
91            for (key, value) in props {
92                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
93                    id,
94                    key: key.to_string(),
95                    value,
96                }) {
97                    grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
98                }
99            }
100        }
101
102        #[cfg(feature = "cdc")]
103        self.cdc_log.record_create_node(
104            id,
105            self.lpg_store().current_epoch(),
106            if cdc_props.is_empty() {
107                None
108            } else {
109                Some(cdc_props)
110            },
111            Some(labels.iter().map(|s| (*s).to_string()).collect()),
112        );
113
114        // Auto-insert into matching text indexes for the new node
115        #[cfg(feature = "text-index")]
116        if let Some(node) = self.lpg_store().get_node(id) {
117            for label in &node.labels {
118                for (prop_key, prop_val) in &node.properties {
119                    if let grafeo_common::types::Value::String(text) = prop_val
120                        && let Some(index) = self
121                            .lpg_store()
122                            .get_text_index(label.as_str(), prop_key.as_ref())
123                    {
124                        index.write().insert(id, text);
125                    }
126                }
127            }
128        }
129
130        id
131    }
132
133    /// Gets a node by ID.
134    #[must_use]
135    pub fn get_node(
136        &self,
137        id: grafeo_common::types::NodeId,
138    ) -> Option<grafeo_core::graph::lpg::Node> {
139        self.lpg_store().get_node(id)
140    }
141
142    /// Gets a node as it existed at a specific epoch.
143    ///
144    /// Uses pure epoch-based visibility (not transaction-aware), so the node
145    /// is visible if and only if `created_epoch <= epoch` and it was not
146    /// deleted at or before `epoch`.
147    #[must_use]
148    pub fn get_node_at_epoch(
149        &self,
150        id: grafeo_common::types::NodeId,
151        epoch: grafeo_common::types::EpochId,
152    ) -> Option<grafeo_core::graph::lpg::Node> {
153        self.lpg_store().get_node_at_epoch(id, epoch)
154    }
155
156    /// Gets an edge as it existed at a specific epoch.
157    ///
158    /// Uses pure epoch-based visibility (not transaction-aware).
159    #[must_use]
160    pub fn get_edge_at_epoch(
161        &self,
162        id: grafeo_common::types::EdgeId,
163        epoch: grafeo_common::types::EpochId,
164    ) -> Option<grafeo_core::graph::lpg::Edge> {
165        self.lpg_store().get_edge_at_epoch(id, epoch)
166    }
167
168    /// Returns all versions of a node with their creation/deletion epochs.
169    ///
170    /// Properties and labels reflect the current state (not versioned per-epoch).
171    #[must_use]
172    pub fn get_node_history(
173        &self,
174        id: grafeo_common::types::NodeId,
175    ) -> Vec<(
176        grafeo_common::types::EpochId,
177        Option<grafeo_common::types::EpochId>,
178        grafeo_core::graph::lpg::Node,
179    )> {
180        self.lpg_store().get_node_history(id)
181    }
182
183    /// Returns all versions of an edge with their creation/deletion epochs.
184    ///
185    /// Properties reflect the current state (not versioned per-epoch).
186    #[must_use]
187    pub fn get_edge_history(
188        &self,
189        id: grafeo_common::types::EdgeId,
190    ) -> Vec<(
191        grafeo_common::types::EpochId,
192        Option<grafeo_common::types::EpochId>,
193        grafeo_core::graph::lpg::Edge,
194    )> {
195        self.lpg_store().get_edge_history(id)
196    }
197
198    /// Returns a property value as it existed at a specific epoch.
199    ///
200    /// Uses the internal `VersionLog` to do a point-in-time read. Returns
201    /// `None` if the property didn't exist or was deleted at that epoch.
202    #[cfg(feature = "temporal")]
203    #[must_use]
204    pub fn get_node_property_at_epoch(
205        &self,
206        id: grafeo_common::types::NodeId,
207        key: &str,
208        epoch: grafeo_common::types::EpochId,
209    ) -> Option<grafeo_common::types::Value> {
210        let prop_key = grafeo_common::types::PropertyKey::new(key);
211        self.lpg_store()
212            .get_node_property_at_epoch(id, &prop_key, epoch)
213    }
214
215    /// Returns the full version timeline for a single property of a node.
216    ///
217    /// Each entry is `(epoch, value)` in ascending epoch order. Tombstones
218    /// (deletions) appear as `Value::Null`.
219    #[cfg(feature = "temporal")]
220    #[must_use]
221    pub fn get_node_property_history(
222        &self,
223        id: grafeo_common::types::NodeId,
224        key: &str,
225    ) -> Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)> {
226        self.lpg_store().node_property_history_for_key(id, key)
227    }
228
229    /// Returns the full version history for ALL properties of a node.
230    ///
231    /// Each entry is `(property_key, Vec<(epoch, value)>)`.
232    #[cfg(feature = "temporal")]
233    #[must_use]
234    pub fn get_all_node_property_history(
235        &self,
236        id: grafeo_common::types::NodeId,
237    ) -> Vec<(
238        grafeo_common::types::PropertyKey,
239        Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)>,
240    )> {
241        self.lpg_store().node_property_history(id)
242    }
243
244    /// Returns the current epoch of the database.
245    #[must_use]
246    pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
247        self.lpg_store().current_epoch()
248    }
249
250    /// Deletes a node and all its edges.
251    ///
252    /// If WAL is enabled, the operation is logged for durability.
253    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
254        // Capture properties for CDC before deletion
255        #[cfg(feature = "cdc")]
256        let cdc_props = self.lpg_store().get_node(id).map(|node| {
257            node.properties
258                .iter()
259                .map(|(k, v)| (k.to_string(), v.clone()))
260                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
261        });
262
263        // Collect matching vector indexes BEFORE deletion removes labels
264        #[cfg(feature = "vector-index")]
265        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
266            .lpg_store()
267            .get_node(id)
268            .map(|node| {
269                let mut indexes = Vec::new();
270                for label in &node.labels {
271                    let prefix = format!("{}:", label.as_str());
272                    for (key, index) in self.lpg_store().vector_index_entries() {
273                        if key.starts_with(&prefix) {
274                            indexes.push(index);
275                        }
276                    }
277                }
278                indexes
279            })
280            .unwrap_or_default();
281
282        // Collect matching text indexes BEFORE deletion removes labels
283        #[cfg(feature = "text-index")]
284        let text_indexes_to_clean: Vec<
285            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
286        > = self
287            .lpg_store()
288            .get_node(id)
289            .map(|node| {
290                let mut indexes = Vec::new();
291                for label in &node.labels {
292                    let prefix = format!("{}:", label.as_str());
293                    for (key, index) in self.lpg_store().text_index_entries() {
294                        if key.starts_with(&prefix) {
295                            indexes.push(index);
296                        }
297                    }
298                }
299                indexes
300            })
301            .unwrap_or_default();
302
303        let result = self.lpg_store().delete_node(id);
304
305        // Remove from vector indexes after successful deletion
306        #[cfg(feature = "vector-index")]
307        if result {
308            for index in indexes_to_clean {
309                index.remove(id);
310            }
311        }
312
313        // Remove from text indexes after successful deletion
314        #[cfg(feature = "text-index")]
315        if result {
316            for index in text_indexes_to_clean {
317                index.write().remove(id);
318            }
319        }
320
321        #[cfg(feature = "wal")]
322        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
323            grafeo_warn!("Failed to log DeleteNode to WAL: {}", e);
324        }
325
326        #[cfg(feature = "cdc")]
327        if result {
328            self.cdc_log.record_delete(
329                crate::cdc::EntityId::Node(id),
330                self.lpg_store().current_epoch(),
331                cdc_props,
332            );
333        }
334
335        result
336    }
337
338    /// Sets a property on a node.
339    ///
340    /// If WAL is enabled, the operation is logged for durability.
341    pub fn set_node_property(
342        &self,
343        id: grafeo_common::types::NodeId,
344        key: &str,
345        value: grafeo_common::types::Value,
346    ) {
347        // Extract vector data before the value is moved into the store
348        #[cfg(feature = "vector-index")]
349        let vector_data = match &value {
350            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
351            _ => None,
352        };
353
354        // Log to WAL first
355        #[cfg(feature = "wal")]
356        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
357            id,
358            key: key.to_string(),
359            value: value.clone(),
360        }) {
361            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
362        }
363
364        // Capture old value for CDC before the store write
365        #[cfg(feature = "cdc")]
366        let cdc_old_value = self
367            .lpg_store()
368            .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
369        #[cfg(feature = "cdc")]
370        let cdc_new_value = value.clone();
371
372        self.lpg_store().set_node_property(id, key, value);
373
374        #[cfg(feature = "cdc")]
375        self.cdc_log.record_update(
376            crate::cdc::EntityId::Node(id),
377            self.lpg_store().current_epoch(),
378            key,
379            cdc_old_value,
380            cdc_new_value,
381        );
382
383        // Auto-insert into matching vector indexes
384        #[cfg(feature = "vector-index")]
385        if let Some(vec) = vector_data
386            && let Some(node) = self.lpg_store().get_node(id)
387        {
388            for label in &node.labels {
389                if let Some(index) = self.lpg_store().get_vector_index(label.as_str(), key) {
390                    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
391                        &**self.lpg_store(),
392                        key,
393                    );
394                    index.insert(id, &vec, &accessor);
395                }
396            }
397        }
398
399        // Auto-update matching text indexes
400        #[cfg(feature = "text-index")]
401        if let Some(node) = self.lpg_store().get_node(id) {
402            let text_val = node
403                .properties
404                .get(&grafeo_common::types::PropertyKey::new(key))
405                .and_then(|v| match v {
406                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
407                    _ => None,
408                });
409            for label in &node.labels {
410                if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
411                    let mut idx = index.write();
412                    if let Some(ref text) = text_val {
413                        idx.insert(id, text);
414                    } else {
415                        idx.remove(id);
416                    }
417                }
418            }
419        }
420    }
421
422    /// Adds a label to an existing node.
423    ///
424    /// Returns `true` if the label was added, `false` if the node doesn't exist
425    /// or already has the label.
426    ///
427    /// # Examples
428    ///
429    /// ```
430    /// use grafeo_engine::GrafeoDB;
431    ///
432    /// let db = GrafeoDB::new_in_memory();
433    /// let alix = db.create_node(&["Person"]);
434    ///
435    /// // Promote Alix to Employee
436    /// let added = db.add_node_label(alix, "Employee");
437    /// assert!(added);
438    /// ```
439    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
440        let result = self.lpg_store().add_label(id, label);
441
442        #[cfg(feature = "wal")]
443        if result {
444            // Log to WAL if enabled
445            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
446                id,
447                label: label.to_string(),
448            }) {
449                grafeo_warn!("Failed to log AddNodeLabel to WAL: {}", e);
450            }
451        }
452
453        // Auto-insert into vector indexes for the newly-added label
454        #[cfg(feature = "vector-index")]
455        if result {
456            let prefix = format!("{label}:");
457            for (key, index) in self.lpg_store().vector_index_entries() {
458                if let Some(property) = key.strip_prefix(&prefix)
459                    && let Some(node) = self.lpg_store().get_node(id)
460                {
461                    let prop_key = grafeo_common::types::PropertyKey::new(property);
462                    if let Some(grafeo_common::types::Value::Vector(v)) =
463                        node.properties.get(&prop_key)
464                    {
465                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
466                            &**self.lpg_store(),
467                            property,
468                        );
469                        index.insert(id, v, &accessor);
470                    }
471                }
472            }
473        }
474
475        // Auto-insert into text indexes for the newly-added label
476        #[cfg(feature = "text-index")]
477        if result && let Some(node) = self.lpg_store().get_node(id) {
478            for (prop_key, prop_val) in &node.properties {
479                if let grafeo_common::types::Value::String(text) = prop_val
480                    && let Some(index) = self.lpg_store().get_text_index(label, prop_key.as_ref())
481                {
482                    index.write().insert(id, text);
483                }
484            }
485        }
486
487        result
488    }
489
490    /// Removes a label from a node.
491    ///
492    /// Returns `true` if the label was removed, `false` if the node doesn't exist
493    /// or doesn't have the label.
494    ///
495    /// # Examples
496    ///
497    /// ```
498    /// use grafeo_engine::GrafeoDB;
499    ///
500    /// let db = GrafeoDB::new_in_memory();
501    /// let alix = db.create_node(&["Person", "Employee"]);
502    ///
503    /// // Remove Employee status
504    /// let removed = db.remove_node_label(alix, "Employee");
505    /// assert!(removed);
506    /// ```
507    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
508        // Collect text indexes to clean BEFORE removing the label
509        #[cfg(feature = "text-index")]
510        let text_indexes_to_clean: Vec<
511            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
512        > = {
513            let prefix = format!("{label}:");
514            self.lpg_store()
515                .text_index_entries()
516                .into_iter()
517                .filter(|(key, _)| key.starts_with(&prefix))
518                .map(|(_, index)| index)
519                .collect()
520        };
521
522        let result = self.lpg_store().remove_label(id, label);
523
524        #[cfg(feature = "wal")]
525        if result {
526            // Log to WAL if enabled
527            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
528                id,
529                label: label.to_string(),
530            }) {
531                grafeo_warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
532            }
533        }
534
535        // Remove from text indexes for the removed label
536        #[cfg(feature = "text-index")]
537        if result {
538            for index in text_indexes_to_clean {
539                index.write().remove(id);
540            }
541        }
542
543        result
544    }
545
546    /// Gets all labels for a node.
547    ///
548    /// Returns `None` if the node doesn't exist.
549    ///
550    /// # Examples
551    ///
552    /// ```
553    /// use grafeo_engine::GrafeoDB;
554    ///
555    /// let db = GrafeoDB::new_in_memory();
556    /// let alix = db.create_node(&["Person", "Employee"]);
557    ///
558    /// let labels = db.get_node_labels(alix).unwrap();
559    /// assert!(labels.contains(&"Person".to_string()));
560    /// assert!(labels.contains(&"Employee".to_string()));
561    /// ```
562    #[must_use]
563    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
564        self.lpg_store()
565            .get_node(id)
566            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
567    }
568
569    // === Edge Operations ===
570
571    /// Creates an edge (relationship) between two nodes.
572    ///
573    /// Edges connect nodes and have a type that describes the relationship.
574    /// They're directed - the order of `src` and `dst` matters.
575    ///
576    /// # Examples
577    ///
578    /// ```
579    /// use grafeo_engine::GrafeoDB;
580    ///
581    /// let db = GrafeoDB::new_in_memory();
582    /// let alix = db.create_node(&["Person"]);
583    /// let gus = db.create_node(&["Person"]);
584    ///
585    /// // Alix knows Gus (directed: Alix -> Gus)
586    /// let edge = db.create_edge(alix, gus, "KNOWS");
587    /// ```
588    pub fn create_edge(
589        &self,
590        src: grafeo_common::types::NodeId,
591        dst: grafeo_common::types::NodeId,
592        edge_type: &str,
593    ) -> grafeo_common::types::EdgeId {
594        let id = self.lpg_store().create_edge(src, dst, edge_type);
595
596        // Log to WAL if enabled
597        #[cfg(feature = "wal")]
598        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
599            id,
600            src,
601            dst,
602            edge_type: edge_type.to_string(),
603        }) {
604            grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
605        }
606
607        #[cfg(feature = "cdc")]
608        self.cdc_log.record_create_edge(
609            id,
610            self.lpg_store().current_epoch(),
611            None,
612            src.as_u64(),
613            dst.as_u64(),
614            edge_type.to_string(),
615        );
616
617        id
618    }
619
620    /// Creates a new edge with properties.
621    ///
622    /// If WAL is enabled, the operation is logged for durability.
623    pub fn create_edge_with_props(
624        &self,
625        src: grafeo_common::types::NodeId,
626        dst: grafeo_common::types::NodeId,
627        edge_type: &str,
628        properties: impl IntoIterator<
629            Item = (
630                impl Into<grafeo_common::types::PropertyKey>,
631                impl Into<grafeo_common::types::Value>,
632            ),
633        >,
634    ) -> grafeo_common::types::EdgeId {
635        // Collect properties first so we can log them to WAL
636        let props: Vec<(
637            grafeo_common::types::PropertyKey,
638            grafeo_common::types::Value,
639        )> = properties
640            .into_iter()
641            .map(|(k, v)| (k.into(), v.into()))
642            .collect();
643
644        let id = self.lpg_store().create_edge_with_props(
645            src,
646            dst,
647            edge_type,
648            props.iter().map(|(k, v)| (k.clone(), v.clone())),
649        );
650
651        // Build CDC snapshot before WAL consumes props
652        #[cfg(feature = "cdc")]
653        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
654            .iter()
655            .map(|(k, v)| (k.to_string(), v.clone()))
656            .collect();
657
658        // Log edge creation to WAL
659        #[cfg(feature = "wal")]
660        {
661            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
662                id,
663                src,
664                dst,
665                edge_type: edge_type.to_string(),
666            }) {
667                grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
668            }
669
670            // Log each property to WAL for full durability
671            for (key, value) in props {
672                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
673                    id,
674                    key: key.to_string(),
675                    value,
676                }) {
677                    grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
678                }
679            }
680        }
681
682        #[cfg(feature = "cdc")]
683        self.cdc_log.record_create_edge(
684            id,
685            self.lpg_store().current_epoch(),
686            if cdc_props.is_empty() {
687                None
688            } else {
689                Some(cdc_props)
690            },
691            src.as_u64(),
692            dst.as_u64(),
693            edge_type.to_string(),
694        );
695
696        id
697    }
698
699    /// Gets an edge by ID.
700    #[must_use]
701    pub fn get_edge(
702        &self,
703        id: grafeo_common::types::EdgeId,
704    ) -> Option<grafeo_core::graph::lpg::Edge> {
705        self.lpg_store().get_edge(id)
706    }
707
708    /// Deletes an edge.
709    ///
710    /// If WAL is enabled, the operation is logged for durability.
711    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
712        // Capture properties for CDC before deletion
713        #[cfg(feature = "cdc")]
714        let cdc_props = self.lpg_store().get_edge(id).map(|edge| {
715            edge.properties
716                .iter()
717                .map(|(k, v)| (k.to_string(), v.clone()))
718                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
719        });
720
721        let result = self.lpg_store().delete_edge(id);
722
723        #[cfg(feature = "wal")]
724        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
725            grafeo_warn!("Failed to log DeleteEdge to WAL: {}", e);
726        }
727
728        #[cfg(feature = "cdc")]
729        if result {
730            self.cdc_log.record_delete(
731                crate::cdc::EntityId::Edge(id),
732                self.lpg_store().current_epoch(),
733                cdc_props,
734            );
735        }
736
737        result
738    }
739
740    /// Sets a property on an edge.
741    ///
742    /// If WAL is enabled, the operation is logged for durability.
743    pub fn set_edge_property(
744        &self,
745        id: grafeo_common::types::EdgeId,
746        key: &str,
747        value: grafeo_common::types::Value,
748    ) {
749        // Log to WAL first
750        #[cfg(feature = "wal")]
751        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
752            id,
753            key: key.to_string(),
754            value: value.clone(),
755        }) {
756            grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
757        }
758
759        // Capture old value for CDC before the store write
760        #[cfg(feature = "cdc")]
761        let cdc_old_value = self
762            .lpg_store()
763            .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
764        #[cfg(feature = "cdc")]
765        let cdc_new_value = value.clone();
766
767        self.lpg_store().set_edge_property(id, key, value);
768
769        #[cfg(feature = "cdc")]
770        self.cdc_log.record_update(
771            crate::cdc::EntityId::Edge(id),
772            self.lpg_store().current_epoch(),
773            key,
774            cdc_old_value,
775            cdc_new_value,
776        );
777    }
778
779    /// Removes a property from a node.
780    ///
781    /// Returns true if the property existed and was removed, false otherwise.
782    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
783        let removed = self.lpg_store().remove_node_property(id, key).is_some();
784
785        #[cfg(feature = "wal")]
786        if removed
787            && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
788                id,
789                key: key.to_string(),
790            })
791        {
792            grafeo_warn!("WAL log for RemoveNodeProperty failed: {e}");
793        }
794
795        // Remove from matching text indexes
796        #[cfg(feature = "text-index")]
797        if removed && let Some(node) = self.lpg_store().get_node(id) {
798            for label in &node.labels {
799                if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
800                    index.write().remove(id);
801                }
802            }
803        }
804
805        removed
806    }
807
808    /// Removes a property from an edge.
809    ///
810    /// Returns true if the property existed and was removed, false otherwise.
811    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
812        let removed = self.lpg_store().remove_edge_property(id, key).is_some();
813
814        #[cfg(feature = "wal")]
815        if removed
816            && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
817                id,
818                key: key.to_string(),
819            })
820        {
821            grafeo_warn!("WAL log for RemoveEdgeProperty failed: {e}");
822        }
823
824        removed
825    }
826
827    /// Creates multiple nodes in bulk, each with a single vector property.
828    ///
829    /// Much faster than individual `create_node_with_props` calls because it
830    /// acquires internal locks once and loops in Rust rather than crossing
831    /// the FFI boundary per vector.
832    ///
833    /// **Atomicity note:** Individual node creations within the batch are NOT
834    /// atomic as a group. If a failure occurs mid-batch (e.g. WAL write error),
835    /// nodes created before the failure will persist while later nodes may not.
836    /// If you need all-or-nothing semantics, wrap the call in an explicit
837    /// transaction.
838    ///
839    /// # Arguments
840    ///
841    /// * `label` - Label applied to all created nodes
842    /// * `property` - Property name for the vector data
843    /// * `vectors` - Vector data for each node
844    ///
845    /// # Returns
846    ///
847    /// Vector of created `NodeId`s in the same order as the input vectors.
848    pub fn batch_create_nodes(
849        &self,
850        label: &str,
851        property: &str,
852        vectors: Vec<Vec<f32>>,
853    ) -> Vec<grafeo_common::types::NodeId> {
854        use grafeo_common::types::{PropertyKey, Value};
855
856        let prop_key = PropertyKey::new(property);
857        let labels: &[&str] = &[label];
858
859        let ids: Vec<grafeo_common::types::NodeId> = vectors
860            .into_iter()
861            .map(|vec| {
862                let value = Value::Vector(vec.into());
863                let id = self.lpg_store().create_node_with_props(
864                    labels,
865                    std::iter::once((prop_key.clone(), value.clone())),
866                );
867
868                // Log to WAL
869                #[cfg(feature = "wal")]
870                {
871                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
872                        id,
873                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
874                    }) {
875                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
876                    }
877                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
878                        id,
879                        key: property.to_string(),
880                        value,
881                    }) {
882                        grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
883                    }
884                }
885
886                id
887            })
888            .collect();
889
890        // Auto-insert into matching vector index if one exists
891        #[cfg(feature = "vector-index")]
892        if let Some(index) = self.lpg_store().get_vector_index(label, property) {
893            let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
894                &**self.lpg_store(),
895                property,
896            );
897            for &id in &ids {
898                if let Some(node) = self.lpg_store().get_node(id) {
899                    let pk = grafeo_common::types::PropertyKey::new(property);
900                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk)
901                        && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
902                            index.insert(id, v, &accessor);
903                        }))
904                        .is_err()
905                    {
906                        grafeo_warn!("Vector index insert panicked for node {}", id.as_u64());
907                    }
908                }
909            }
910        }
911
912        ids
913    }
914
915    /// Batch-creates nodes with full property maps.
916    ///
917    /// Each entry in `properties_list` is a complete property map for one node.
918    /// Vector values (`Value::Vector`) are automatically inserted into matching
919    /// vector indexes. Text values are automatically inserted into matching text
920    /// indexes.
921    ///
922    /// **Atomicity note:** Individual node creations within the batch are NOT
923    /// atomic as a group. If a failure occurs mid-batch (e.g. WAL write error),
924    /// nodes created before the failure will persist while later nodes may not.
925    /// If you need all-or-nothing semantics, wrap the call in an explicit
926    /// transaction.
927    ///
928    /// # Arguments
929    ///
930    /// * `label` - Label for all created nodes.
931    /// * `properties_list` - One property map per node to create.
932    ///
933    /// # Returns
934    ///
935    /// Vector of created `NodeId`s in the same order as the input.
936    pub fn batch_create_nodes_with_props(
937        &self,
938        label: &str,
939        properties_list: Vec<
940            std::collections::HashMap<
941                grafeo_common::types::PropertyKey,
942                grafeo_common::types::Value,
943            >,
944        >,
945    ) -> Vec<grafeo_common::types::NodeId> {
946        use grafeo_common::types::Value;
947
948        let labels: &[&str] = &[label];
949
950        let ids: Vec<grafeo_common::types::NodeId> = properties_list
951            .into_iter()
952            .map(|props| {
953                let id = self.lpg_store().create_node_with_props(
954                    labels,
955                    props.iter().map(|(k, v)| (k.clone(), v.clone())),
956                );
957
958                // Build CDC snapshot before WAL consumes props
959                #[cfg(feature = "cdc")]
960                let cdc_props: std::collections::HashMap<
961                    String,
962                    grafeo_common::types::Value,
963                > = props
964                    .iter()
965                    .map(|(k, v)| (k.to_string(), v.clone()))
966                    .collect();
967
968                // Log to WAL
969                #[cfg(feature = "wal")]
970                {
971                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
972                        id,
973                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
974                    }) {
975                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
976                    }
977                    for (key, value) in props {
978                        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
979                            id,
980                            key: key.to_string(),
981                            value,
982                        }) {
983                            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
984                        }
985                    }
986                }
987
988                #[cfg(feature = "cdc")]
989                self.cdc_log.record_create_node(
990                    id,
991                    self.lpg_store().current_epoch(),
992                    if cdc_props.is_empty() {
993                        None
994                    } else {
995                        Some(cdc_props)
996                    },
997                    Some(labels.iter().map(|s| (*s).to_string()).collect()),
998                );
999
1000                id
1001            })
1002            .collect();
1003
1004        // Auto-insert into matching vector indexes for any vector properties
1005        #[cfg(feature = "vector-index")]
1006        {
1007            for (key, index) in self.lpg_store().vector_index_entries() {
1008                // key is "label:property"
1009                if !key.starts_with(label) || !key[label.len()..].starts_with(':') {
1010                    continue;
1011                }
1012                let property = &key[label.len() + 1..];
1013                let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1014                    &**self.lpg_store(),
1015                    property,
1016                );
1017                let pk = grafeo_common::types::PropertyKey::new(property);
1018                for &id in &ids {
1019                    if let Some(node) = self.lpg_store().get_node(id) {
1020                        match node.properties.get(&pk) {
1021                            Some(Value::Vector(v)) => {
1022                                if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1023                                    index.insert(id, v, &accessor);
1024                                }))
1025                                .is_err()
1026                                {
1027                                    grafeo_warn!(
1028                                        "Vector index insert panicked for node {}",
1029                                        id.as_u64()
1030                                    );
1031                                }
1032                            }
1033                            Some(_other) => {
1034                                grafeo_warn!(
1035                                    "Node {} property '{}' expected Vector, skipping vector index insert",
1036                                    id.as_u64(),
1037                                    property
1038                                );
1039                            }
1040                            None => {} // No property, nothing to index
1041                        }
1042                    }
1043                }
1044            }
1045        }
1046
1047        // Auto-insert into matching text indexes for any string properties
1048        #[cfg(feature = "text-index")]
1049        for &id in &ids {
1050            if let Some(node) = self.lpg_store().get_node(id) {
1051                for (prop_key, prop_val) in &node.properties {
1052                    if let Value::String(text) = prop_val
1053                        && let Some(index) =
1054                            self.lpg_store().get_text_index(label, prop_key.as_ref())
1055                    {
1056                        index.write().insert(id, text);
1057                    }
1058                }
1059            }
1060        }
1061
1062        ids
1063    }
1064}