Skip to main content

grafeo_engine/database/
crud.rs

1//! Node and edge CRUD operations for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5use grafeo_common::grafeo_warn;
6
7impl super::GrafeoDB {
8    // === Node Operations ===
9
10    /// Creates a node with the given labels and returns its ID.
11    ///
12    /// Labels categorize nodes - think of them like tags. A node can have
13    /// multiple labels (e.g., `["Person", "Employee"]`).
14    ///
15    /// # Examples
16    ///
17    /// ```
18    /// use grafeo_engine::GrafeoDB;
19    ///
20    /// let db = GrafeoDB::new_in_memory();
21    /// let alix = db.create_node(&["Person"]);
22    /// let company = db.create_node(&["Company", "Startup"]);
23    /// ```
24    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
25        let id = self.store.create_node(labels);
26
27        // Log to WAL if enabled
28        #[cfg(feature = "wal")]
29        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
30            id,
31            labels: labels.iter().map(|s| (*s).to_string()).collect(),
32        }) {
33            grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
34        }
35
36        #[cfg(feature = "cdc")]
37        self.cdc_log.record_create_node(
38            id,
39            self.store.current_epoch(),
40            None,
41            Some(labels.iter().map(|s| (*s).to_string()).collect()),
42        );
43
44        id
45    }
46
47    /// Creates a new node with labels and properties.
48    ///
49    /// If WAL is enabled, the operation is logged for durability.
50    pub fn create_node_with_props(
51        &self,
52        labels: &[&str],
53        properties: impl IntoIterator<
54            Item = (
55                impl Into<grafeo_common::types::PropertyKey>,
56                impl Into<grafeo_common::types::Value>,
57            ),
58        >,
59    ) -> grafeo_common::types::NodeId {
60        // Collect properties first so we can log them to WAL
61        let props: Vec<(
62            grafeo_common::types::PropertyKey,
63            grafeo_common::types::Value,
64        )> = properties
65            .into_iter()
66            .map(|(k, v)| (k.into(), v.into()))
67            .collect();
68
69        let id = self
70            .store
71            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
72
73        // Build CDC snapshot before WAL consumes props
74        #[cfg(feature = "cdc")]
75        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
76            .iter()
77            .map(|(k, v)| (k.to_string(), v.clone()))
78            .collect();
79
80        // Log node creation to WAL
81        #[cfg(feature = "wal")]
82        {
83            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
84                id,
85                labels: labels.iter().map(|s| (*s).to_string()).collect(),
86            }) {
87                grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
88            }
89
90            // Log each property to WAL for full durability
91            for (key, value) in props {
92                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
93                    id,
94                    key: key.to_string(),
95                    value,
96                }) {
97                    grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
98                }
99            }
100        }
101
102        #[cfg(feature = "cdc")]
103        self.cdc_log.record_create_node(
104            id,
105            self.store.current_epoch(),
106            if cdc_props.is_empty() {
107                None
108            } else {
109                Some(cdc_props)
110            },
111            Some(labels.iter().map(|s| (*s).to_string()).collect()),
112        );
113
114        // Auto-insert into matching text indexes for the new node
115        #[cfg(feature = "text-index")]
116        if let Some(node) = self.store.get_node(id) {
117            for label in &node.labels {
118                for (prop_key, prop_val) in &node.properties {
119                    if let grafeo_common::types::Value::String(text) = prop_val
120                        && let Some(index) =
121                            self.store.get_text_index(label.as_str(), prop_key.as_ref())
122                    {
123                        index.write().insert(id, text);
124                    }
125                }
126            }
127        }
128
129        id
130    }
131
132    /// Gets a node by ID.
133    #[must_use]
134    pub fn get_node(
135        &self,
136        id: grafeo_common::types::NodeId,
137    ) -> Option<grafeo_core::graph::lpg::Node> {
138        self.store.get_node(id)
139    }
140
141    /// Gets a node as it existed at a specific epoch.
142    ///
143    /// Uses pure epoch-based visibility (not transaction-aware), so the node
144    /// is visible if and only if `created_epoch <= epoch` and it was not
145    /// deleted at or before `epoch`.
146    #[must_use]
147    pub fn get_node_at_epoch(
148        &self,
149        id: grafeo_common::types::NodeId,
150        epoch: grafeo_common::types::EpochId,
151    ) -> Option<grafeo_core::graph::lpg::Node> {
152        self.store.get_node_at_epoch(id, epoch)
153    }
154
155    /// Gets an edge as it existed at a specific epoch.
156    ///
157    /// Uses pure epoch-based visibility (not transaction-aware).
158    #[must_use]
159    pub fn get_edge_at_epoch(
160        &self,
161        id: grafeo_common::types::EdgeId,
162        epoch: grafeo_common::types::EpochId,
163    ) -> Option<grafeo_core::graph::lpg::Edge> {
164        self.store.get_edge_at_epoch(id, epoch)
165    }
166
167    /// Returns all versions of a node with their creation/deletion epochs.
168    ///
169    /// Properties and labels reflect the current state (not versioned per-epoch).
170    #[must_use]
171    pub fn get_node_history(
172        &self,
173        id: grafeo_common::types::NodeId,
174    ) -> Vec<(
175        grafeo_common::types::EpochId,
176        Option<grafeo_common::types::EpochId>,
177        grafeo_core::graph::lpg::Node,
178    )> {
179        self.store.get_node_history(id)
180    }
181
182    /// Returns all versions of an edge with their creation/deletion epochs.
183    ///
184    /// Properties reflect the current state (not versioned per-epoch).
185    #[must_use]
186    pub fn get_edge_history(
187        &self,
188        id: grafeo_common::types::EdgeId,
189    ) -> Vec<(
190        grafeo_common::types::EpochId,
191        Option<grafeo_common::types::EpochId>,
192        grafeo_core::graph::lpg::Edge,
193    )> {
194        self.store.get_edge_history(id)
195    }
196
197    /// Returns the current epoch of the database.
198    #[must_use]
199    pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
200        self.store.current_epoch()
201    }
202
203    /// Deletes a node and all its edges.
204    ///
205    /// If WAL is enabled, the operation is logged for durability.
206    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
207        // Capture properties for CDC before deletion
208        #[cfg(feature = "cdc")]
209        let cdc_props = self.store.get_node(id).map(|node| {
210            node.properties
211                .iter()
212                .map(|(k, v)| (k.to_string(), v.clone()))
213                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
214        });
215
216        // Collect matching vector indexes BEFORE deletion removes labels
217        #[cfg(feature = "vector-index")]
218        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
219            .store
220            .get_node(id)
221            .map(|node| {
222                let mut indexes = Vec::new();
223                for label in &node.labels {
224                    let prefix = format!("{}:", label.as_str());
225                    for (key, index) in self.store.vector_index_entries() {
226                        if key.starts_with(&prefix) {
227                            indexes.push(index);
228                        }
229                    }
230                }
231                indexes
232            })
233            .unwrap_or_default();
234
235        // Collect matching text indexes BEFORE deletion removes labels
236        #[cfg(feature = "text-index")]
237        let text_indexes_to_clean: Vec<
238            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
239        > = self
240            .store
241            .get_node(id)
242            .map(|node| {
243                let mut indexes = Vec::new();
244                for label in &node.labels {
245                    let prefix = format!("{}:", label.as_str());
246                    for (key, index) in self.store.text_index_entries() {
247                        if key.starts_with(&prefix) {
248                            indexes.push(index);
249                        }
250                    }
251                }
252                indexes
253            })
254            .unwrap_or_default();
255
256        let result = self.store.delete_node(id);
257
258        // Remove from vector indexes after successful deletion
259        #[cfg(feature = "vector-index")]
260        if result {
261            for index in indexes_to_clean {
262                index.remove(id);
263            }
264        }
265
266        // Remove from text indexes after successful deletion
267        #[cfg(feature = "text-index")]
268        if result {
269            for index in text_indexes_to_clean {
270                index.write().remove(id);
271            }
272        }
273
274        #[cfg(feature = "wal")]
275        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
276            grafeo_warn!("Failed to log DeleteNode to WAL: {}", e);
277        }
278
279        #[cfg(feature = "cdc")]
280        if result {
281            self.cdc_log.record_delete(
282                crate::cdc::EntityId::Node(id),
283                self.store.current_epoch(),
284                cdc_props,
285            );
286        }
287
288        result
289    }
290
291    /// Sets a property on a node.
292    ///
293    /// If WAL is enabled, the operation is logged for durability.
294    pub fn set_node_property(
295        &self,
296        id: grafeo_common::types::NodeId,
297        key: &str,
298        value: grafeo_common::types::Value,
299    ) {
300        // Extract vector data before the value is moved into the store
301        #[cfg(feature = "vector-index")]
302        let vector_data = match &value {
303            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
304            _ => None,
305        };
306
307        // Log to WAL first
308        #[cfg(feature = "wal")]
309        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
310            id,
311            key: key.to_string(),
312            value: value.clone(),
313        }) {
314            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
315        }
316
317        // Capture old value for CDC before the store write
318        #[cfg(feature = "cdc")]
319        let cdc_old_value = self
320            .store
321            .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
322        #[cfg(feature = "cdc")]
323        let cdc_new_value = value.clone();
324
325        self.store.set_node_property(id, key, value);
326
327        #[cfg(feature = "cdc")]
328        self.cdc_log.record_update(
329            crate::cdc::EntityId::Node(id),
330            self.store.current_epoch(),
331            key,
332            cdc_old_value,
333            cdc_new_value,
334        );
335
336        // Auto-insert into matching vector indexes
337        #[cfg(feature = "vector-index")]
338        if let Some(vec) = vector_data
339            && let Some(node) = self.store.get_node(id)
340        {
341            for label in &node.labels {
342                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
343                    let accessor =
344                        grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, key);
345                    index.insert(id, &vec, &accessor);
346                }
347            }
348        }
349
350        // Auto-update matching text indexes
351        #[cfg(feature = "text-index")]
352        if let Some(node) = self.store.get_node(id) {
353            let text_val = node
354                .properties
355                .get(&grafeo_common::types::PropertyKey::new(key))
356                .and_then(|v| match v {
357                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
358                    _ => None,
359                });
360            for label in &node.labels {
361                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
362                    let mut idx = index.write();
363                    if let Some(ref text) = text_val {
364                        idx.insert(id, text);
365                    } else {
366                        idx.remove(id);
367                    }
368                }
369            }
370        }
371    }
372
373    /// Adds a label to an existing node.
374    ///
375    /// Returns `true` if the label was added, `false` if the node doesn't exist
376    /// or already has the label.
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// use grafeo_engine::GrafeoDB;
382    ///
383    /// let db = GrafeoDB::new_in_memory();
384    /// let alix = db.create_node(&["Person"]);
385    ///
386    /// // Promote Alix to Employee
387    /// let added = db.add_node_label(alix, "Employee");
388    /// assert!(added);
389    /// ```
390    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
391        let result = self.store.add_label(id, label);
392
393        #[cfg(feature = "wal")]
394        if result {
395            // Log to WAL if enabled
396            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
397                id,
398                label: label.to_string(),
399            }) {
400                grafeo_warn!("Failed to log AddNodeLabel to WAL: {}", e);
401            }
402        }
403
404        // Auto-insert into vector indexes for the newly-added label
405        #[cfg(feature = "vector-index")]
406        if result {
407            let prefix = format!("{label}:");
408            for (key, index) in self.store.vector_index_entries() {
409                if let Some(property) = key.strip_prefix(&prefix)
410                    && let Some(node) = self.store.get_node(id)
411                {
412                    let prop_key = grafeo_common::types::PropertyKey::new(property);
413                    if let Some(grafeo_common::types::Value::Vector(v)) =
414                        node.properties.get(&prop_key)
415                    {
416                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
417                            &*self.store,
418                            property,
419                        );
420                        index.insert(id, v, &accessor);
421                    }
422                }
423            }
424        }
425
426        // Auto-insert into text indexes for the newly-added label
427        #[cfg(feature = "text-index")]
428        if result && let Some(node) = self.store.get_node(id) {
429            for (prop_key, prop_val) in &node.properties {
430                if let grafeo_common::types::Value::String(text) = prop_val
431                    && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
432                {
433                    index.write().insert(id, text);
434                }
435            }
436        }
437
438        result
439    }
440
441    /// Removes a label from a node.
442    ///
443    /// Returns `true` if the label was removed, `false` if the node doesn't exist
444    /// or doesn't have the label.
445    ///
446    /// # Examples
447    ///
448    /// ```
449    /// use grafeo_engine::GrafeoDB;
450    ///
451    /// let db = GrafeoDB::new_in_memory();
452    /// let alix = db.create_node(&["Person", "Employee"]);
453    ///
454    /// // Remove Employee status
455    /// let removed = db.remove_node_label(alix, "Employee");
456    /// assert!(removed);
457    /// ```
458    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
459        // Collect text indexes to clean BEFORE removing the label
460        #[cfg(feature = "text-index")]
461        let text_indexes_to_clean: Vec<
462            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
463        > = {
464            let prefix = format!("{label}:");
465            self.store
466                .text_index_entries()
467                .into_iter()
468                .filter(|(key, _)| key.starts_with(&prefix))
469                .map(|(_, index)| index)
470                .collect()
471        };
472
473        let result = self.store.remove_label(id, label);
474
475        #[cfg(feature = "wal")]
476        if result {
477            // Log to WAL if enabled
478            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
479                id,
480                label: label.to_string(),
481            }) {
482                grafeo_warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
483            }
484        }
485
486        // Remove from text indexes for the removed label
487        #[cfg(feature = "text-index")]
488        if result {
489            for index in text_indexes_to_clean {
490                index.write().remove(id);
491            }
492        }
493
494        result
495    }
496
497    /// Gets all labels for a node.
498    ///
499    /// Returns `None` if the node doesn't exist.
500    ///
501    /// # Examples
502    ///
503    /// ```
504    /// use grafeo_engine::GrafeoDB;
505    ///
506    /// let db = GrafeoDB::new_in_memory();
507    /// let alix = db.create_node(&["Person", "Employee"]);
508    ///
509    /// let labels = db.get_node_labels(alix).unwrap();
510    /// assert!(labels.contains(&"Person".to_string()));
511    /// assert!(labels.contains(&"Employee".to_string()));
512    /// ```
513    #[must_use]
514    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
515        self.store
516            .get_node(id)
517            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
518    }
519
520    // === Edge Operations ===
521
522    /// Creates an edge (relationship) between two nodes.
523    ///
524    /// Edges connect nodes and have a type that describes the relationship.
525    /// They're directed - the order of `src` and `dst` matters.
526    ///
527    /// # Examples
528    ///
529    /// ```
530    /// use grafeo_engine::GrafeoDB;
531    ///
532    /// let db = GrafeoDB::new_in_memory();
533    /// let alix = db.create_node(&["Person"]);
534    /// let gus = db.create_node(&["Person"]);
535    ///
536    /// // Alix knows Gus (directed: Alix -> Gus)
537    /// let edge = db.create_edge(alix, gus, "KNOWS");
538    /// ```
539    pub fn create_edge(
540        &self,
541        src: grafeo_common::types::NodeId,
542        dst: grafeo_common::types::NodeId,
543        edge_type: &str,
544    ) -> grafeo_common::types::EdgeId {
545        let id = self.store.create_edge(src, dst, edge_type);
546
547        // Log to WAL if enabled
548        #[cfg(feature = "wal")]
549        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
550            id,
551            src,
552            dst,
553            edge_type: edge_type.to_string(),
554        }) {
555            grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
556        }
557
558        #[cfg(feature = "cdc")]
559        self.cdc_log.record_create_edge(
560            id,
561            self.store.current_epoch(),
562            None,
563            src.as_u64(),
564            dst.as_u64(),
565            edge_type.to_string(),
566        );
567
568        id
569    }
570
571    /// Creates a new edge with properties.
572    ///
573    /// If WAL is enabled, the operation is logged for durability.
574    pub fn create_edge_with_props(
575        &self,
576        src: grafeo_common::types::NodeId,
577        dst: grafeo_common::types::NodeId,
578        edge_type: &str,
579        properties: impl IntoIterator<
580            Item = (
581                impl Into<grafeo_common::types::PropertyKey>,
582                impl Into<grafeo_common::types::Value>,
583            ),
584        >,
585    ) -> grafeo_common::types::EdgeId {
586        // Collect properties first so we can log them to WAL
587        let props: Vec<(
588            grafeo_common::types::PropertyKey,
589            grafeo_common::types::Value,
590        )> = properties
591            .into_iter()
592            .map(|(k, v)| (k.into(), v.into()))
593            .collect();
594
595        let id = self.store.create_edge_with_props(
596            src,
597            dst,
598            edge_type,
599            props.iter().map(|(k, v)| (k.clone(), v.clone())),
600        );
601
602        // Build CDC snapshot before WAL consumes props
603        #[cfg(feature = "cdc")]
604        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
605            .iter()
606            .map(|(k, v)| (k.to_string(), v.clone()))
607            .collect();
608
609        // Log edge creation to WAL
610        #[cfg(feature = "wal")]
611        {
612            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
613                id,
614                src,
615                dst,
616                edge_type: edge_type.to_string(),
617            }) {
618                grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
619            }
620
621            // Log each property to WAL for full durability
622            for (key, value) in props {
623                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
624                    id,
625                    key: key.to_string(),
626                    value,
627                }) {
628                    grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
629                }
630            }
631        }
632
633        #[cfg(feature = "cdc")]
634        self.cdc_log.record_create_edge(
635            id,
636            self.store.current_epoch(),
637            if cdc_props.is_empty() {
638                None
639            } else {
640                Some(cdc_props)
641            },
642            src.as_u64(),
643            dst.as_u64(),
644            edge_type.to_string(),
645        );
646
647        id
648    }
649
650    /// Gets an edge by ID.
651    #[must_use]
652    pub fn get_edge(
653        &self,
654        id: grafeo_common::types::EdgeId,
655    ) -> Option<grafeo_core::graph::lpg::Edge> {
656        self.store.get_edge(id)
657    }
658
659    /// Deletes an edge.
660    ///
661    /// If WAL is enabled, the operation is logged for durability.
662    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
663        // Capture properties for CDC before deletion
664        #[cfg(feature = "cdc")]
665        let cdc_props = self.store.get_edge(id).map(|edge| {
666            edge.properties
667                .iter()
668                .map(|(k, v)| (k.to_string(), v.clone()))
669                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
670        });
671
672        let result = self.store.delete_edge(id);
673
674        #[cfg(feature = "wal")]
675        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
676            grafeo_warn!("Failed to log DeleteEdge to WAL: {}", e);
677        }
678
679        #[cfg(feature = "cdc")]
680        if result {
681            self.cdc_log.record_delete(
682                crate::cdc::EntityId::Edge(id),
683                self.store.current_epoch(),
684                cdc_props,
685            );
686        }
687
688        result
689    }
690
691    /// Sets a property on an edge.
692    ///
693    /// If WAL is enabled, the operation is logged for durability.
694    pub fn set_edge_property(
695        &self,
696        id: grafeo_common::types::EdgeId,
697        key: &str,
698        value: grafeo_common::types::Value,
699    ) {
700        // Log to WAL first
701        #[cfg(feature = "wal")]
702        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
703            id,
704            key: key.to_string(),
705            value: value.clone(),
706        }) {
707            grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
708        }
709
710        // Capture old value for CDC before the store write
711        #[cfg(feature = "cdc")]
712        let cdc_old_value = self
713            .store
714            .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
715        #[cfg(feature = "cdc")]
716        let cdc_new_value = value.clone();
717
718        self.store.set_edge_property(id, key, value);
719
720        #[cfg(feature = "cdc")]
721        self.cdc_log.record_update(
722            crate::cdc::EntityId::Edge(id),
723            self.store.current_epoch(),
724            key,
725            cdc_old_value,
726            cdc_new_value,
727        );
728    }
729
730    /// Removes a property from a node.
731    ///
732    /// Returns true if the property existed and was removed, false otherwise.
733    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
734        let removed = self.store.remove_node_property(id, key).is_some();
735
736        #[cfg(feature = "wal")]
737        if removed
738            && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
739                id,
740                key: key.to_string(),
741            })
742        {
743            grafeo_warn!("WAL log for RemoveNodeProperty failed: {e}");
744        }
745
746        // Remove from matching text indexes
747        #[cfg(feature = "text-index")]
748        if removed && let Some(node) = self.store.get_node(id) {
749            for label in &node.labels {
750                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
751                    index.write().remove(id);
752                }
753            }
754        }
755
756        removed
757    }
758
759    /// Removes a property from an edge.
760    ///
761    /// Returns true if the property existed and was removed, false otherwise.
762    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
763        let removed = self.store.remove_edge_property(id, key).is_some();
764
765        #[cfg(feature = "wal")]
766        if removed
767            && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
768                id,
769                key: key.to_string(),
770            })
771        {
772            grafeo_warn!("WAL log for RemoveEdgeProperty failed: {e}");
773        }
774
775        removed
776    }
777
778    /// Creates multiple nodes in bulk, each with a single vector property.
779    ///
780    /// Much faster than individual `create_node_with_props` calls because it
781    /// acquires internal locks once and loops in Rust rather than crossing
782    /// the FFI boundary per vector.
783    ///
784    /// # Arguments
785    ///
786    /// * `label` - Label applied to all created nodes
787    /// * `property` - Property name for the vector data
788    /// * `vectors` - Vector data for each node
789    ///
790    /// # Returns
791    ///
792    /// Vector of created `NodeId`s in the same order as the input vectors.
793    pub fn batch_create_nodes(
794        &self,
795        label: &str,
796        property: &str,
797        vectors: Vec<Vec<f32>>,
798    ) -> Vec<grafeo_common::types::NodeId> {
799        use grafeo_common::types::{PropertyKey, Value};
800
801        let prop_key = PropertyKey::new(property);
802        let labels: &[&str] = &[label];
803
804        let ids: Vec<grafeo_common::types::NodeId> = vectors
805            .into_iter()
806            .map(|vec| {
807                let value = Value::Vector(vec.into());
808                let id = self.store.create_node_with_props(
809                    labels,
810                    std::iter::once((prop_key.clone(), value.clone())),
811                );
812
813                // Log to WAL
814                #[cfg(feature = "wal")]
815                {
816                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
817                        id,
818                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
819                    }) {
820                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
821                    }
822                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
823                        id,
824                        key: property.to_string(),
825                        value,
826                    }) {
827                        grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
828                    }
829                }
830
831                id
832            })
833            .collect();
834
835        // Auto-insert into matching vector index if one exists
836        #[cfg(feature = "vector-index")]
837        if let Some(index) = self.store.get_vector_index(label, property) {
838            let accessor =
839                grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, property);
840            for &id in &ids {
841                if let Some(node) = self.store.get_node(id) {
842                    let pk = grafeo_common::types::PropertyKey::new(property);
843                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
844                        index.insert(id, v, &accessor);
845                    }
846                }
847            }
848        }
849
850        ids
851    }
852}