Skip to main content

grafeo_engine/database/
crud.rs

1//! Node and edge CRUD operations for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5
6impl super::GrafeoDB {
7    // === Node Operations ===
8
9    /// Creates a node with the given labels and returns its ID.
10    ///
11    /// Labels categorize nodes - think of them like tags. A node can have
12    /// multiple labels (e.g., `["Person", "Employee"]`).
13    ///
14    /// # Examples
15    ///
16    /// ```
17    /// use grafeo_engine::GrafeoDB;
18    ///
19    /// let db = GrafeoDB::new_in_memory();
20    /// let alice = db.create_node(&["Person"]);
21    /// let company = db.create_node(&["Company", "Startup"]);
22    /// ```
23    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
24        let id = self.store.create_node(labels);
25
26        // Log to WAL if enabled
27        #[cfg(feature = "wal")]
28        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
29            id,
30            labels: labels.iter().map(|s| (*s).to_string()).collect(),
31        }) {
32            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
33        }
34
35        #[cfg(feature = "cdc")]
36        self.cdc_log
37            .record_create_node(id, self.store.current_epoch(), None);
38
39        id
40    }
41
42    /// Creates a new node with labels and properties.
43    ///
44    /// If WAL is enabled, the operation is logged for durability.
45    pub fn create_node_with_props(
46        &self,
47        labels: &[&str],
48        properties: impl IntoIterator<
49            Item = (
50                impl Into<grafeo_common::types::PropertyKey>,
51                impl Into<grafeo_common::types::Value>,
52            ),
53        >,
54    ) -> grafeo_common::types::NodeId {
55        // Collect properties first so we can log them to WAL
56        let props: Vec<(
57            grafeo_common::types::PropertyKey,
58            grafeo_common::types::Value,
59        )> = properties
60            .into_iter()
61            .map(|(k, v)| (k.into(), v.into()))
62            .collect();
63
64        let id = self
65            .store
66            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
67
68        // Build CDC snapshot before WAL consumes props
69        #[cfg(feature = "cdc")]
70        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
71            .iter()
72            .map(|(k, v)| (k.to_string(), v.clone()))
73            .collect();
74
75        // Log node creation to WAL
76        #[cfg(feature = "wal")]
77        {
78            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
79                id,
80                labels: labels.iter().map(|s| (*s).to_string()).collect(),
81            }) {
82                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
83            }
84
85            // Log each property to WAL for full durability
86            for (key, value) in props {
87                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
88                    id,
89                    key: key.to_string(),
90                    value,
91                }) {
92                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
93                }
94            }
95        }
96
97        #[cfg(feature = "cdc")]
98        self.cdc_log.record_create_node(
99            id,
100            self.store.current_epoch(),
101            if cdc_props.is_empty() {
102                None
103            } else {
104                Some(cdc_props)
105            },
106        );
107
108        // Auto-insert into matching text indexes for the new node
109        #[cfg(feature = "text-index")]
110        if let Some(node) = self.store.get_node(id) {
111            for label in &node.labels {
112                for (prop_key, prop_val) in &node.properties {
113                    if let grafeo_common::types::Value::String(text) = prop_val
114                        && let Some(index) =
115                            self.store.get_text_index(label.as_str(), prop_key.as_ref())
116                    {
117                        index.write().insert(id, text);
118                    }
119                }
120            }
121        }
122
123        id
124    }
125
126    /// Gets a node by ID.
127    #[must_use]
128    pub fn get_node(
129        &self,
130        id: grafeo_common::types::NodeId,
131    ) -> Option<grafeo_core::graph::lpg::Node> {
132        self.store.get_node(id)
133    }
134
135    /// Gets a node as it existed at a specific epoch.
136    ///
137    /// Uses pure epoch-based visibility (not transaction-aware), so the node
138    /// is visible if and only if `created_epoch <= epoch` and it was not
139    /// deleted at or before `epoch`.
140    #[must_use]
141    pub fn get_node_at_epoch(
142        &self,
143        id: grafeo_common::types::NodeId,
144        epoch: grafeo_common::types::EpochId,
145    ) -> Option<grafeo_core::graph::lpg::Node> {
146        self.store.get_node_at_epoch(id, epoch)
147    }
148
149    /// Gets an edge as it existed at a specific epoch.
150    ///
151    /// Uses pure epoch-based visibility (not transaction-aware).
152    #[must_use]
153    pub fn get_edge_at_epoch(
154        &self,
155        id: grafeo_common::types::EdgeId,
156        epoch: grafeo_common::types::EpochId,
157    ) -> Option<grafeo_core::graph::lpg::Edge> {
158        self.store.get_edge_at_epoch(id, epoch)
159    }
160
161    /// Returns all versions of a node with their creation/deletion epochs.
162    ///
163    /// Properties and labels reflect the current state (not versioned per-epoch).
164    #[must_use]
165    pub fn get_node_history(
166        &self,
167        id: grafeo_common::types::NodeId,
168    ) -> Vec<(
169        grafeo_common::types::EpochId,
170        Option<grafeo_common::types::EpochId>,
171        grafeo_core::graph::lpg::Node,
172    )> {
173        self.store.get_node_history(id)
174    }
175
176    /// Returns all versions of an edge with their creation/deletion epochs.
177    ///
178    /// Properties reflect the current state (not versioned per-epoch).
179    #[must_use]
180    pub fn get_edge_history(
181        &self,
182        id: grafeo_common::types::EdgeId,
183    ) -> Vec<(
184        grafeo_common::types::EpochId,
185        Option<grafeo_common::types::EpochId>,
186        grafeo_core::graph::lpg::Edge,
187    )> {
188        self.store.get_edge_history(id)
189    }
190
191    /// Returns the current epoch of the database.
192    #[must_use]
193    pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
194        self.store.current_epoch()
195    }
196
197    /// Deletes a node and all its edges.
198    ///
199    /// If WAL is enabled, the operation is logged for durability.
200    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
201        // Capture properties for CDC before deletion
202        #[cfg(feature = "cdc")]
203        let cdc_props = self.store.get_node(id).map(|node| {
204            node.properties
205                .iter()
206                .map(|(k, v)| (k.to_string(), v.clone()))
207                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
208        });
209
210        // Collect matching vector indexes BEFORE deletion removes labels
211        #[cfg(feature = "vector-index")]
212        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
213            .store
214            .get_node(id)
215            .map(|node| {
216                let mut indexes = Vec::new();
217                for label in &node.labels {
218                    let prefix = format!("{}:", label.as_str());
219                    for (key, index) in self.store.vector_index_entries() {
220                        if key.starts_with(&prefix) {
221                            indexes.push(index);
222                        }
223                    }
224                }
225                indexes
226            })
227            .unwrap_or_default();
228
229        // Collect matching text indexes BEFORE deletion removes labels
230        #[cfg(feature = "text-index")]
231        let text_indexes_to_clean: Vec<
232            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
233        > = self
234            .store
235            .get_node(id)
236            .map(|node| {
237                let mut indexes = Vec::new();
238                for label in &node.labels {
239                    let prefix = format!("{}:", label.as_str());
240                    for (key, index) in self.store.text_index_entries() {
241                        if key.starts_with(&prefix) {
242                            indexes.push(index);
243                        }
244                    }
245                }
246                indexes
247            })
248            .unwrap_or_default();
249
250        let result = self.store.delete_node(id);
251
252        // Remove from vector indexes after successful deletion
253        #[cfg(feature = "vector-index")]
254        if result {
255            for index in indexes_to_clean {
256                index.remove(id);
257            }
258        }
259
260        // Remove from text indexes after successful deletion
261        #[cfg(feature = "text-index")]
262        if result {
263            for index in text_indexes_to_clean {
264                index.write().remove(id);
265            }
266        }
267
268        #[cfg(feature = "wal")]
269        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
270            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
271        }
272
273        #[cfg(feature = "cdc")]
274        if result {
275            self.cdc_log.record_delete(
276                crate::cdc::EntityId::Node(id),
277                self.store.current_epoch(),
278                cdc_props,
279            );
280        }
281
282        result
283    }
284
285    /// Sets a property on a node.
286    ///
287    /// If WAL is enabled, the operation is logged for durability.
288    pub fn set_node_property(
289        &self,
290        id: grafeo_common::types::NodeId,
291        key: &str,
292        value: grafeo_common::types::Value,
293    ) {
294        // Extract vector data before the value is moved into the store
295        #[cfg(feature = "vector-index")]
296        let vector_data = match &value {
297            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
298            _ => None,
299        };
300
301        // Log to WAL first
302        #[cfg(feature = "wal")]
303        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
304            id,
305            key: key.to_string(),
306            value: value.clone(),
307        }) {
308            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
309        }
310
311        // Capture old value for CDC before the store write
312        #[cfg(feature = "cdc")]
313        let cdc_old_value = self
314            .store
315            .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
316        #[cfg(feature = "cdc")]
317        let cdc_new_value = value.clone();
318
319        self.store.set_node_property(id, key, value);
320
321        #[cfg(feature = "cdc")]
322        self.cdc_log.record_update(
323            crate::cdc::EntityId::Node(id),
324            self.store.current_epoch(),
325            key,
326            cdc_old_value,
327            cdc_new_value,
328        );
329
330        // Auto-insert into matching vector indexes
331        #[cfg(feature = "vector-index")]
332        if let Some(vec) = vector_data
333            && let Some(node) = self.store.get_node(id)
334        {
335            for label in &node.labels {
336                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
337                    let accessor =
338                        grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, key);
339                    index.insert(id, &vec, &accessor);
340                }
341            }
342        }
343
344        // Auto-update matching text indexes
345        #[cfg(feature = "text-index")]
346        if let Some(node) = self.store.get_node(id) {
347            let text_val = node
348                .properties
349                .get(&grafeo_common::types::PropertyKey::new(key))
350                .and_then(|v| match v {
351                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
352                    _ => None,
353                });
354            for label in &node.labels {
355                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
356                    let mut idx = index.write();
357                    if let Some(ref text) = text_val {
358                        idx.insert(id, text);
359                    } else {
360                        idx.remove(id);
361                    }
362                }
363            }
364        }
365    }
366
367    /// Adds a label to an existing node.
368    ///
369    /// Returns `true` if the label was added, `false` if the node doesn't exist
370    /// or already has the label.
371    ///
372    /// # Examples
373    ///
374    /// ```
375    /// use grafeo_engine::GrafeoDB;
376    ///
377    /// let db = GrafeoDB::new_in_memory();
378    /// let alice = db.create_node(&["Person"]);
379    ///
380    /// // Promote Alice to Employee
381    /// let added = db.add_node_label(alice, "Employee");
382    /// assert!(added);
383    /// ```
384    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
385        let result = self.store.add_label(id, label);
386
387        #[cfg(feature = "wal")]
388        if result {
389            // Log to WAL if enabled
390            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
391                id,
392                label: label.to_string(),
393            }) {
394                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
395            }
396        }
397
398        // Auto-insert into vector indexes for the newly-added label
399        #[cfg(feature = "vector-index")]
400        if result {
401            let prefix = format!("{label}:");
402            for (key, index) in self.store.vector_index_entries() {
403                if let Some(property) = key.strip_prefix(&prefix)
404                    && let Some(node) = self.store.get_node(id)
405                {
406                    let prop_key = grafeo_common::types::PropertyKey::new(property);
407                    if let Some(grafeo_common::types::Value::Vector(v)) =
408                        node.properties.get(&prop_key)
409                    {
410                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
411                            &*self.store,
412                            property,
413                        );
414                        index.insert(id, v, &accessor);
415                    }
416                }
417            }
418        }
419
420        // Auto-insert into text indexes for the newly-added label
421        #[cfg(feature = "text-index")]
422        if result && let Some(node) = self.store.get_node(id) {
423            for (prop_key, prop_val) in &node.properties {
424                if let grafeo_common::types::Value::String(text) = prop_val
425                    && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
426                {
427                    index.write().insert(id, text);
428                }
429            }
430        }
431
432        result
433    }
434
435    /// Removes a label from a node.
436    ///
437    /// Returns `true` if the label was removed, `false` if the node doesn't exist
438    /// or doesn't have the label.
439    ///
440    /// # Examples
441    ///
442    /// ```
443    /// use grafeo_engine::GrafeoDB;
444    ///
445    /// let db = GrafeoDB::new_in_memory();
446    /// let alice = db.create_node(&["Person", "Employee"]);
447    ///
448    /// // Remove Employee status
449    /// let removed = db.remove_node_label(alice, "Employee");
450    /// assert!(removed);
451    /// ```
452    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
453        // Collect text indexes to clean BEFORE removing the label
454        #[cfg(feature = "text-index")]
455        let text_indexes_to_clean: Vec<
456            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
457        > = {
458            let prefix = format!("{label}:");
459            self.store
460                .text_index_entries()
461                .into_iter()
462                .filter(|(key, _)| key.starts_with(&prefix))
463                .map(|(_, index)| index)
464                .collect()
465        };
466
467        let result = self.store.remove_label(id, label);
468
469        #[cfg(feature = "wal")]
470        if result {
471            // Log to WAL if enabled
472            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
473                id,
474                label: label.to_string(),
475            }) {
476                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
477            }
478        }
479
480        // Remove from text indexes for the removed label
481        #[cfg(feature = "text-index")]
482        if result {
483            for index in text_indexes_to_clean {
484                index.write().remove(id);
485            }
486        }
487
488        result
489    }
490
491    /// Gets all labels for a node.
492    ///
493    /// Returns `None` if the node doesn't exist.
494    ///
495    /// # Examples
496    ///
497    /// ```
498    /// use grafeo_engine::GrafeoDB;
499    ///
500    /// let db = GrafeoDB::new_in_memory();
501    /// let alice = db.create_node(&["Person", "Employee"]);
502    ///
503    /// let labels = db.get_node_labels(alice).unwrap();
504    /// assert!(labels.contains(&"Person".to_string()));
505    /// assert!(labels.contains(&"Employee".to_string()));
506    /// ```
507    #[must_use]
508    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
509        self.store
510            .get_node(id)
511            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
512    }
513
514    // === Edge Operations ===
515
516    /// Creates an edge (relationship) between two nodes.
517    ///
518    /// Edges connect nodes and have a type that describes the relationship.
519    /// They're directed - the order of `src` and `dst` matters.
520    ///
521    /// # Examples
522    ///
523    /// ```
524    /// use grafeo_engine::GrafeoDB;
525    ///
526    /// let db = GrafeoDB::new_in_memory();
527    /// let alice = db.create_node(&["Person"]);
528    /// let bob = db.create_node(&["Person"]);
529    ///
530    /// // Alice knows Bob (directed: Alice -> Bob)
531    /// let edge = db.create_edge(alice, bob, "KNOWS");
532    /// ```
533    pub fn create_edge(
534        &self,
535        src: grafeo_common::types::NodeId,
536        dst: grafeo_common::types::NodeId,
537        edge_type: &str,
538    ) -> grafeo_common::types::EdgeId {
539        let id = self.store.create_edge(src, dst, edge_type);
540
541        // Log to WAL if enabled
542        #[cfg(feature = "wal")]
543        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
544            id,
545            src,
546            dst,
547            edge_type: edge_type.to_string(),
548        }) {
549            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
550        }
551
552        #[cfg(feature = "cdc")]
553        self.cdc_log
554            .record_create_edge(id, self.store.current_epoch(), None);
555
556        id
557    }
558
559    /// Creates a new edge with properties.
560    ///
561    /// If WAL is enabled, the operation is logged for durability.
562    pub fn create_edge_with_props(
563        &self,
564        src: grafeo_common::types::NodeId,
565        dst: grafeo_common::types::NodeId,
566        edge_type: &str,
567        properties: impl IntoIterator<
568            Item = (
569                impl Into<grafeo_common::types::PropertyKey>,
570                impl Into<grafeo_common::types::Value>,
571            ),
572        >,
573    ) -> grafeo_common::types::EdgeId {
574        // Collect properties first so we can log them to WAL
575        let props: Vec<(
576            grafeo_common::types::PropertyKey,
577            grafeo_common::types::Value,
578        )> = properties
579            .into_iter()
580            .map(|(k, v)| (k.into(), v.into()))
581            .collect();
582
583        let id = self.store.create_edge_with_props(
584            src,
585            dst,
586            edge_type,
587            props.iter().map(|(k, v)| (k.clone(), v.clone())),
588        );
589
590        // Build CDC snapshot before WAL consumes props
591        #[cfg(feature = "cdc")]
592        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
593            .iter()
594            .map(|(k, v)| (k.to_string(), v.clone()))
595            .collect();
596
597        // Log edge creation to WAL
598        #[cfg(feature = "wal")]
599        {
600            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
601                id,
602                src,
603                dst,
604                edge_type: edge_type.to_string(),
605            }) {
606                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
607            }
608
609            // Log each property to WAL for full durability
610            for (key, value) in props {
611                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
612                    id,
613                    key: key.to_string(),
614                    value,
615                }) {
616                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
617                }
618            }
619        }
620
621        #[cfg(feature = "cdc")]
622        self.cdc_log.record_create_edge(
623            id,
624            self.store.current_epoch(),
625            if cdc_props.is_empty() {
626                None
627            } else {
628                Some(cdc_props)
629            },
630        );
631
632        id
633    }
634
635    /// Gets an edge by ID.
636    #[must_use]
637    pub fn get_edge(
638        &self,
639        id: grafeo_common::types::EdgeId,
640    ) -> Option<grafeo_core::graph::lpg::Edge> {
641        self.store.get_edge(id)
642    }
643
644    /// Deletes an edge.
645    ///
646    /// If WAL is enabled, the operation is logged for durability.
647    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
648        // Capture properties for CDC before deletion
649        #[cfg(feature = "cdc")]
650        let cdc_props = self.store.get_edge(id).map(|edge| {
651            edge.properties
652                .iter()
653                .map(|(k, v)| (k.to_string(), v.clone()))
654                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
655        });
656
657        let result = self.store.delete_edge(id);
658
659        #[cfg(feature = "wal")]
660        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
661            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
662        }
663
664        #[cfg(feature = "cdc")]
665        if result {
666            self.cdc_log.record_delete(
667                crate::cdc::EntityId::Edge(id),
668                self.store.current_epoch(),
669                cdc_props,
670            );
671        }
672
673        result
674    }
675
676    /// Sets a property on an edge.
677    ///
678    /// If WAL is enabled, the operation is logged for durability.
679    pub fn set_edge_property(
680        &self,
681        id: grafeo_common::types::EdgeId,
682        key: &str,
683        value: grafeo_common::types::Value,
684    ) {
685        // Log to WAL first
686        #[cfg(feature = "wal")]
687        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
688            id,
689            key: key.to_string(),
690            value: value.clone(),
691        }) {
692            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
693        }
694
695        // Capture old value for CDC before the store write
696        #[cfg(feature = "cdc")]
697        let cdc_old_value = self
698            .store
699            .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
700        #[cfg(feature = "cdc")]
701        let cdc_new_value = value.clone();
702
703        self.store.set_edge_property(id, key, value);
704
705        #[cfg(feature = "cdc")]
706        self.cdc_log.record_update(
707            crate::cdc::EntityId::Edge(id),
708            self.store.current_epoch(),
709            key,
710            cdc_old_value,
711            cdc_new_value,
712        );
713    }
714
715    /// Removes a property from a node.
716    ///
717    /// Returns true if the property existed and was removed, false otherwise.
718    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
719        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
720        let removed = self.store.remove_node_property(id, key).is_some();
721
722        // Remove from matching text indexes
723        #[cfg(feature = "text-index")]
724        if removed && let Some(node) = self.store.get_node(id) {
725            for label in &node.labels {
726                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
727                    index.write().remove(id);
728                }
729            }
730        }
731
732        removed
733    }
734
735    /// Removes a property from an edge.
736    ///
737    /// Returns true if the property existed and was removed, false otherwise.
738    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
739        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
740        self.store.remove_edge_property(id, key).is_some()
741    }
742
743    /// Creates multiple nodes in bulk, each with a single vector property.
744    ///
745    /// Much faster than individual `create_node_with_props` calls because it
746    /// acquires internal locks once and loops in Rust rather than crossing
747    /// the FFI boundary per vector.
748    ///
749    /// # Arguments
750    ///
751    /// * `label` - Label applied to all created nodes
752    /// * `property` - Property name for the vector data
753    /// * `vectors` - Vector data for each node
754    ///
755    /// # Returns
756    ///
757    /// Vector of created `NodeId`s in the same order as the input vectors.
758    pub fn batch_create_nodes(
759        &self,
760        label: &str,
761        property: &str,
762        vectors: Vec<Vec<f32>>,
763    ) -> Vec<grafeo_common::types::NodeId> {
764        use grafeo_common::types::{PropertyKey, Value};
765
766        let prop_key = PropertyKey::new(property);
767        let labels: &[&str] = &[label];
768
769        let ids: Vec<grafeo_common::types::NodeId> = vectors
770            .into_iter()
771            .map(|vec| {
772                let value = Value::Vector(vec.into());
773                let id = self.store.create_node_with_props(
774                    labels,
775                    std::iter::once((prop_key.clone(), value.clone())),
776                );
777
778                // Log to WAL
779                #[cfg(feature = "wal")]
780                {
781                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
782                        id,
783                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
784                    }) {
785                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
786                    }
787                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
788                        id,
789                        key: property.to_string(),
790                        value,
791                    }) {
792                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
793                    }
794                }
795
796                id
797            })
798            .collect();
799
800        // Auto-insert into matching vector index if one exists
801        #[cfg(feature = "vector-index")]
802        if let Some(index) = self.store.get_vector_index(label, property) {
803            let accessor =
804                grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, property);
805            for &id in &ids {
806                if let Some(node) = self.store.get_node(id) {
807                    let pk = grafeo_common::types::PropertyKey::new(property);
808                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
809                        index.insert(id, v, &accessor);
810                    }
811                }
812            }
813        }
814
815        ids
816    }
817}