Skip to main content

grafeo_engine/database/
crud.rs

1//! Node and edge CRUD operations for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5use grafeo_common::grafeo_warn;
6
7impl super::GrafeoDB {
8    // === Node Operations ===
9
10    /// Creates a node with the given labels and returns its ID.
11    ///
12    /// Labels categorize nodes - think of them like tags. A node can have
13    /// multiple labels (e.g., `["Person", "Employee"]`).
14    ///
15    /// # Examples
16    ///
17    /// ```
18    /// use grafeo_engine::GrafeoDB;
19    ///
20    /// let db = GrafeoDB::new_in_memory();
21    /// let alix = db.create_node(&["Person"]);
22    /// let company = db.create_node(&["Company", "Startup"]);
23    /// ```
24    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
25        let id = self.store.create_node(labels);
26
27        // Log to WAL if enabled
28        #[cfg(feature = "wal")]
29        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
30            id,
31            labels: labels.iter().map(|s| (*s).to_string()).collect(),
32        }) {
33            grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
34        }
35
36        #[cfg(feature = "cdc")]
37        self.cdc_log.record_create_node(
38            id,
39            self.store.current_epoch(),
40            None,
41            Some(labels.iter().map(|s| (*s).to_string()).collect()),
42        );
43
44        id
45    }
46
47    /// Creates a new node with labels and properties.
48    ///
49    /// If WAL is enabled, the operation is logged for durability.
50    pub fn create_node_with_props(
51        &self,
52        labels: &[&str],
53        properties: impl IntoIterator<
54            Item = (
55                impl Into<grafeo_common::types::PropertyKey>,
56                impl Into<grafeo_common::types::Value>,
57            ),
58        >,
59    ) -> grafeo_common::types::NodeId {
60        // Collect properties first so we can log them to WAL
61        let props: Vec<(
62            grafeo_common::types::PropertyKey,
63            grafeo_common::types::Value,
64        )> = properties
65            .into_iter()
66            .map(|(k, v)| (k.into(), v.into()))
67            .collect();
68
69        let id = self
70            .store
71            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
72
73        // Build CDC snapshot before WAL consumes props
74        #[cfg(feature = "cdc")]
75        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
76            .iter()
77            .map(|(k, v)| (k.to_string(), v.clone()))
78            .collect();
79
80        // Log node creation to WAL
81        #[cfg(feature = "wal")]
82        {
83            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
84                id,
85                labels: labels.iter().map(|s| (*s).to_string()).collect(),
86            }) {
87                grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
88            }
89
90            // Log each property to WAL for full durability
91            for (key, value) in props {
92                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
93                    id,
94                    key: key.to_string(),
95                    value,
96                }) {
97                    grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
98                }
99            }
100        }
101
102        #[cfg(feature = "cdc")]
103        self.cdc_log.record_create_node(
104            id,
105            self.store.current_epoch(),
106            if cdc_props.is_empty() {
107                None
108            } else {
109                Some(cdc_props)
110            },
111            Some(labels.iter().map(|s| (*s).to_string()).collect()),
112        );
113
114        // Auto-insert into matching text indexes for the new node
115        #[cfg(feature = "text-index")]
116        if let Some(node) = self.store.get_node(id) {
117            for label in &node.labels {
118                for (prop_key, prop_val) in &node.properties {
119                    if let grafeo_common::types::Value::String(text) = prop_val
120                        && let Some(index) =
121                            self.store.get_text_index(label.as_str(), prop_key.as_ref())
122                    {
123                        index.write().insert(id, text);
124                    }
125                }
126            }
127        }
128
129        id
130    }
131
132    /// Gets a node by ID.
133    #[must_use]
134    pub fn get_node(
135        &self,
136        id: grafeo_common::types::NodeId,
137    ) -> Option<grafeo_core::graph::lpg::Node> {
138        self.store.get_node(id)
139    }
140
141    /// Gets a node as it existed at a specific epoch.
142    ///
143    /// Uses pure epoch-based visibility (not transaction-aware), so the node
144    /// is visible if and only if `created_epoch <= epoch` and it was not
145    /// deleted at or before `epoch`.
146    #[must_use]
147    pub fn get_node_at_epoch(
148        &self,
149        id: grafeo_common::types::NodeId,
150        epoch: grafeo_common::types::EpochId,
151    ) -> Option<grafeo_core::graph::lpg::Node> {
152        self.store.get_node_at_epoch(id, epoch)
153    }
154
155    /// Gets an edge as it existed at a specific epoch.
156    ///
157    /// Uses pure epoch-based visibility (not transaction-aware).
158    #[must_use]
159    pub fn get_edge_at_epoch(
160        &self,
161        id: grafeo_common::types::EdgeId,
162        epoch: grafeo_common::types::EpochId,
163    ) -> Option<grafeo_core::graph::lpg::Edge> {
164        self.store.get_edge_at_epoch(id, epoch)
165    }
166
167    /// Returns all versions of a node with their creation/deletion epochs.
168    ///
169    /// Properties and labels reflect the current state (not versioned per-epoch).
170    #[must_use]
171    pub fn get_node_history(
172        &self,
173        id: grafeo_common::types::NodeId,
174    ) -> Vec<(
175        grafeo_common::types::EpochId,
176        Option<grafeo_common::types::EpochId>,
177        grafeo_core::graph::lpg::Node,
178    )> {
179        self.store.get_node_history(id)
180    }
181
182    /// Returns all versions of an edge with their creation/deletion epochs.
183    ///
184    /// Properties reflect the current state (not versioned per-epoch).
185    #[must_use]
186    pub fn get_edge_history(
187        &self,
188        id: grafeo_common::types::EdgeId,
189    ) -> Vec<(
190        grafeo_common::types::EpochId,
191        Option<grafeo_common::types::EpochId>,
192        grafeo_core::graph::lpg::Edge,
193    )> {
194        self.store.get_edge_history(id)
195    }
196
197    /// Returns a property value as it existed at a specific epoch.
198    ///
199    /// Uses the internal `VersionLog` to do a point-in-time read. Returns
200    /// `None` if the property didn't exist or was deleted at that epoch.
201    #[cfg(feature = "temporal")]
202    #[must_use]
203    pub fn get_node_property_at_epoch(
204        &self,
205        id: grafeo_common::types::NodeId,
206        key: &str,
207        epoch: grafeo_common::types::EpochId,
208    ) -> Option<grafeo_common::types::Value> {
209        let prop_key = grafeo_common::types::PropertyKey::new(key);
210        self.store.get_node_property_at_epoch(id, &prop_key, epoch)
211    }
212
213    /// Returns the full version timeline for a single property of a node.
214    ///
215    /// Each entry is `(epoch, value)` in ascending epoch order. Tombstones
216    /// (deletions) appear as `Value::Null`.
217    #[cfg(feature = "temporal")]
218    #[must_use]
219    pub fn get_node_property_history(
220        &self,
221        id: grafeo_common::types::NodeId,
222        key: &str,
223    ) -> Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)> {
224        self.store.node_property_history_for_key(id, key)
225    }
226
227    /// Returns the full version history for ALL properties of a node.
228    ///
229    /// Each entry is `(property_key, Vec<(epoch, value)>)`.
230    #[cfg(feature = "temporal")]
231    #[must_use]
232    pub fn get_all_node_property_history(
233        &self,
234        id: grafeo_common::types::NodeId,
235    ) -> Vec<(
236        grafeo_common::types::PropertyKey,
237        Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)>,
238    )> {
239        self.store.node_property_history(id)
240    }
241
242    /// Returns the current epoch of the database.
243    #[must_use]
244    pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
245        self.store.current_epoch()
246    }
247
248    /// Deletes a node and all its edges.
249    ///
250    /// If WAL is enabled, the operation is logged for durability.
251    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
252        // Capture properties for CDC before deletion
253        #[cfg(feature = "cdc")]
254        let cdc_props = self.store.get_node(id).map(|node| {
255            node.properties
256                .iter()
257                .map(|(k, v)| (k.to_string(), v.clone()))
258                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
259        });
260
261        // Collect matching vector indexes BEFORE deletion removes labels
262        #[cfg(feature = "vector-index")]
263        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
264            .store
265            .get_node(id)
266            .map(|node| {
267                let mut indexes = Vec::new();
268                for label in &node.labels {
269                    let prefix = format!("{}:", label.as_str());
270                    for (key, index) in self.store.vector_index_entries() {
271                        if key.starts_with(&prefix) {
272                            indexes.push(index);
273                        }
274                    }
275                }
276                indexes
277            })
278            .unwrap_or_default();
279
280        // Collect matching text indexes BEFORE deletion removes labels
281        #[cfg(feature = "text-index")]
282        let text_indexes_to_clean: Vec<
283            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
284        > = self
285            .store
286            .get_node(id)
287            .map(|node| {
288                let mut indexes = Vec::new();
289                for label in &node.labels {
290                    let prefix = format!("{}:", label.as_str());
291                    for (key, index) in self.store.text_index_entries() {
292                        if key.starts_with(&prefix) {
293                            indexes.push(index);
294                        }
295                    }
296                }
297                indexes
298            })
299            .unwrap_or_default();
300
301        let result = self.store.delete_node(id);
302
303        // Remove from vector indexes after successful deletion
304        #[cfg(feature = "vector-index")]
305        if result {
306            for index in indexes_to_clean {
307                index.remove(id);
308            }
309        }
310
311        // Remove from text indexes after successful deletion
312        #[cfg(feature = "text-index")]
313        if result {
314            for index in text_indexes_to_clean {
315                index.write().remove(id);
316            }
317        }
318
319        #[cfg(feature = "wal")]
320        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
321            grafeo_warn!("Failed to log DeleteNode to WAL: {}", e);
322        }
323
324        #[cfg(feature = "cdc")]
325        if result {
326            self.cdc_log.record_delete(
327                crate::cdc::EntityId::Node(id),
328                self.store.current_epoch(),
329                cdc_props,
330            );
331        }
332
333        result
334    }
335
336    /// Sets a property on a node.
337    ///
338    /// If WAL is enabled, the operation is logged for durability.
339    pub fn set_node_property(
340        &self,
341        id: grafeo_common::types::NodeId,
342        key: &str,
343        value: grafeo_common::types::Value,
344    ) {
345        // Extract vector data before the value is moved into the store
346        #[cfg(feature = "vector-index")]
347        let vector_data = match &value {
348            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
349            _ => None,
350        };
351
352        // Log to WAL first
353        #[cfg(feature = "wal")]
354        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
355            id,
356            key: key.to_string(),
357            value: value.clone(),
358        }) {
359            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
360        }
361
362        // Capture old value for CDC before the store write
363        #[cfg(feature = "cdc")]
364        let cdc_old_value = self
365            .store
366            .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
367        #[cfg(feature = "cdc")]
368        let cdc_new_value = value.clone();
369
370        self.store.set_node_property(id, key, value);
371
372        #[cfg(feature = "cdc")]
373        self.cdc_log.record_update(
374            crate::cdc::EntityId::Node(id),
375            self.store.current_epoch(),
376            key,
377            cdc_old_value,
378            cdc_new_value,
379        );
380
381        // Auto-insert into matching vector indexes
382        #[cfg(feature = "vector-index")]
383        if let Some(vec) = vector_data
384            && let Some(node) = self.store.get_node(id)
385        {
386            for label in &node.labels {
387                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
388                    let accessor =
389                        grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, key);
390                    index.insert(id, &vec, &accessor);
391                }
392            }
393        }
394
395        // Auto-update matching text indexes
396        #[cfg(feature = "text-index")]
397        if let Some(node) = self.store.get_node(id) {
398            let text_val = node
399                .properties
400                .get(&grafeo_common::types::PropertyKey::new(key))
401                .and_then(|v| match v {
402                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
403                    _ => None,
404                });
405            for label in &node.labels {
406                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
407                    let mut idx = index.write();
408                    if let Some(ref text) = text_val {
409                        idx.insert(id, text);
410                    } else {
411                        idx.remove(id);
412                    }
413                }
414            }
415        }
416    }
417
418    /// Adds a label to an existing node.
419    ///
420    /// Returns `true` if the label was added, `false` if the node doesn't exist
421    /// or already has the label.
422    ///
423    /// # Examples
424    ///
425    /// ```
426    /// use grafeo_engine::GrafeoDB;
427    ///
428    /// let db = GrafeoDB::new_in_memory();
429    /// let alix = db.create_node(&["Person"]);
430    ///
431    /// // Promote Alix to Employee
432    /// let added = db.add_node_label(alix, "Employee");
433    /// assert!(added);
434    /// ```
435    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
436        let result = self.store.add_label(id, label);
437
438        #[cfg(feature = "wal")]
439        if result {
440            // Log to WAL if enabled
441            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
442                id,
443                label: label.to_string(),
444            }) {
445                grafeo_warn!("Failed to log AddNodeLabel to WAL: {}", e);
446            }
447        }
448
449        // Auto-insert into vector indexes for the newly-added label
450        #[cfg(feature = "vector-index")]
451        if result {
452            let prefix = format!("{label}:");
453            for (key, index) in self.store.vector_index_entries() {
454                if let Some(property) = key.strip_prefix(&prefix)
455                    && let Some(node) = self.store.get_node(id)
456                {
457                    let prop_key = grafeo_common::types::PropertyKey::new(property);
458                    if let Some(grafeo_common::types::Value::Vector(v)) =
459                        node.properties.get(&prop_key)
460                    {
461                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
462                            &*self.store,
463                            property,
464                        );
465                        index.insert(id, v, &accessor);
466                    }
467                }
468            }
469        }
470
471        // Auto-insert into text indexes for the newly-added label
472        #[cfg(feature = "text-index")]
473        if result && let Some(node) = self.store.get_node(id) {
474            for (prop_key, prop_val) in &node.properties {
475                if let grafeo_common::types::Value::String(text) = prop_val
476                    && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
477                {
478                    index.write().insert(id, text);
479                }
480            }
481        }
482
483        result
484    }
485
486    /// Removes a label from a node.
487    ///
488    /// Returns `true` if the label was removed, `false` if the node doesn't exist
489    /// or doesn't have the label.
490    ///
491    /// # Examples
492    ///
493    /// ```
494    /// use grafeo_engine::GrafeoDB;
495    ///
496    /// let db = GrafeoDB::new_in_memory();
497    /// let alix = db.create_node(&["Person", "Employee"]);
498    ///
499    /// // Remove Employee status
500    /// let removed = db.remove_node_label(alix, "Employee");
501    /// assert!(removed);
502    /// ```
503    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
504        // Collect text indexes to clean BEFORE removing the label
505        #[cfg(feature = "text-index")]
506        let text_indexes_to_clean: Vec<
507            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
508        > = {
509            let prefix = format!("{label}:");
510            self.store
511                .text_index_entries()
512                .into_iter()
513                .filter(|(key, _)| key.starts_with(&prefix))
514                .map(|(_, index)| index)
515                .collect()
516        };
517
518        let result = self.store.remove_label(id, label);
519
520        #[cfg(feature = "wal")]
521        if result {
522            // Log to WAL if enabled
523            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
524                id,
525                label: label.to_string(),
526            }) {
527                grafeo_warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
528            }
529        }
530
531        // Remove from text indexes for the removed label
532        #[cfg(feature = "text-index")]
533        if result {
534            for index in text_indexes_to_clean {
535                index.write().remove(id);
536            }
537        }
538
539        result
540    }
541
542    /// Gets all labels for a node.
543    ///
544    /// Returns `None` if the node doesn't exist.
545    ///
546    /// # Examples
547    ///
548    /// ```
549    /// use grafeo_engine::GrafeoDB;
550    ///
551    /// let db = GrafeoDB::new_in_memory();
552    /// let alix = db.create_node(&["Person", "Employee"]);
553    ///
554    /// let labels = db.get_node_labels(alix).unwrap();
555    /// assert!(labels.contains(&"Person".to_string()));
556    /// assert!(labels.contains(&"Employee".to_string()));
557    /// ```
558    #[must_use]
559    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
560        self.store
561            .get_node(id)
562            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
563    }
564
565    // === Edge Operations ===
566
567    /// Creates an edge (relationship) between two nodes.
568    ///
569    /// Edges connect nodes and have a type that describes the relationship.
570    /// They're directed - the order of `src` and `dst` matters.
571    ///
572    /// # Examples
573    ///
574    /// ```
575    /// use grafeo_engine::GrafeoDB;
576    ///
577    /// let db = GrafeoDB::new_in_memory();
578    /// let alix = db.create_node(&["Person"]);
579    /// let gus = db.create_node(&["Person"]);
580    ///
581    /// // Alix knows Gus (directed: Alix -> Gus)
582    /// let edge = db.create_edge(alix, gus, "KNOWS");
583    /// ```
584    pub fn create_edge(
585        &self,
586        src: grafeo_common::types::NodeId,
587        dst: grafeo_common::types::NodeId,
588        edge_type: &str,
589    ) -> grafeo_common::types::EdgeId {
590        let id = self.store.create_edge(src, dst, edge_type);
591
592        // Log to WAL if enabled
593        #[cfg(feature = "wal")]
594        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
595            id,
596            src,
597            dst,
598            edge_type: edge_type.to_string(),
599        }) {
600            grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
601        }
602
603        #[cfg(feature = "cdc")]
604        self.cdc_log.record_create_edge(
605            id,
606            self.store.current_epoch(),
607            None,
608            src.as_u64(),
609            dst.as_u64(),
610            edge_type.to_string(),
611        );
612
613        id
614    }
615
616    /// Creates a new edge with properties.
617    ///
618    /// If WAL is enabled, the operation is logged for durability.
619    pub fn create_edge_with_props(
620        &self,
621        src: grafeo_common::types::NodeId,
622        dst: grafeo_common::types::NodeId,
623        edge_type: &str,
624        properties: impl IntoIterator<
625            Item = (
626                impl Into<grafeo_common::types::PropertyKey>,
627                impl Into<grafeo_common::types::Value>,
628            ),
629        >,
630    ) -> grafeo_common::types::EdgeId {
631        // Collect properties first so we can log them to WAL
632        let props: Vec<(
633            grafeo_common::types::PropertyKey,
634            grafeo_common::types::Value,
635        )> = properties
636            .into_iter()
637            .map(|(k, v)| (k.into(), v.into()))
638            .collect();
639
640        let id = self.store.create_edge_with_props(
641            src,
642            dst,
643            edge_type,
644            props.iter().map(|(k, v)| (k.clone(), v.clone())),
645        );
646
647        // Build CDC snapshot before WAL consumes props
648        #[cfg(feature = "cdc")]
649        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
650            .iter()
651            .map(|(k, v)| (k.to_string(), v.clone()))
652            .collect();
653
654        // Log edge creation to WAL
655        #[cfg(feature = "wal")]
656        {
657            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
658                id,
659                src,
660                dst,
661                edge_type: edge_type.to_string(),
662            }) {
663                grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
664            }
665
666            // Log each property to WAL for full durability
667            for (key, value) in props {
668                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
669                    id,
670                    key: key.to_string(),
671                    value,
672                }) {
673                    grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
674                }
675            }
676        }
677
678        #[cfg(feature = "cdc")]
679        self.cdc_log.record_create_edge(
680            id,
681            self.store.current_epoch(),
682            if cdc_props.is_empty() {
683                None
684            } else {
685                Some(cdc_props)
686            },
687            src.as_u64(),
688            dst.as_u64(),
689            edge_type.to_string(),
690        );
691
692        id
693    }
694
695    /// Gets an edge by ID.
696    #[must_use]
697    pub fn get_edge(
698        &self,
699        id: grafeo_common::types::EdgeId,
700    ) -> Option<grafeo_core::graph::lpg::Edge> {
701        self.store.get_edge(id)
702    }
703
704    /// Deletes an edge.
705    ///
706    /// If WAL is enabled, the operation is logged for durability.
707    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
708        // Capture properties for CDC before deletion
709        #[cfg(feature = "cdc")]
710        let cdc_props = self.store.get_edge(id).map(|edge| {
711            edge.properties
712                .iter()
713                .map(|(k, v)| (k.to_string(), v.clone()))
714                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
715        });
716
717        let result = self.store.delete_edge(id);
718
719        #[cfg(feature = "wal")]
720        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
721            grafeo_warn!("Failed to log DeleteEdge to WAL: {}", e);
722        }
723
724        #[cfg(feature = "cdc")]
725        if result {
726            self.cdc_log.record_delete(
727                crate::cdc::EntityId::Edge(id),
728                self.store.current_epoch(),
729                cdc_props,
730            );
731        }
732
733        result
734    }
735
736    /// Sets a property on an edge.
737    ///
738    /// If WAL is enabled, the operation is logged for durability.
739    pub fn set_edge_property(
740        &self,
741        id: grafeo_common::types::EdgeId,
742        key: &str,
743        value: grafeo_common::types::Value,
744    ) {
745        // Log to WAL first
746        #[cfg(feature = "wal")]
747        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
748            id,
749            key: key.to_string(),
750            value: value.clone(),
751        }) {
752            grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
753        }
754
755        // Capture old value for CDC before the store write
756        #[cfg(feature = "cdc")]
757        let cdc_old_value = self
758            .store
759            .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
760        #[cfg(feature = "cdc")]
761        let cdc_new_value = value.clone();
762
763        self.store.set_edge_property(id, key, value);
764
765        #[cfg(feature = "cdc")]
766        self.cdc_log.record_update(
767            crate::cdc::EntityId::Edge(id),
768            self.store.current_epoch(),
769            key,
770            cdc_old_value,
771            cdc_new_value,
772        );
773    }
774
775    /// Removes a property from a node.
776    ///
777    /// Returns true if the property existed and was removed, false otherwise.
778    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
779        let removed = self.store.remove_node_property(id, key).is_some();
780
781        #[cfg(feature = "wal")]
782        if removed
783            && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
784                id,
785                key: key.to_string(),
786            })
787        {
788            grafeo_warn!("WAL log for RemoveNodeProperty failed: {e}");
789        }
790
791        // Remove from matching text indexes
792        #[cfg(feature = "text-index")]
793        if removed && let Some(node) = self.store.get_node(id) {
794            for label in &node.labels {
795                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
796                    index.write().remove(id);
797                }
798            }
799        }
800
801        removed
802    }
803
804    /// Removes a property from an edge.
805    ///
806    /// Returns true if the property existed and was removed, false otherwise.
807    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
808        let removed = self.store.remove_edge_property(id, key).is_some();
809
810        #[cfg(feature = "wal")]
811        if removed
812            && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
813                id,
814                key: key.to_string(),
815            })
816        {
817            grafeo_warn!("WAL log for RemoveEdgeProperty failed: {e}");
818        }
819
820        removed
821    }
822
823    /// Creates multiple nodes in bulk, each with a single vector property.
824    ///
825    /// Much faster than individual `create_node_with_props` calls because it
826    /// acquires internal locks once and loops in Rust rather than crossing
827    /// the FFI boundary per vector.
828    ///
829    /// **Atomicity note:** Individual node creations within the batch are NOT
830    /// atomic as a group. If a failure occurs mid-batch (e.g. WAL write error),
831    /// nodes created before the failure will persist while later nodes may not.
832    /// If you need all-or-nothing semantics, wrap the call in an explicit
833    /// transaction.
834    ///
835    /// # Arguments
836    ///
837    /// * `label` - Label applied to all created nodes
838    /// * `property` - Property name for the vector data
839    /// * `vectors` - Vector data for each node
840    ///
841    /// # Returns
842    ///
843    /// Vector of created `NodeId`s in the same order as the input vectors.
844    pub fn batch_create_nodes(
845        &self,
846        label: &str,
847        property: &str,
848        vectors: Vec<Vec<f32>>,
849    ) -> Vec<grafeo_common::types::NodeId> {
850        use grafeo_common::types::{PropertyKey, Value};
851
852        let prop_key = PropertyKey::new(property);
853        let labels: &[&str] = &[label];
854
855        let ids: Vec<grafeo_common::types::NodeId> = vectors
856            .into_iter()
857            .map(|vec| {
858                let value = Value::Vector(vec.into());
859                let id = self.store.create_node_with_props(
860                    labels,
861                    std::iter::once((prop_key.clone(), value.clone())),
862                );
863
864                // Log to WAL
865                #[cfg(feature = "wal")]
866                {
867                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
868                        id,
869                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
870                    }) {
871                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
872                    }
873                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
874                        id,
875                        key: property.to_string(),
876                        value,
877                    }) {
878                        grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
879                    }
880                }
881
882                id
883            })
884            .collect();
885
886        // Auto-insert into matching vector index if one exists
887        #[cfg(feature = "vector-index")]
888        if let Some(index) = self.store.get_vector_index(label, property) {
889            let accessor =
890                grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, property);
891            for &id in &ids {
892                if let Some(node) = self.store.get_node(id) {
893                    let pk = grafeo_common::types::PropertyKey::new(property);
894                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk)
895                        && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
896                            index.insert(id, v, &accessor);
897                        }))
898                        .is_err()
899                    {
900                        grafeo_warn!("Vector index insert panicked for node {}", id.as_u64());
901                    }
902                }
903            }
904        }
905
906        ids
907    }
908
909    /// Batch-creates nodes with full property maps.
910    ///
911    /// Each entry in `properties_list` is a complete property map for one node.
912    /// Vector values (`Value::Vector`) are automatically inserted into matching
913    /// vector indexes. Text values are automatically inserted into matching text
914    /// indexes.
915    ///
916    /// **Atomicity note:** Individual node creations within the batch are NOT
917    /// atomic as a group. If a failure occurs mid-batch (e.g. WAL write error),
918    /// nodes created before the failure will persist while later nodes may not.
919    /// If you need all-or-nothing semantics, wrap the call in an explicit
920    /// transaction.
921    ///
922    /// # Arguments
923    ///
924    /// * `label` - Label for all created nodes.
925    /// * `properties_list` - One property map per node to create.
926    ///
927    /// # Returns
928    ///
929    /// Vector of created `NodeId`s in the same order as the input.
930    pub fn batch_create_nodes_with_props(
931        &self,
932        label: &str,
933        properties_list: Vec<
934            std::collections::HashMap<
935                grafeo_common::types::PropertyKey,
936                grafeo_common::types::Value,
937            >,
938        >,
939    ) -> Vec<grafeo_common::types::NodeId> {
940        use grafeo_common::types::Value;
941
942        let labels: &[&str] = &[label];
943
944        let ids: Vec<grafeo_common::types::NodeId> = properties_list
945            .into_iter()
946            .map(|props| {
947                let id = self.store.create_node_with_props(
948                    labels,
949                    props.iter().map(|(k, v)| (k.clone(), v.clone())),
950                );
951
952                // Build CDC snapshot before WAL consumes props
953                #[cfg(feature = "cdc")]
954                let cdc_props: std::collections::HashMap<
955                    String,
956                    grafeo_common::types::Value,
957                > = props
958                    .iter()
959                    .map(|(k, v)| (k.to_string(), v.clone()))
960                    .collect();
961
962                // Log to WAL
963                #[cfg(feature = "wal")]
964                {
965                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
966                        id,
967                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
968                    }) {
969                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
970                    }
971                    for (key, value) in props {
972                        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
973                            id,
974                            key: key.to_string(),
975                            value,
976                        }) {
977                            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
978                        }
979                    }
980                }
981
982                #[cfg(feature = "cdc")]
983                self.cdc_log.record_create_node(
984                    id,
985                    self.store.current_epoch(),
986                    if cdc_props.is_empty() {
987                        None
988                    } else {
989                        Some(cdc_props)
990                    },
991                    Some(labels.iter().map(|s| (*s).to_string()).collect()),
992                );
993
994                id
995            })
996            .collect();
997
998        // Auto-insert into matching vector indexes for any vector properties
999        #[cfg(feature = "vector-index")]
1000        {
1001            for (key, index) in self.store.vector_index_entries() {
1002                // key is "label:property"
1003                if !key.starts_with(label) || !key[label.len()..].starts_with(':') {
1004                    continue;
1005                }
1006                let property = &key[label.len() + 1..];
1007                let accessor =
1008                    grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, property);
1009                let pk = grafeo_common::types::PropertyKey::new(property);
1010                for &id in &ids {
1011                    if let Some(node) = self.store.get_node(id) {
1012                        match node.properties.get(&pk) {
1013                            Some(Value::Vector(v)) => {
1014                                if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1015                                    index.insert(id, v, &accessor);
1016                                }))
1017                                .is_err()
1018                                {
1019                                    grafeo_warn!(
1020                                        "Vector index insert panicked for node {}",
1021                                        id.as_u64()
1022                                    );
1023                                }
1024                            }
1025                            Some(_other) => {
1026                                grafeo_warn!(
1027                                    "Node {} property '{}' expected Vector, skipping vector index insert",
1028                                    id.as_u64(),
1029                                    property
1030                                );
1031                            }
1032                            None => {} // No property, nothing to index
1033                        }
1034                    }
1035                }
1036            }
1037        }
1038
1039        // Auto-insert into matching text indexes for any string properties
1040        #[cfg(feature = "text-index")]
1041        for &id in &ids {
1042            if let Some(node) = self.store.get_node(id) {
1043                for (prop_key, prop_val) in &node.properties {
1044                    if let Value::String(text) = prop_val
1045                        && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
1046                    {
1047                        index.write().insert(id, text);
1048                    }
1049                }
1050            }
1051        }
1052
1053        ids
1054    }
1055}