1use grafeo_common::grafeo_warn;
4#[cfg(feature = "wal")]
5use grafeo_storage::wal::WalRecord;
6
7impl super::GrafeoDB {
8 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
25 let id = self.lpg_store().create_node(labels);
26
27 #[cfg(feature = "wal")]
29 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
30 id,
31 labels: labels.iter().map(|s| (*s).to_string()).collect(),
32 }) {
33 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
34 }
35
36 #[cfg(feature = "cdc")]
37 if self.cdc_active() {
38 self.cdc_log.record_create_node(
39 id,
40 self.lpg_store().current_epoch(),
41 None,
42 Some(labels.iter().map(|s| (*s).to_string()).collect()),
43 );
44 }
45
46 id
47 }
48
49 pub fn create_node_with_props(
53 &self,
54 labels: &[&str],
55 properties: impl IntoIterator<
56 Item = (
57 impl Into<grafeo_common::types::PropertyKey>,
58 impl Into<grafeo_common::types::Value>,
59 ),
60 >,
61 ) -> grafeo_common::types::NodeId {
62 let props: Vec<(
64 grafeo_common::types::PropertyKey,
65 grafeo_common::types::Value,
66 )> = properties
67 .into_iter()
68 .map(|(k, v)| (k.into(), v.into()))
69 .collect();
70
71 let id = self
72 .lpg_store()
73 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
74
75 #[cfg(feature = "cdc")]
77 let cdc_props: Option<
78 std::collections::HashMap<String, grafeo_common::types::Value>,
79 > = if self.cdc_active() {
80 Some(
81 props
82 .iter()
83 .map(|(k, v)| (k.to_string(), v.clone()))
84 .collect(),
85 )
86 } else {
87 None
88 };
89
90 #[cfg(feature = "wal")]
92 {
93 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
94 id,
95 labels: labels.iter().map(|s| (*s).to_string()).collect(),
96 }) {
97 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
98 }
99
100 for (key, value) in props {
102 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
103 id,
104 key: key.to_string(),
105 value,
106 }) {
107 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
108 }
109 }
110 }
111
112 #[cfg(feature = "cdc")]
113 if let Some(cdc_props) = cdc_props {
114 self.cdc_log.record_create_node(
115 id,
116 self.lpg_store().current_epoch(),
117 if cdc_props.is_empty() {
118 None
119 } else {
120 Some(cdc_props)
121 },
122 Some(labels.iter().map(|s| (*s).to_string()).collect()),
123 );
124 }
125
126 #[cfg(feature = "text-index")]
128 if let Some(node) = self.lpg_store().get_node(id) {
129 for label in &node.labels {
130 for (prop_key, prop_val) in &node.properties {
131 if let grafeo_common::types::Value::String(text) = prop_val
132 && let Some(index) = self
133 .lpg_store()
134 .get_text_index(label.as_str(), prop_key.as_ref())
135 {
136 index.write().insert(id, text);
137 }
138 }
139 }
140 }
141
142 id
143 }
144
145 #[must_use]
147 pub fn get_node(
148 &self,
149 id: grafeo_common::types::NodeId,
150 ) -> Option<grafeo_core::graph::lpg::Node> {
151 self.lpg_store().get_node(id)
152 }
153
154 #[must_use]
160 pub fn get_node_at_epoch(
161 &self,
162 id: grafeo_common::types::NodeId,
163 epoch: grafeo_common::types::EpochId,
164 ) -> Option<grafeo_core::graph::lpg::Node> {
165 self.lpg_store().get_node_at_epoch(id, epoch)
166 }
167
168 #[must_use]
172 pub fn get_edge_at_epoch(
173 &self,
174 id: grafeo_common::types::EdgeId,
175 epoch: grafeo_common::types::EpochId,
176 ) -> Option<grafeo_core::graph::lpg::Edge> {
177 self.lpg_store().get_edge_at_epoch(id, epoch)
178 }
179
180 #[must_use]
184 pub fn get_node_history(
185 &self,
186 id: grafeo_common::types::NodeId,
187 ) -> Vec<(
188 grafeo_common::types::EpochId,
189 Option<grafeo_common::types::EpochId>,
190 grafeo_core::graph::lpg::Node,
191 )> {
192 self.lpg_store().get_node_history(id)
193 }
194
195 #[must_use]
199 pub fn get_edge_history(
200 &self,
201 id: grafeo_common::types::EdgeId,
202 ) -> Vec<(
203 grafeo_common::types::EpochId,
204 Option<grafeo_common::types::EpochId>,
205 grafeo_core::graph::lpg::Edge,
206 )> {
207 self.lpg_store().get_edge_history(id)
208 }
209
/// Reads property `key` of node `id` as the LPG store reports it for `epoch`.
///
/// `key` is wrapped in a `PropertyKey` and the lookup is forwarded to the
/// store's `get_node_property_at_epoch`. Only compiled with the `temporal`
/// feature.
#[cfg(feature = "temporal")]
#[must_use]
pub fn get_node_property_at_epoch(
    &self,
    id: grafeo_common::types::NodeId,
    key: &str,
    epoch: grafeo_common::types::EpochId,
) -> Option<grafeo_common::types::Value> {
    self.lpg_store().get_node_property_at_epoch(
        id,
        &grafeo_common::types::PropertyKey::new(key),
        epoch,
    )
}
226
/// Returns the recorded `(epoch, value)` history of property `key` on node
/// `id`.
///
/// Forwards to the store's `node_property_history_for_key`. Only compiled
/// with the `temporal` feature.
#[cfg(feature = "temporal")]
#[must_use]
pub fn get_node_property_history(
    &self,
    id: grafeo_common::types::NodeId,
    key: &str,
) -> Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)> {
    let store = self.lpg_store();
    store.node_property_history_for_key(id, key)
}
240
/// Returns the recorded history of every property on node `id`.
///
/// Each entry pairs a property key with its `(epoch, value)` history, as
/// produced by the store's `node_property_history`. Only compiled with the
/// `temporal` feature.
#[cfg(feature = "temporal")]
#[must_use]
pub fn get_all_node_property_history(
    &self,
    id: grafeo_common::types::NodeId,
) -> Vec<(
    grafeo_common::types::PropertyKey,
    Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)>,
)> {
    let store = self.lpg_store();
    store.node_property_history(id)
}
255
256 #[must_use]
258 pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
259 self.lpg_store().current_epoch()
260 }
261
262 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
266 #[cfg(feature = "cdc")]
268 let cdc_props = if self.cdc_active() {
269 self.lpg_store().get_node(id).map(|node| {
270 node.properties
271 .iter()
272 .map(|(k, v)| (k.to_string(), v.clone()))
273 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
274 })
275 } else {
276 None
277 };
278
279 #[cfg(feature = "vector-index")]
281 let indexes_to_clean: Vec<
282 std::sync::Arc<grafeo_core::index::vector::VectorIndexKind>,
283 > = self
284 .lpg_store()
285 .get_node(id)
286 .map(|node| {
287 let mut indexes = Vec::new();
288 for label in &node.labels {
289 let prefix = format!("{}:", label.as_str());
290 for (key, index) in self.lpg_store().vector_index_entries() {
291 if key.starts_with(&prefix) {
292 indexes.push(index);
293 }
294 }
295 }
296 indexes
297 })
298 .unwrap_or_default();
299
300 #[cfg(feature = "text-index")]
302 let text_indexes_to_clean: Vec<
303 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
304 > = self
305 .lpg_store()
306 .get_node(id)
307 .map(|node| {
308 let mut indexes = Vec::new();
309 for label in &node.labels {
310 let prefix = format!("{}:", label.as_str());
311 for (key, index) in self.lpg_store().text_index_entries() {
312 if key.starts_with(&prefix) {
313 indexes.push(index);
314 }
315 }
316 }
317 indexes
318 })
319 .unwrap_or_default();
320
321 let result = self.lpg_store().delete_node(id);
322
323 #[cfg(feature = "vector-index")]
325 if result {
326 for index in indexes_to_clean {
327 index.remove(id);
328 }
329 }
330
331 #[cfg(feature = "text-index")]
333 if result {
334 for index in text_indexes_to_clean {
335 index.write().remove(id);
336 }
337 }
338
339 #[cfg(feature = "wal")]
340 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
341 grafeo_warn!("Failed to log DeleteNode to WAL: {}", e);
342 }
343
344 #[cfg(feature = "cdc")]
345 if result && self.cdc_active() {
346 self.cdc_log.record_delete(
347 crate::cdc::EntityId::Node(id),
348 self.lpg_store().current_epoch(),
349 cdc_props,
350 );
351 }
352
353 result
354 }
355
356 pub fn set_node_property(
360 &self,
361 id: grafeo_common::types::NodeId,
362 key: &str,
363 value: grafeo_common::types::Value,
364 ) {
365 #[cfg(feature = "vector-index")]
367 let vector_data = match &value {
368 grafeo_common::types::Value::Vector(v) => Some(v.clone()),
369 _ => None,
370 };
371
372 #[cfg(feature = "wal")]
374 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
375 id,
376 key: key.to_string(),
377 value: value.clone(),
378 }) {
379 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
380 }
381
382 #[cfg(feature = "cdc")]
384 let cdc_active = self.cdc_active();
385 #[cfg(feature = "cdc")]
386 let cdc_old_value = if cdc_active {
387 self.lpg_store()
388 .get_node_property(id, &grafeo_common::types::PropertyKey::new(key))
389 } else {
390 None
391 };
392 #[cfg(feature = "cdc")]
393 let cdc_new_value = if cdc_active {
394 Some(value.clone())
395 } else {
396 None
397 };
398
399 self.lpg_store().set_node_property(id, key, value);
400
401 #[cfg(feature = "cdc")]
402 if let Some(cdc_new_value) = cdc_new_value {
403 self.cdc_log.record_update(
404 crate::cdc::EntityId::Node(id),
405 self.lpg_store().current_epoch(),
406 key,
407 cdc_old_value,
408 cdc_new_value,
409 );
410 }
411
412 #[cfg(feature = "vector-index")]
414 if let Some(vec) = vector_data
415 && let Some(node) = self.lpg_store().get_node(id)
416 {
417 for label in &node.labels {
418 if let Some(index) = self.lpg_store().get_vector_index(label.as_str(), key) {
419 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
420 &**self.lpg_store(),
421 key,
422 );
423 index.insert(id, &vec, &accessor);
424 }
425 }
426 }
427
428 #[cfg(feature = "text-index")]
430 if let Some(node) = self.lpg_store().get_node(id) {
431 let text_val = node
432 .properties
433 .get(&grafeo_common::types::PropertyKey::new(key))
434 .and_then(|v| match v {
435 grafeo_common::types::Value::String(s) => Some(s.to_string()),
436 _ => None,
437 });
438 for label in &node.labels {
439 if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
440 let mut idx = index.write();
441 if let Some(ref text) = text_val {
442 idx.insert(id, text);
443 } else {
444 idx.remove(id);
445 }
446 }
447 }
448 }
449 }
450
451 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
469 let result = self.lpg_store().add_label(id, label);
470
471 #[cfg(feature = "wal")]
472 if result {
473 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
475 id,
476 label: label.to_string(),
477 }) {
478 grafeo_warn!("Failed to log AddNodeLabel to WAL: {}", e);
479 }
480 }
481
482 #[cfg(feature = "vector-index")]
484 if result {
485 let prefix = format!("{label}:");
486 for (key, index) in self.lpg_store().vector_index_entries() {
487 if let Some(property) = key.strip_prefix(&prefix)
488 && let Some(node) = self.lpg_store().get_node(id)
489 {
490 let prop_key = grafeo_common::types::PropertyKey::new(property);
491 if let Some(grafeo_common::types::Value::Vector(v)) =
492 node.properties.get(&prop_key)
493 {
494 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
495 &**self.lpg_store(),
496 property,
497 );
498 index.insert(id, v, &accessor);
499 }
500 }
501 }
502 }
503
504 #[cfg(feature = "text-index")]
506 if result && let Some(node) = self.lpg_store().get_node(id) {
507 for (prop_key, prop_val) in &node.properties {
508 if let grafeo_common::types::Value::String(text) = prop_val
509 && let Some(index) = self.lpg_store().get_text_index(label, prop_key.as_ref())
510 {
511 index.write().insert(id, text);
512 }
513 }
514 }
515
516 result
517 }
518
519 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
537 #[cfg(feature = "text-index")]
539 let text_indexes_to_clean: Vec<
540 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
541 > = {
542 let prefix = format!("{label}:");
543 self.lpg_store()
544 .text_index_entries()
545 .into_iter()
546 .filter(|(key, _)| key.starts_with(&prefix))
547 .map(|(_, index)| index)
548 .collect()
549 };
550
551 let result = self.lpg_store().remove_label(id, label);
552
553 #[cfg(feature = "wal")]
554 if result {
555 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
557 id,
558 label: label.to_string(),
559 }) {
560 grafeo_warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
561 }
562 }
563
564 #[cfg(feature = "text-index")]
566 if result {
567 for index in text_indexes_to_clean {
568 index.write().remove(id);
569 }
570 }
571
572 result
573 }
574
575 #[must_use]
592 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
593 self.lpg_store()
594 .get_node(id)
595 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
596 }
597
598 pub fn create_edge(
618 &self,
619 src: grafeo_common::types::NodeId,
620 dst: grafeo_common::types::NodeId,
621 edge_type: &str,
622 ) -> grafeo_common::types::EdgeId {
623 let id = self.lpg_store().create_edge(src, dst, edge_type);
624
625 #[cfg(feature = "wal")]
627 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
628 id,
629 src,
630 dst,
631 edge_type: edge_type.to_string(),
632 }) {
633 grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
634 }
635
636 #[cfg(feature = "cdc")]
637 if self.cdc_active() {
638 self.cdc_log.record_create_edge(
639 id,
640 self.lpg_store().current_epoch(),
641 None,
642 src.as_u64(),
643 dst.as_u64(),
644 edge_type.to_string(),
645 );
646 }
647
648 id
649 }
650
651 pub fn create_edge_with_props(
655 &self,
656 src: grafeo_common::types::NodeId,
657 dst: grafeo_common::types::NodeId,
658 edge_type: &str,
659 properties: impl IntoIterator<
660 Item = (
661 impl Into<grafeo_common::types::PropertyKey>,
662 impl Into<grafeo_common::types::Value>,
663 ),
664 >,
665 ) -> grafeo_common::types::EdgeId {
666 let props: Vec<(
668 grafeo_common::types::PropertyKey,
669 grafeo_common::types::Value,
670 )> = properties
671 .into_iter()
672 .map(|(k, v)| (k.into(), v.into()))
673 .collect();
674
675 let id = self.lpg_store().create_edge_with_props(
676 src,
677 dst,
678 edge_type,
679 props.iter().map(|(k, v)| (k.clone(), v.clone())),
680 );
681
682 #[cfg(feature = "cdc")]
684 let cdc_props: Option<
685 std::collections::HashMap<String, grafeo_common::types::Value>,
686 > = if self.cdc_active() {
687 Some(
688 props
689 .iter()
690 .map(|(k, v)| (k.to_string(), v.clone()))
691 .collect(),
692 )
693 } else {
694 None
695 };
696
697 #[cfg(feature = "wal")]
699 {
700 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
701 id,
702 src,
703 dst,
704 edge_type: edge_type.to_string(),
705 }) {
706 grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
707 }
708
709 for (key, value) in props {
711 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
712 id,
713 key: key.to_string(),
714 value,
715 }) {
716 grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
717 }
718 }
719 }
720
721 #[cfg(feature = "cdc")]
722 if let Some(cdc_props) = cdc_props {
723 self.cdc_log.record_create_edge(
724 id,
725 self.lpg_store().current_epoch(),
726 if cdc_props.is_empty() {
727 None
728 } else {
729 Some(cdc_props)
730 },
731 src.as_u64(),
732 dst.as_u64(),
733 edge_type.to_string(),
734 );
735 }
736
737 id
738 }
739
740 #[must_use]
742 pub fn get_edge(
743 &self,
744 id: grafeo_common::types::EdgeId,
745 ) -> Option<grafeo_core::graph::lpg::Edge> {
746 self.lpg_store().get_edge(id)
747 }
748
749 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
753 #[cfg(feature = "cdc")]
755 let cdc_props = if self.cdc_active() {
756 self.lpg_store().get_edge(id).map(|edge| {
757 edge.properties
758 .iter()
759 .map(|(k, v)| (k.to_string(), v.clone()))
760 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
761 })
762 } else {
763 None
764 };
765
766 let result = self.lpg_store().delete_edge(id);
767
768 #[cfg(feature = "wal")]
769 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
770 grafeo_warn!("Failed to log DeleteEdge to WAL: {}", e);
771 }
772
773 #[cfg(feature = "cdc")]
774 if result && self.cdc_active() {
775 self.cdc_log.record_delete(
776 crate::cdc::EntityId::Edge(id),
777 self.lpg_store().current_epoch(),
778 cdc_props,
779 );
780 }
781
782 result
783 }
784
785 pub fn set_edge_property(
789 &self,
790 id: grafeo_common::types::EdgeId,
791 key: &str,
792 value: grafeo_common::types::Value,
793 ) {
794 #[cfg(feature = "wal")]
796 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
797 id,
798 key: key.to_string(),
799 value: value.clone(),
800 }) {
801 grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
802 }
803
804 #[cfg(feature = "cdc")]
806 let cdc_active = self.cdc_active();
807 #[cfg(feature = "cdc")]
808 let cdc_old_value = if cdc_active {
809 self.lpg_store()
810 .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key))
811 } else {
812 None
813 };
814 #[cfg(feature = "cdc")]
815 let cdc_new_value = if cdc_active {
816 Some(value.clone())
817 } else {
818 None
819 };
820
821 self.lpg_store().set_edge_property(id, key, value);
822
823 #[cfg(feature = "cdc")]
824 if let Some(cdc_new_value) = cdc_new_value {
825 self.cdc_log.record_update(
826 crate::cdc::EntityId::Edge(id),
827 self.lpg_store().current_epoch(),
828 key,
829 cdc_old_value,
830 cdc_new_value,
831 );
832 }
833 }
834
835 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
839 let removed = self.lpg_store().remove_node_property(id, key).is_some();
840
841 #[cfg(feature = "wal")]
842 if removed
843 && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
844 id,
845 key: key.to_string(),
846 })
847 {
848 grafeo_warn!("WAL log for RemoveNodeProperty failed: {e}");
849 }
850
851 #[cfg(feature = "text-index")]
853 if removed && let Some(node) = self.lpg_store().get_node(id) {
854 for label in &node.labels {
855 if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
856 index.write().remove(id);
857 }
858 }
859 }
860
861 removed
862 }
863
864 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
868 let removed = self.lpg_store().remove_edge_property(id, key).is_some();
869
870 #[cfg(feature = "wal")]
871 if removed
872 && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
873 id,
874 key: key.to_string(),
875 })
876 {
877 grafeo_warn!("WAL log for RemoveEdgeProperty failed: {e}");
878 }
879
880 removed
881 }
882
883 pub fn batch_create_nodes(
905 &self,
906 label: &str,
907 property: &str,
908 vectors: Vec<Vec<f32>>,
909 ) -> Vec<grafeo_common::types::NodeId> {
910 use grafeo_common::types::{PropertyKey, Value};
911
912 let prop_key = PropertyKey::new(property);
913 let labels: &[&str] = &[label];
914
915 let ids: Vec<grafeo_common::types::NodeId> = vectors
916 .into_iter()
917 .map(|vec| {
918 let value = Value::Vector(vec.into());
919 let id = self.lpg_store().create_node_with_props(
920 labels,
921 std::iter::once((prop_key.clone(), value.clone())),
922 );
923
924 #[cfg(feature = "wal")]
926 {
927 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
928 id,
929 labels: labels.iter().map(|s| (*s).to_string()).collect(),
930 }) {
931 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
932 }
933 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
934 id,
935 key: property.to_string(),
936 value,
937 }) {
938 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
939 }
940 }
941
942 id
943 })
944 .collect();
945
946 #[cfg(feature = "vector-index")]
948 if let Some(index) = self.lpg_store().get_vector_index(label, property) {
949 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
950 &**self.lpg_store(),
951 property,
952 );
953 for &id in &ids {
954 if let Some(node) = self.lpg_store().get_node(id) {
955 let pk = grafeo_common::types::PropertyKey::new(property);
956 if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk)
957 && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
958 index.insert(id, v, &accessor);
959 }))
960 .is_err()
961 {
962 grafeo_warn!("Vector index insert panicked for node {}", id.as_u64());
963 }
964 }
965 }
966 }
967
968 ids
969 }
970
971 pub fn batch_create_nodes_with_props(
993 &self,
994 label: &str,
995 properties_list: Vec<
996 std::collections::HashMap<
997 grafeo_common::types::PropertyKey,
998 grafeo_common::types::Value,
999 >,
1000 >,
1001 ) -> Vec<grafeo_common::types::NodeId> {
1002 #[cfg(any(feature = "vector-index", feature = "text-index"))]
1003 use grafeo_common::types::Value;
1004
1005 let labels: &[&str] = &[label];
1006
1007 let ids: Vec<grafeo_common::types::NodeId> = properties_list
1008 .into_iter()
1009 .map(|props| {
1010 let id = self.lpg_store().create_node_with_props(
1011 labels,
1012 props.iter().map(|(k, v)| (k.clone(), v.clone())),
1013 );
1014
1015 #[cfg(feature = "cdc")]
1017 let cdc_props: Option<
1018 std::collections::HashMap<String, grafeo_common::types::Value>,
1019 > = if self.cdc_active() {
1020 Some(
1021 props
1022 .iter()
1023 .map(|(k, v)| (k.to_string(), v.clone()))
1024 .collect(),
1025 )
1026 } else {
1027 None
1028 };
1029
1030 #[cfg(feature = "wal")]
1032 {
1033 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1034 id,
1035 labels: labels.iter().map(|s| (*s).to_string()).collect(),
1036 }) {
1037 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
1038 }
1039 for (key, value) in props {
1040 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1041 id,
1042 key: key.to_string(),
1043 value,
1044 }) {
1045 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
1046 }
1047 }
1048 }
1049
1050 #[cfg(feature = "cdc")]
1051 if let Some(cdc_props) = cdc_props {
1052 self.cdc_log.record_create_node(
1053 id,
1054 self.lpg_store().current_epoch(),
1055 if cdc_props.is_empty() {
1056 None
1057 } else {
1058 Some(cdc_props)
1059 },
1060 Some(labels.iter().map(|s| (*s).to_string()).collect()),
1061 );
1062 }
1063
1064 id
1065 })
1066 .collect();
1067
1068 #[cfg(feature = "vector-index")]
1070 {
1071 for (key, index) in self.lpg_store().vector_index_entries() {
1072 if !key.starts_with(label) || !key[label.len()..].starts_with(':') {
1074 continue;
1075 }
1076 let property = &key[label.len() + 1..];
1077 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1078 &**self.lpg_store(),
1079 property,
1080 );
1081 let pk = grafeo_common::types::PropertyKey::new(property);
1082 for &id in &ids {
1083 if let Some(node) = self.lpg_store().get_node(id) {
1084 match node.properties.get(&pk) {
1085 Some(Value::Vector(v)) => {
1086 if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1087 index.insert(id, v, &accessor);
1088 }))
1089 .is_err()
1090 {
1091 grafeo_warn!(
1092 "Vector index insert panicked for node {}",
1093 id.as_u64()
1094 );
1095 }
1096 }
1097 Some(_other) => {
1098 grafeo_warn!(
1099 "Node {} property '{}' expected Vector, skipping vector index insert",
1100 id.as_u64(),
1101 property
1102 );
1103 }
1104 None => {} }
1106 }
1107 }
1108 }
1109 }
1110
1111 #[cfg(feature = "text-index")]
1113 for &id in &ids {
1114 if let Some(node) = self.lpg_store().get_node(id) {
1115 for (prop_key, prop_val) in &node.properties {
1116 if let Value::String(text) = prop_val
1117 && let Some(index) =
1118 self.lpg_store().get_text_index(label, prop_key.as_ref())
1119 {
1120 index.write().insert(id, text);
1121 }
1122 }
1123 }
1124 }
1125
1126 ids
1127 }
1128}