Skip to main content

grafeo_engine/database/
crud.rs

1//! Node and edge CRUD operations for GrafeoDB.
2
3use grafeo_common::grafeo_warn;
4#[cfg(feature = "wal")]
5use grafeo_storage::wal::WalRecord;
6
7impl super::GrafeoDB {
8    // === Node Operations ===
9
10    /// Creates a node with the given labels and returns its ID.
11    ///
12    /// Labels categorize nodes - think of them like tags. A node can have
13    /// multiple labels (e.g., `["Person", "Employee"]`).
14    ///
15    /// # Examples
16    ///
17    /// ```
18    /// use grafeo_engine::GrafeoDB;
19    ///
20    /// let db = GrafeoDB::new_in_memory();
21    /// let alix = db.create_node(&["Person"]);
22    /// let company = db.create_node(&["Company", "Startup"]);
23    /// ```
24    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
25        let id = self.lpg_store().create_node(labels);
26
27        // Log to WAL if enabled
28        #[cfg(feature = "wal")]
29        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
30            id,
31            labels: labels.iter().map(|s| (*s).to_string()).collect(),
32        }) {
33            grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
34        }
35
36        #[cfg(feature = "cdc")]
37        if self.cdc_active() {
38            self.cdc_log.record_create_node(
39                id,
40                self.lpg_store().current_epoch(),
41                None,
42                Some(labels.iter().map(|s| (*s).to_string()).collect()),
43            );
44        }
45
46        id
47    }
48
49    /// Creates a new node with labels and properties.
50    ///
51    /// If WAL is enabled, the operation is logged for durability.
52    pub fn create_node_with_props(
53        &self,
54        labels: &[&str],
55        properties: impl IntoIterator<
56            Item = (
57                impl Into<grafeo_common::types::PropertyKey>,
58                impl Into<grafeo_common::types::Value>,
59            ),
60        >,
61    ) -> grafeo_common::types::NodeId {
62        // Collect properties first so we can log them to WAL
63        let props: Vec<(
64            grafeo_common::types::PropertyKey,
65            grafeo_common::types::Value,
66        )> = properties
67            .into_iter()
68            .map(|(k, v)| (k.into(), v.into()))
69            .collect();
70
71        let id = self
72            .lpg_store()
73            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
74
75        // Build CDC snapshot before WAL consumes props
76        #[cfg(feature = "cdc")]
77        let cdc_props: Option<
78            std::collections::HashMap<String, grafeo_common::types::Value>,
79        > = if self.cdc_active() {
80            Some(
81                props
82                    .iter()
83                    .map(|(k, v)| (k.to_string(), v.clone()))
84                    .collect(),
85            )
86        } else {
87            None
88        };
89
90        // Log node creation to WAL
91        #[cfg(feature = "wal")]
92        {
93            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
94                id,
95                labels: labels.iter().map(|s| (*s).to_string()).collect(),
96            }) {
97                grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
98            }
99
100            // Log each property to WAL for full durability
101            for (key, value) in props {
102                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
103                    id,
104                    key: key.to_string(),
105                    value,
106                }) {
107                    grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
108                }
109            }
110        }
111
112        #[cfg(feature = "cdc")]
113        if let Some(cdc_props) = cdc_props {
114            self.cdc_log.record_create_node(
115                id,
116                self.lpg_store().current_epoch(),
117                if cdc_props.is_empty() {
118                    None
119                } else {
120                    Some(cdc_props)
121                },
122                Some(labels.iter().map(|s| (*s).to_string()).collect()),
123            );
124        }
125
126        // Auto-insert into matching text indexes for the new node
127        #[cfg(feature = "text-index")]
128        if let Some(node) = self.lpg_store().get_node(id) {
129            for label in &node.labels {
130                for (prop_key, prop_val) in &node.properties {
131                    if let grafeo_common::types::Value::String(text) = prop_val
132                        && let Some(index) = self
133                            .lpg_store()
134                            .get_text_index(label.as_str(), prop_key.as_ref())
135                    {
136                        index.write().insert(id, text);
137                    }
138                }
139            }
140        }
141
142        id
143    }
144
145    /// Gets a node by ID.
146    #[must_use]
147    pub fn get_node(
148        &self,
149        id: grafeo_common::types::NodeId,
150    ) -> Option<grafeo_core::graph::lpg::Node> {
151        self.lpg_store().get_node(id)
152    }
153
154    /// Gets a node as it existed at a specific epoch.
155    ///
156    /// Uses pure epoch-based visibility (not transaction-aware), so the node
157    /// is visible if and only if `created_epoch <= epoch` and it was not
158    /// deleted at or before `epoch`.
159    #[must_use]
160    pub fn get_node_at_epoch(
161        &self,
162        id: grafeo_common::types::NodeId,
163        epoch: grafeo_common::types::EpochId,
164    ) -> Option<grafeo_core::graph::lpg::Node> {
165        self.lpg_store().get_node_at_epoch(id, epoch)
166    }
167
168    /// Gets an edge as it existed at a specific epoch.
169    ///
170    /// Uses pure epoch-based visibility (not transaction-aware).
171    #[must_use]
172    pub fn get_edge_at_epoch(
173        &self,
174        id: grafeo_common::types::EdgeId,
175        epoch: grafeo_common::types::EpochId,
176    ) -> Option<grafeo_core::graph::lpg::Edge> {
177        self.lpg_store().get_edge_at_epoch(id, epoch)
178    }
179
180    /// Returns all versions of a node with their creation/deletion epochs.
181    ///
182    /// Properties and labels reflect the current state (not versioned per-epoch).
183    #[must_use]
184    pub fn get_node_history(
185        &self,
186        id: grafeo_common::types::NodeId,
187    ) -> Vec<(
188        grafeo_common::types::EpochId,
189        Option<grafeo_common::types::EpochId>,
190        grafeo_core::graph::lpg::Node,
191    )> {
192        self.lpg_store().get_node_history(id)
193    }
194
195    /// Returns all versions of an edge with their creation/deletion epochs.
196    ///
197    /// Properties reflect the current state (not versioned per-epoch).
198    #[must_use]
199    pub fn get_edge_history(
200        &self,
201        id: grafeo_common::types::EdgeId,
202    ) -> Vec<(
203        grafeo_common::types::EpochId,
204        Option<grafeo_common::types::EpochId>,
205        grafeo_core::graph::lpg::Edge,
206    )> {
207        self.lpg_store().get_edge_history(id)
208    }
209
/// Reads one node property as of the given epoch.
///
/// This is a point-in-time read served by the store's version log. The
/// result is `None` when the property did not exist or was deleted at that
/// epoch.
#[cfg(feature = "temporal")]
#[must_use]
pub fn get_node_property_at_epoch(
    &self,
    id: grafeo_common::types::NodeId,
    key: &str,
    epoch: grafeo_common::types::EpochId,
) -> Option<grafeo_common::types::Value> {
    // The store is keyed by `PropertyKey`, so wrap the plain string first.
    let lookup_key = grafeo_common::types::PropertyKey::new(key);
    let store = self.lpg_store();
    store.get_node_property_at_epoch(id, &lookup_key, epoch)
}
226
/// Returns the complete version timeline of a single node property.
///
/// Entries are `(epoch, value)` pairs in ascending epoch order. Deletions
/// (tombstones) show up as `Value::Null`.
#[cfg(feature = "temporal")]
#[must_use]
pub fn get_node_property_history(
    &self,
    id: grafeo_common::types::NodeId,
    key: &str,
) -> Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)> {
    let store = self.lpg_store();
    store.node_property_history_for_key(id, key)
}
240
/// Returns the version history of every property of a node.
///
/// Each entry pairs a property key with its `Vec<(epoch, value)>` timeline.
#[cfg(feature = "temporal")]
#[must_use]
pub fn get_all_node_property_history(
    &self,
    id: grafeo_common::types::NodeId,
) -> Vec<(
    grafeo_common::types::PropertyKey,
    Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)>,
)> {
    let store = self.lpg_store();
    store.node_property_history(id)
}
255
256    /// Returns the current epoch of the database.
257    #[must_use]
258    pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
259        self.lpg_store().current_epoch()
260    }
261
262    /// Deletes a node and all its edges.
263    ///
264    /// If WAL is enabled, the operation is logged for durability.
265    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
266        // Capture properties for CDC before deletion
267        #[cfg(feature = "cdc")]
268        let cdc_props = if self.cdc_active() {
269            self.lpg_store().get_node(id).map(|node| {
270                node.properties
271                    .iter()
272                    .map(|(k, v)| (k.to_string(), v.clone()))
273                    .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
274            })
275        } else {
276            None
277        };
278
279        // Collect matching vector indexes BEFORE deletion removes labels
280        #[cfg(feature = "vector-index")]
281        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
282            .lpg_store()
283            .get_node(id)
284            .map(|node| {
285                let mut indexes = Vec::new();
286                for label in &node.labels {
287                    let prefix = format!("{}:", label.as_str());
288                    for (key, index) in self.lpg_store().vector_index_entries() {
289                        if key.starts_with(&prefix) {
290                            indexes.push(index);
291                        }
292                    }
293                }
294                indexes
295            })
296            .unwrap_or_default();
297
298        // Collect matching text indexes BEFORE deletion removes labels
299        #[cfg(feature = "text-index")]
300        let text_indexes_to_clean: Vec<
301            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
302        > = self
303            .lpg_store()
304            .get_node(id)
305            .map(|node| {
306                let mut indexes = Vec::new();
307                for label in &node.labels {
308                    let prefix = format!("{}:", label.as_str());
309                    for (key, index) in self.lpg_store().text_index_entries() {
310                        if key.starts_with(&prefix) {
311                            indexes.push(index);
312                        }
313                    }
314                }
315                indexes
316            })
317            .unwrap_or_default();
318
319        let result = self.lpg_store().delete_node(id);
320
321        // Remove from vector indexes after successful deletion
322        #[cfg(feature = "vector-index")]
323        if result {
324            for index in indexes_to_clean {
325                index.remove(id);
326            }
327        }
328
329        // Remove from text indexes after successful deletion
330        #[cfg(feature = "text-index")]
331        if result {
332            for index in text_indexes_to_clean {
333                index.write().remove(id);
334            }
335        }
336
337        #[cfg(feature = "wal")]
338        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
339            grafeo_warn!("Failed to log DeleteNode to WAL: {}", e);
340        }
341
342        #[cfg(feature = "cdc")]
343        if result && self.cdc_active() {
344            self.cdc_log.record_delete(
345                crate::cdc::EntityId::Node(id),
346                self.lpg_store().current_epoch(),
347                cdc_props,
348            );
349        }
350
351        result
352    }
353
354    /// Sets a property on a node.
355    ///
356    /// If WAL is enabled, the operation is logged for durability.
357    pub fn set_node_property(
358        &self,
359        id: grafeo_common::types::NodeId,
360        key: &str,
361        value: grafeo_common::types::Value,
362    ) {
363        // Extract vector data before the value is moved into the store
364        #[cfg(feature = "vector-index")]
365        let vector_data = match &value {
366            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
367            _ => None,
368        };
369
370        // Log to WAL first
371        #[cfg(feature = "wal")]
372        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
373            id,
374            key: key.to_string(),
375            value: value.clone(),
376        }) {
377            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
378        }
379
380        // Capture old value for CDC before the store write
381        #[cfg(feature = "cdc")]
382        let cdc_active = self.cdc_active();
383        #[cfg(feature = "cdc")]
384        let cdc_old_value = if cdc_active {
385            self.lpg_store()
386                .get_node_property(id, &grafeo_common::types::PropertyKey::new(key))
387        } else {
388            None
389        };
390        #[cfg(feature = "cdc")]
391        let cdc_new_value = if cdc_active {
392            Some(value.clone())
393        } else {
394            None
395        };
396
397        self.lpg_store().set_node_property(id, key, value);
398
399        #[cfg(feature = "cdc")]
400        if let Some(cdc_new_value) = cdc_new_value {
401            self.cdc_log.record_update(
402                crate::cdc::EntityId::Node(id),
403                self.lpg_store().current_epoch(),
404                key,
405                cdc_old_value,
406                cdc_new_value,
407            );
408        }
409
410        // Auto-insert into matching vector indexes
411        #[cfg(feature = "vector-index")]
412        if let Some(vec) = vector_data
413            && let Some(node) = self.lpg_store().get_node(id)
414        {
415            for label in &node.labels {
416                if let Some(index) = self.lpg_store().get_vector_index(label.as_str(), key) {
417                    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
418                        &**self.lpg_store(),
419                        key,
420                    );
421                    index.insert(id, &vec, &accessor);
422                }
423            }
424        }
425
426        // Auto-update matching text indexes
427        #[cfg(feature = "text-index")]
428        if let Some(node) = self.lpg_store().get_node(id) {
429            let text_val = node
430                .properties
431                .get(&grafeo_common::types::PropertyKey::new(key))
432                .and_then(|v| match v {
433                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
434                    _ => None,
435                });
436            for label in &node.labels {
437                if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
438                    let mut idx = index.write();
439                    if let Some(ref text) = text_val {
440                        idx.insert(id, text);
441                    } else {
442                        idx.remove(id);
443                    }
444                }
445            }
446        }
447    }
448
449    /// Adds a label to an existing node.
450    ///
451    /// Returns `true` if the label was added, `false` if the node doesn't exist
452    /// or already has the label.
453    ///
454    /// # Examples
455    ///
456    /// ```
457    /// use grafeo_engine::GrafeoDB;
458    ///
459    /// let db = GrafeoDB::new_in_memory();
460    /// let alix = db.create_node(&["Person"]);
461    ///
462    /// // Promote Alix to Employee
463    /// let added = db.add_node_label(alix, "Employee");
464    /// assert!(added);
465    /// ```
466    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
467        let result = self.lpg_store().add_label(id, label);
468
469        #[cfg(feature = "wal")]
470        if result {
471            // Log to WAL if enabled
472            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
473                id,
474                label: label.to_string(),
475            }) {
476                grafeo_warn!("Failed to log AddNodeLabel to WAL: {}", e);
477            }
478        }
479
480        // Auto-insert into vector indexes for the newly-added label
481        #[cfg(feature = "vector-index")]
482        if result {
483            let prefix = format!("{label}:");
484            for (key, index) in self.lpg_store().vector_index_entries() {
485                if let Some(property) = key.strip_prefix(&prefix)
486                    && let Some(node) = self.lpg_store().get_node(id)
487                {
488                    let prop_key = grafeo_common::types::PropertyKey::new(property);
489                    if let Some(grafeo_common::types::Value::Vector(v)) =
490                        node.properties.get(&prop_key)
491                    {
492                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
493                            &**self.lpg_store(),
494                            property,
495                        );
496                        index.insert(id, v, &accessor);
497                    }
498                }
499            }
500        }
501
502        // Auto-insert into text indexes for the newly-added label
503        #[cfg(feature = "text-index")]
504        if result && let Some(node) = self.lpg_store().get_node(id) {
505            for (prop_key, prop_val) in &node.properties {
506                if let grafeo_common::types::Value::String(text) = prop_val
507                    && let Some(index) = self.lpg_store().get_text_index(label, prop_key.as_ref())
508                {
509                    index.write().insert(id, text);
510                }
511            }
512        }
513
514        result
515    }
516
517    /// Removes a label from a node.
518    ///
519    /// Returns `true` if the label was removed, `false` if the node doesn't exist
520    /// or doesn't have the label.
521    ///
522    /// # Examples
523    ///
524    /// ```
525    /// use grafeo_engine::GrafeoDB;
526    ///
527    /// let db = GrafeoDB::new_in_memory();
528    /// let alix = db.create_node(&["Person", "Employee"]);
529    ///
530    /// // Remove Employee status
531    /// let removed = db.remove_node_label(alix, "Employee");
532    /// assert!(removed);
533    /// ```
534    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
535        // Collect text indexes to clean BEFORE removing the label
536        #[cfg(feature = "text-index")]
537        let text_indexes_to_clean: Vec<
538            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
539        > = {
540            let prefix = format!("{label}:");
541            self.lpg_store()
542                .text_index_entries()
543                .into_iter()
544                .filter(|(key, _)| key.starts_with(&prefix))
545                .map(|(_, index)| index)
546                .collect()
547        };
548
549        let result = self.lpg_store().remove_label(id, label);
550
551        #[cfg(feature = "wal")]
552        if result {
553            // Log to WAL if enabled
554            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
555                id,
556                label: label.to_string(),
557            }) {
558                grafeo_warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
559            }
560        }
561
562        // Remove from text indexes for the removed label
563        #[cfg(feature = "text-index")]
564        if result {
565            for index in text_indexes_to_clean {
566                index.write().remove(id);
567            }
568        }
569
570        result
571    }
572
573    /// Gets all labels for a node.
574    ///
575    /// Returns `None` if the node doesn't exist.
576    ///
577    /// # Examples
578    ///
579    /// ```
580    /// use grafeo_engine::GrafeoDB;
581    ///
582    /// let db = GrafeoDB::new_in_memory();
583    /// let alix = db.create_node(&["Person", "Employee"]);
584    ///
585    /// let labels = db.get_node_labels(alix).unwrap();
586    /// assert!(labels.contains(&"Person".to_string()));
587    /// assert!(labels.contains(&"Employee".to_string()));
588    /// ```
589    #[must_use]
590    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
591        self.lpg_store()
592            .get_node(id)
593            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
594    }
595
596    // === Edge Operations ===
597
598    /// Creates an edge (relationship) between two nodes.
599    ///
600    /// Edges connect nodes and have a type that describes the relationship.
601    /// They're directed - the order of `src` and `dst` matters.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// use grafeo_engine::GrafeoDB;
607    ///
608    /// let db = GrafeoDB::new_in_memory();
609    /// let alix = db.create_node(&["Person"]);
610    /// let gus = db.create_node(&["Person"]);
611    ///
612    /// // Alix knows Gus (directed: Alix -> Gus)
613    /// let edge = db.create_edge(alix, gus, "KNOWS");
614    /// ```
615    pub fn create_edge(
616        &self,
617        src: grafeo_common::types::NodeId,
618        dst: grafeo_common::types::NodeId,
619        edge_type: &str,
620    ) -> grafeo_common::types::EdgeId {
621        let id = self.lpg_store().create_edge(src, dst, edge_type);
622
623        // Log to WAL if enabled
624        #[cfg(feature = "wal")]
625        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
626            id,
627            src,
628            dst,
629            edge_type: edge_type.to_string(),
630        }) {
631            grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
632        }
633
634        #[cfg(feature = "cdc")]
635        if self.cdc_active() {
636            self.cdc_log.record_create_edge(
637                id,
638                self.lpg_store().current_epoch(),
639                None,
640                src.as_u64(),
641                dst.as_u64(),
642                edge_type.to_string(),
643            );
644        }
645
646        id
647    }
648
649    /// Creates a new edge with properties.
650    ///
651    /// If WAL is enabled, the operation is logged for durability.
652    pub fn create_edge_with_props(
653        &self,
654        src: grafeo_common::types::NodeId,
655        dst: grafeo_common::types::NodeId,
656        edge_type: &str,
657        properties: impl IntoIterator<
658            Item = (
659                impl Into<grafeo_common::types::PropertyKey>,
660                impl Into<grafeo_common::types::Value>,
661            ),
662        >,
663    ) -> grafeo_common::types::EdgeId {
664        // Collect properties first so we can log them to WAL
665        let props: Vec<(
666            grafeo_common::types::PropertyKey,
667            grafeo_common::types::Value,
668        )> = properties
669            .into_iter()
670            .map(|(k, v)| (k.into(), v.into()))
671            .collect();
672
673        let id = self.lpg_store().create_edge_with_props(
674            src,
675            dst,
676            edge_type,
677            props.iter().map(|(k, v)| (k.clone(), v.clone())),
678        );
679
680        // Build CDC snapshot before WAL consumes props
681        #[cfg(feature = "cdc")]
682        let cdc_props: Option<
683            std::collections::HashMap<String, grafeo_common::types::Value>,
684        > = if self.cdc_active() {
685            Some(
686                props
687                    .iter()
688                    .map(|(k, v)| (k.to_string(), v.clone()))
689                    .collect(),
690            )
691        } else {
692            None
693        };
694
695        // Log edge creation to WAL
696        #[cfg(feature = "wal")]
697        {
698            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
699                id,
700                src,
701                dst,
702                edge_type: edge_type.to_string(),
703            }) {
704                grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
705            }
706
707            // Log each property to WAL for full durability
708            for (key, value) in props {
709                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
710                    id,
711                    key: key.to_string(),
712                    value,
713                }) {
714                    grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
715                }
716            }
717        }
718
719        #[cfg(feature = "cdc")]
720        if let Some(cdc_props) = cdc_props {
721            self.cdc_log.record_create_edge(
722                id,
723                self.lpg_store().current_epoch(),
724                if cdc_props.is_empty() {
725                    None
726                } else {
727                    Some(cdc_props)
728                },
729                src.as_u64(),
730                dst.as_u64(),
731                edge_type.to_string(),
732            );
733        }
734
735        id
736    }
737
738    /// Gets an edge by ID.
739    #[must_use]
740    pub fn get_edge(
741        &self,
742        id: grafeo_common::types::EdgeId,
743    ) -> Option<grafeo_core::graph::lpg::Edge> {
744        self.lpg_store().get_edge(id)
745    }
746
747    /// Deletes an edge.
748    ///
749    /// If WAL is enabled, the operation is logged for durability.
750    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
751        // Capture properties for CDC before deletion
752        #[cfg(feature = "cdc")]
753        let cdc_props = if self.cdc_active() {
754            self.lpg_store().get_edge(id).map(|edge| {
755                edge.properties
756                    .iter()
757                    .map(|(k, v)| (k.to_string(), v.clone()))
758                    .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
759            })
760        } else {
761            None
762        };
763
764        let result = self.lpg_store().delete_edge(id);
765
766        #[cfg(feature = "wal")]
767        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
768            grafeo_warn!("Failed to log DeleteEdge to WAL: {}", e);
769        }
770
771        #[cfg(feature = "cdc")]
772        if result && self.cdc_active() {
773            self.cdc_log.record_delete(
774                crate::cdc::EntityId::Edge(id),
775                self.lpg_store().current_epoch(),
776                cdc_props,
777            );
778        }
779
780        result
781    }
782
783    /// Sets a property on an edge.
784    ///
785    /// If WAL is enabled, the operation is logged for durability.
786    pub fn set_edge_property(
787        &self,
788        id: grafeo_common::types::EdgeId,
789        key: &str,
790        value: grafeo_common::types::Value,
791    ) {
792        // Log to WAL first
793        #[cfg(feature = "wal")]
794        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
795            id,
796            key: key.to_string(),
797            value: value.clone(),
798        }) {
799            grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
800        }
801
802        // Capture old value for CDC before the store write
803        #[cfg(feature = "cdc")]
804        let cdc_active = self.cdc_active();
805        #[cfg(feature = "cdc")]
806        let cdc_old_value = if cdc_active {
807            self.lpg_store()
808                .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key))
809        } else {
810            None
811        };
812        #[cfg(feature = "cdc")]
813        let cdc_new_value = if cdc_active {
814            Some(value.clone())
815        } else {
816            None
817        };
818
819        self.lpg_store().set_edge_property(id, key, value);
820
821        #[cfg(feature = "cdc")]
822        if let Some(cdc_new_value) = cdc_new_value {
823            self.cdc_log.record_update(
824                crate::cdc::EntityId::Edge(id),
825                self.lpg_store().current_epoch(),
826                key,
827                cdc_old_value,
828                cdc_new_value,
829            );
830        }
831    }
832
833    /// Removes a property from a node.
834    ///
835    /// Returns true if the property existed and was removed, false otherwise.
836    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
837        let removed = self.lpg_store().remove_node_property(id, key).is_some();
838
839        #[cfg(feature = "wal")]
840        if removed
841            && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
842                id,
843                key: key.to_string(),
844            })
845        {
846            grafeo_warn!("WAL log for RemoveNodeProperty failed: {e}");
847        }
848
849        // Remove from matching text indexes
850        #[cfg(feature = "text-index")]
851        if removed && let Some(node) = self.lpg_store().get_node(id) {
852            for label in &node.labels {
853                if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
854                    index.write().remove(id);
855                }
856            }
857        }
858
859        removed
860    }
861
862    /// Removes a property from an edge.
863    ///
864    /// Returns true if the property existed and was removed, false otherwise.
865    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
866        let removed = self.lpg_store().remove_edge_property(id, key).is_some();
867
868        #[cfg(feature = "wal")]
869        if removed
870            && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
871                id,
872                key: key.to_string(),
873            })
874        {
875            grafeo_warn!("WAL log for RemoveEdgeProperty failed: {e}");
876        }
877
878        removed
879    }
880
881    /// Creates multiple nodes in bulk, each with a single vector property.
882    ///
883    /// Much faster than individual `create_node_with_props` calls because it
884    /// acquires internal locks once and loops in Rust rather than crossing
885    /// the FFI boundary per vector.
886    ///
887    /// **Atomicity note:** Individual node creations within the batch are NOT
888    /// atomic as a group. If a failure occurs mid-batch (e.g. WAL write error),
889    /// nodes created before the failure will persist while later nodes may not.
890    /// If you need all-or-nothing semantics, wrap the call in an explicit
891    /// transaction.
892    ///
893    /// # Arguments
894    ///
895    /// * `label` - Label applied to all created nodes
896    /// * `property` - Property name for the vector data
897    /// * `vectors` - Vector data for each node
898    ///
899    /// # Returns
900    ///
901    /// Vector of created `NodeId`s in the same order as the input vectors.
902    pub fn batch_create_nodes(
903        &self,
904        label: &str,
905        property: &str,
906        vectors: Vec<Vec<f32>>,
907    ) -> Vec<grafeo_common::types::NodeId> {
908        use grafeo_common::types::{PropertyKey, Value};
909
910        let prop_key = PropertyKey::new(property);
911        let labels: &[&str] = &[label];
912
913        let ids: Vec<grafeo_common::types::NodeId> = vectors
914            .into_iter()
915            .map(|vec| {
916                let value = Value::Vector(vec.into());
917                let id = self.lpg_store().create_node_with_props(
918                    labels,
919                    std::iter::once((prop_key.clone(), value.clone())),
920                );
921
922                // Log to WAL
923                #[cfg(feature = "wal")]
924                {
925                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
926                        id,
927                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
928                    }) {
929                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
930                    }
931                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
932                        id,
933                        key: property.to_string(),
934                        value,
935                    }) {
936                        grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
937                    }
938                }
939
940                id
941            })
942            .collect();
943
944        // Auto-insert into matching vector index if one exists
945        #[cfg(feature = "vector-index")]
946        if let Some(index) = self.lpg_store().get_vector_index(label, property) {
947            let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
948                &**self.lpg_store(),
949                property,
950            );
951            for &id in &ids {
952                if let Some(node) = self.lpg_store().get_node(id) {
953                    let pk = grafeo_common::types::PropertyKey::new(property);
954                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk)
955                        && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
956                            index.insert(id, v, &accessor);
957                        }))
958                        .is_err()
959                    {
960                        grafeo_warn!("Vector index insert panicked for node {}", id.as_u64());
961                    }
962                }
963            }
964        }
965
966        ids
967    }
968
969    /// Batch-creates nodes with full property maps.
970    ///
971    /// Each entry in `properties_list` is a complete property map for one node.
972    /// Vector values (`Value::Vector`) are automatically inserted into matching
973    /// vector indexes. Text values are automatically inserted into matching text
974    /// indexes.
975    ///
976    /// **Atomicity note:** Individual node creations within the batch are NOT
977    /// atomic as a group. If a failure occurs mid-batch (e.g. WAL write error),
978    /// nodes created before the failure will persist while later nodes may not.
979    /// If you need all-or-nothing semantics, wrap the call in an explicit
980    /// transaction.
981    ///
982    /// # Arguments
983    ///
984    /// * `label` - Label for all created nodes.
985    /// * `properties_list` - One property map per node to create.
986    ///
987    /// # Returns
988    ///
989    /// Vector of created `NodeId`s in the same order as the input.
990    pub fn batch_create_nodes_with_props(
991        &self,
992        label: &str,
993        properties_list: Vec<
994            std::collections::HashMap<
995                grafeo_common::types::PropertyKey,
996                grafeo_common::types::Value,
997            >,
998        >,
999    ) -> Vec<grafeo_common::types::NodeId> {
1000        #[cfg(any(feature = "vector-index", feature = "text-index"))]
1001        use grafeo_common::types::Value;
1002
1003        let labels: &[&str] = &[label];
1004
1005        let ids: Vec<grafeo_common::types::NodeId> = properties_list
1006            .into_iter()
1007            .map(|props| {
1008                let id = self.lpg_store().create_node_with_props(
1009                    labels,
1010                    props.iter().map(|(k, v)| (k.clone(), v.clone())),
1011                );
1012
1013                // Build CDC snapshot before WAL consumes props
1014                #[cfg(feature = "cdc")]
1015                let cdc_props: Option<
1016                    std::collections::HashMap<String, grafeo_common::types::Value>,
1017                > = if self.cdc_active() {
1018                    Some(
1019                        props
1020                            .iter()
1021                            .map(|(k, v)| (k.to_string(), v.clone()))
1022                            .collect(),
1023                    )
1024                } else {
1025                    None
1026                };
1027
1028                // Log to WAL
1029                #[cfg(feature = "wal")]
1030                {
1031                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1032                        id,
1033                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
1034                    }) {
1035                        grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
1036                    }
1037                    for (key, value) in props {
1038                        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1039                            id,
1040                            key: key.to_string(),
1041                            value,
1042                        }) {
1043                            grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
1044                        }
1045                    }
1046                }
1047
1048                #[cfg(feature = "cdc")]
1049                if let Some(cdc_props) = cdc_props {
1050                    self.cdc_log.record_create_node(
1051                        id,
1052                        self.lpg_store().current_epoch(),
1053                        if cdc_props.is_empty() {
1054                            None
1055                        } else {
1056                            Some(cdc_props)
1057                        },
1058                        Some(labels.iter().map(|s| (*s).to_string()).collect()),
1059                    );
1060                }
1061
1062                id
1063            })
1064            .collect();
1065
1066        // Auto-insert into matching vector indexes for any vector properties
1067        #[cfg(feature = "vector-index")]
1068        {
1069            for (key, index) in self.lpg_store().vector_index_entries() {
1070                // key is "label:property"
1071                if !key.starts_with(label) || !key[label.len()..].starts_with(':') {
1072                    continue;
1073                }
1074                let property = &key[label.len() + 1..];
1075                let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1076                    &**self.lpg_store(),
1077                    property,
1078                );
1079                let pk = grafeo_common::types::PropertyKey::new(property);
1080                for &id in &ids {
1081                    if let Some(node) = self.lpg_store().get_node(id) {
1082                        match node.properties.get(&pk) {
1083                            Some(Value::Vector(v)) => {
1084                                if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1085                                    index.insert(id, v, &accessor);
1086                                }))
1087                                .is_err()
1088                                {
1089                                    grafeo_warn!(
1090                                        "Vector index insert panicked for node {}",
1091                                        id.as_u64()
1092                                    );
1093                                }
1094                            }
1095                            Some(_other) => {
1096                                grafeo_warn!(
1097                                    "Node {} property '{}' expected Vector, skipping vector index insert",
1098                                    id.as_u64(),
1099                                    property
1100                                );
1101                            }
1102                            None => {} // No property, nothing to index
1103                        }
1104                    }
1105                }
1106            }
1107        }
1108
1109        // Auto-insert into matching text indexes for any string properties
1110        #[cfg(feature = "text-index")]
1111        for &id in &ids {
1112            if let Some(node) = self.lpg_store().get_node(id) {
1113                for (prop_key, prop_val) in &node.properties {
1114                    if let Value::String(text) = prop_val
1115                        && let Some(index) =
1116                            self.lpg_store().get_text_index(label, prop_key.as_ref())
1117                    {
1118                        index.write().insert(id, text);
1119                    }
1120                }
1121            }
1122        }
1123
1124        ids
1125    }
1126}