1#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5use grafeo_common::grafeo_warn;
6
7impl super::GrafeoDB {
8 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
25 let id = self.lpg_store().create_node(labels);
26
27 #[cfg(feature = "wal")]
29 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
30 id,
31 labels: labels.iter().map(|s| (*s).to_string()).collect(),
32 }) {
33 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
34 }
35
36 #[cfg(feature = "cdc")]
37 if self.cdc_active() {
38 self.cdc_log.record_create_node(
39 id,
40 self.lpg_store().current_epoch(),
41 None,
42 Some(labels.iter().map(|s| (*s).to_string()).collect()),
43 );
44 }
45
46 id
47 }
48
49 pub fn create_node_with_props(
53 &self,
54 labels: &[&str],
55 properties: impl IntoIterator<
56 Item = (
57 impl Into<grafeo_common::types::PropertyKey>,
58 impl Into<grafeo_common::types::Value>,
59 ),
60 >,
61 ) -> grafeo_common::types::NodeId {
62 let props: Vec<(
64 grafeo_common::types::PropertyKey,
65 grafeo_common::types::Value,
66 )> = properties
67 .into_iter()
68 .map(|(k, v)| (k.into(), v.into()))
69 .collect();
70
71 let id = self
72 .lpg_store()
73 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
74
75 #[cfg(feature = "cdc")]
77 let cdc_props: Option<
78 std::collections::HashMap<String, grafeo_common::types::Value>,
79 > = if self.cdc_active() {
80 Some(
81 props
82 .iter()
83 .map(|(k, v)| (k.to_string(), v.clone()))
84 .collect(),
85 )
86 } else {
87 None
88 };
89
90 #[cfg(feature = "wal")]
92 {
93 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
94 id,
95 labels: labels.iter().map(|s| (*s).to_string()).collect(),
96 }) {
97 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
98 }
99
100 for (key, value) in props {
102 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
103 id,
104 key: key.to_string(),
105 value,
106 }) {
107 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
108 }
109 }
110 }
111
112 #[cfg(feature = "cdc")]
113 if let Some(cdc_props) = cdc_props {
114 self.cdc_log.record_create_node(
115 id,
116 self.lpg_store().current_epoch(),
117 if cdc_props.is_empty() {
118 None
119 } else {
120 Some(cdc_props)
121 },
122 Some(labels.iter().map(|s| (*s).to_string()).collect()),
123 );
124 }
125
126 #[cfg(feature = "text-index")]
128 if let Some(node) = self.lpg_store().get_node(id) {
129 for label in &node.labels {
130 for (prop_key, prop_val) in &node.properties {
131 if let grafeo_common::types::Value::String(text) = prop_val
132 && let Some(index) = self
133 .lpg_store()
134 .get_text_index(label.as_str(), prop_key.as_ref())
135 {
136 index.write().insert(id, text);
137 }
138 }
139 }
140 }
141
142 id
143 }
144
145 #[must_use]
147 pub fn get_node(
148 &self,
149 id: grafeo_common::types::NodeId,
150 ) -> Option<grafeo_core::graph::lpg::Node> {
151 self.lpg_store().get_node(id)
152 }
153
154 #[must_use]
160 pub fn get_node_at_epoch(
161 &self,
162 id: grafeo_common::types::NodeId,
163 epoch: grafeo_common::types::EpochId,
164 ) -> Option<grafeo_core::graph::lpg::Node> {
165 self.lpg_store().get_node_at_epoch(id, epoch)
166 }
167
168 #[must_use]
172 pub fn get_edge_at_epoch(
173 &self,
174 id: grafeo_common::types::EdgeId,
175 epoch: grafeo_common::types::EpochId,
176 ) -> Option<grafeo_core::graph::lpg::Edge> {
177 self.lpg_store().get_edge_at_epoch(id, epoch)
178 }
179
180 #[must_use]
184 pub fn get_node_history(
185 &self,
186 id: grafeo_common::types::NodeId,
187 ) -> Vec<(
188 grafeo_common::types::EpochId,
189 Option<grafeo_common::types::EpochId>,
190 grafeo_core::graph::lpg::Node,
191 )> {
192 self.lpg_store().get_node_history(id)
193 }
194
195 #[must_use]
199 pub fn get_edge_history(
200 &self,
201 id: grafeo_common::types::EdgeId,
202 ) -> Vec<(
203 grafeo_common::types::EpochId,
204 Option<grafeo_common::types::EpochId>,
205 grafeo_core::graph::lpg::Edge,
206 )> {
207 self.lpg_store().get_edge_history(id)
208 }
209
210 #[cfg(feature = "temporal")]
215 #[must_use]
216 pub fn get_node_property_at_epoch(
217 &self,
218 id: grafeo_common::types::NodeId,
219 key: &str,
220 epoch: grafeo_common::types::EpochId,
221 ) -> Option<grafeo_common::types::Value> {
222 let prop_key = grafeo_common::types::PropertyKey::new(key);
223 self.lpg_store()
224 .get_node_property_at_epoch(id, &prop_key, epoch)
225 }
226
227 #[cfg(feature = "temporal")]
232 #[must_use]
233 pub fn get_node_property_history(
234 &self,
235 id: grafeo_common::types::NodeId,
236 key: &str,
237 ) -> Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)> {
238 self.lpg_store().node_property_history_for_key(id, key)
239 }
240
241 #[cfg(feature = "temporal")]
245 #[must_use]
246 pub fn get_all_node_property_history(
247 &self,
248 id: grafeo_common::types::NodeId,
249 ) -> Vec<(
250 grafeo_common::types::PropertyKey,
251 Vec<(grafeo_common::types::EpochId, grafeo_common::types::Value)>,
252 )> {
253 self.lpg_store().node_property_history(id)
254 }
255
256 #[must_use]
258 pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
259 self.lpg_store().current_epoch()
260 }
261
262 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
266 #[cfg(feature = "cdc")]
268 let cdc_props = if self.cdc_active() {
269 self.lpg_store().get_node(id).map(|node| {
270 node.properties
271 .iter()
272 .map(|(k, v)| (k.to_string(), v.clone()))
273 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
274 })
275 } else {
276 None
277 };
278
279 #[cfg(feature = "vector-index")]
281 let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
282 .lpg_store()
283 .get_node(id)
284 .map(|node| {
285 let mut indexes = Vec::new();
286 for label in &node.labels {
287 let prefix = format!("{}:", label.as_str());
288 for (key, index) in self.lpg_store().vector_index_entries() {
289 if key.starts_with(&prefix) {
290 indexes.push(index);
291 }
292 }
293 }
294 indexes
295 })
296 .unwrap_or_default();
297
298 #[cfg(feature = "text-index")]
300 let text_indexes_to_clean: Vec<
301 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
302 > = self
303 .lpg_store()
304 .get_node(id)
305 .map(|node| {
306 let mut indexes = Vec::new();
307 for label in &node.labels {
308 let prefix = format!("{}:", label.as_str());
309 for (key, index) in self.lpg_store().text_index_entries() {
310 if key.starts_with(&prefix) {
311 indexes.push(index);
312 }
313 }
314 }
315 indexes
316 })
317 .unwrap_or_default();
318
319 let result = self.lpg_store().delete_node(id);
320
321 #[cfg(feature = "vector-index")]
323 if result {
324 for index in indexes_to_clean {
325 index.remove(id);
326 }
327 }
328
329 #[cfg(feature = "text-index")]
331 if result {
332 for index in text_indexes_to_clean {
333 index.write().remove(id);
334 }
335 }
336
337 #[cfg(feature = "wal")]
338 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
339 grafeo_warn!("Failed to log DeleteNode to WAL: {}", e);
340 }
341
342 #[cfg(feature = "cdc")]
343 if result && self.cdc_active() {
344 self.cdc_log.record_delete(
345 crate::cdc::EntityId::Node(id),
346 self.lpg_store().current_epoch(),
347 cdc_props,
348 );
349 }
350
351 result
352 }
353
354 pub fn set_node_property(
358 &self,
359 id: grafeo_common::types::NodeId,
360 key: &str,
361 value: grafeo_common::types::Value,
362 ) {
363 #[cfg(feature = "vector-index")]
365 let vector_data = match &value {
366 grafeo_common::types::Value::Vector(v) => Some(v.clone()),
367 _ => None,
368 };
369
370 #[cfg(feature = "wal")]
372 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
373 id,
374 key: key.to_string(),
375 value: value.clone(),
376 }) {
377 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
378 }
379
380 #[cfg(feature = "cdc")]
382 let cdc_active = self.cdc_active();
383 #[cfg(feature = "cdc")]
384 let cdc_old_value = if cdc_active {
385 self.lpg_store()
386 .get_node_property(id, &grafeo_common::types::PropertyKey::new(key))
387 } else {
388 None
389 };
390 #[cfg(feature = "cdc")]
391 let cdc_new_value = if cdc_active {
392 Some(value.clone())
393 } else {
394 None
395 };
396
397 self.lpg_store().set_node_property(id, key, value);
398
399 #[cfg(feature = "cdc")]
400 if let Some(cdc_new_value) = cdc_new_value {
401 self.cdc_log.record_update(
402 crate::cdc::EntityId::Node(id),
403 self.lpg_store().current_epoch(),
404 key,
405 cdc_old_value,
406 cdc_new_value,
407 );
408 }
409
410 #[cfg(feature = "vector-index")]
412 if let Some(vec) = vector_data
413 && let Some(node) = self.lpg_store().get_node(id)
414 {
415 for label in &node.labels {
416 if let Some(index) = self.lpg_store().get_vector_index(label.as_str(), key) {
417 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
418 &**self.lpg_store(),
419 key,
420 );
421 index.insert(id, &vec, &accessor);
422 }
423 }
424 }
425
426 #[cfg(feature = "text-index")]
428 if let Some(node) = self.lpg_store().get_node(id) {
429 let text_val = node
430 .properties
431 .get(&grafeo_common::types::PropertyKey::new(key))
432 .and_then(|v| match v {
433 grafeo_common::types::Value::String(s) => Some(s.to_string()),
434 _ => None,
435 });
436 for label in &node.labels {
437 if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
438 let mut idx = index.write();
439 if let Some(ref text) = text_val {
440 idx.insert(id, text);
441 } else {
442 idx.remove(id);
443 }
444 }
445 }
446 }
447 }
448
449 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
467 let result = self.lpg_store().add_label(id, label);
468
469 #[cfg(feature = "wal")]
470 if result {
471 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
473 id,
474 label: label.to_string(),
475 }) {
476 grafeo_warn!("Failed to log AddNodeLabel to WAL: {}", e);
477 }
478 }
479
480 #[cfg(feature = "vector-index")]
482 if result {
483 let prefix = format!("{label}:");
484 for (key, index) in self.lpg_store().vector_index_entries() {
485 if let Some(property) = key.strip_prefix(&prefix)
486 && let Some(node) = self.lpg_store().get_node(id)
487 {
488 let prop_key = grafeo_common::types::PropertyKey::new(property);
489 if let Some(grafeo_common::types::Value::Vector(v)) =
490 node.properties.get(&prop_key)
491 {
492 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
493 &**self.lpg_store(),
494 property,
495 );
496 index.insert(id, v, &accessor);
497 }
498 }
499 }
500 }
501
502 #[cfg(feature = "text-index")]
504 if result && let Some(node) = self.lpg_store().get_node(id) {
505 for (prop_key, prop_val) in &node.properties {
506 if let grafeo_common::types::Value::String(text) = prop_val
507 && let Some(index) = self.lpg_store().get_text_index(label, prop_key.as_ref())
508 {
509 index.write().insert(id, text);
510 }
511 }
512 }
513
514 result
515 }
516
517 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
535 #[cfg(feature = "text-index")]
537 let text_indexes_to_clean: Vec<
538 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
539 > = {
540 let prefix = format!("{label}:");
541 self.lpg_store()
542 .text_index_entries()
543 .into_iter()
544 .filter(|(key, _)| key.starts_with(&prefix))
545 .map(|(_, index)| index)
546 .collect()
547 };
548
549 let result = self.lpg_store().remove_label(id, label);
550
551 #[cfg(feature = "wal")]
552 if result {
553 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
555 id,
556 label: label.to_string(),
557 }) {
558 grafeo_warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
559 }
560 }
561
562 #[cfg(feature = "text-index")]
564 if result {
565 for index in text_indexes_to_clean {
566 index.write().remove(id);
567 }
568 }
569
570 result
571 }
572
573 #[must_use]
590 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
591 self.lpg_store()
592 .get_node(id)
593 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
594 }
595
596 pub fn create_edge(
616 &self,
617 src: grafeo_common::types::NodeId,
618 dst: grafeo_common::types::NodeId,
619 edge_type: &str,
620 ) -> grafeo_common::types::EdgeId {
621 let id = self.lpg_store().create_edge(src, dst, edge_type);
622
623 #[cfg(feature = "wal")]
625 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
626 id,
627 src,
628 dst,
629 edge_type: edge_type.to_string(),
630 }) {
631 grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
632 }
633
634 #[cfg(feature = "cdc")]
635 if self.cdc_active() {
636 self.cdc_log.record_create_edge(
637 id,
638 self.lpg_store().current_epoch(),
639 None,
640 src.as_u64(),
641 dst.as_u64(),
642 edge_type.to_string(),
643 );
644 }
645
646 id
647 }
648
649 pub fn create_edge_with_props(
653 &self,
654 src: grafeo_common::types::NodeId,
655 dst: grafeo_common::types::NodeId,
656 edge_type: &str,
657 properties: impl IntoIterator<
658 Item = (
659 impl Into<grafeo_common::types::PropertyKey>,
660 impl Into<grafeo_common::types::Value>,
661 ),
662 >,
663 ) -> grafeo_common::types::EdgeId {
664 let props: Vec<(
666 grafeo_common::types::PropertyKey,
667 grafeo_common::types::Value,
668 )> = properties
669 .into_iter()
670 .map(|(k, v)| (k.into(), v.into()))
671 .collect();
672
673 let id = self.lpg_store().create_edge_with_props(
674 src,
675 dst,
676 edge_type,
677 props.iter().map(|(k, v)| (k.clone(), v.clone())),
678 );
679
680 #[cfg(feature = "cdc")]
682 let cdc_props: Option<
683 std::collections::HashMap<String, grafeo_common::types::Value>,
684 > = if self.cdc_active() {
685 Some(
686 props
687 .iter()
688 .map(|(k, v)| (k.to_string(), v.clone()))
689 .collect(),
690 )
691 } else {
692 None
693 };
694
695 #[cfg(feature = "wal")]
697 {
698 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
699 id,
700 src,
701 dst,
702 edge_type: edge_type.to_string(),
703 }) {
704 grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
705 }
706
707 for (key, value) in props {
709 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
710 id,
711 key: key.to_string(),
712 value,
713 }) {
714 grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
715 }
716 }
717 }
718
719 #[cfg(feature = "cdc")]
720 if let Some(cdc_props) = cdc_props {
721 self.cdc_log.record_create_edge(
722 id,
723 self.lpg_store().current_epoch(),
724 if cdc_props.is_empty() {
725 None
726 } else {
727 Some(cdc_props)
728 },
729 src.as_u64(),
730 dst.as_u64(),
731 edge_type.to_string(),
732 );
733 }
734
735 id
736 }
737
738 #[must_use]
740 pub fn get_edge(
741 &self,
742 id: grafeo_common::types::EdgeId,
743 ) -> Option<grafeo_core::graph::lpg::Edge> {
744 self.lpg_store().get_edge(id)
745 }
746
747 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
751 #[cfg(feature = "cdc")]
753 let cdc_props = if self.cdc_active() {
754 self.lpg_store().get_edge(id).map(|edge| {
755 edge.properties
756 .iter()
757 .map(|(k, v)| (k.to_string(), v.clone()))
758 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
759 })
760 } else {
761 None
762 };
763
764 let result = self.lpg_store().delete_edge(id);
765
766 #[cfg(feature = "wal")]
767 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
768 grafeo_warn!("Failed to log DeleteEdge to WAL: {}", e);
769 }
770
771 #[cfg(feature = "cdc")]
772 if result && self.cdc_active() {
773 self.cdc_log.record_delete(
774 crate::cdc::EntityId::Edge(id),
775 self.lpg_store().current_epoch(),
776 cdc_props,
777 );
778 }
779
780 result
781 }
782
783 pub fn set_edge_property(
787 &self,
788 id: grafeo_common::types::EdgeId,
789 key: &str,
790 value: grafeo_common::types::Value,
791 ) {
792 #[cfg(feature = "wal")]
794 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
795 id,
796 key: key.to_string(),
797 value: value.clone(),
798 }) {
799 grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
800 }
801
802 #[cfg(feature = "cdc")]
804 let cdc_active = self.cdc_active();
805 #[cfg(feature = "cdc")]
806 let cdc_old_value = if cdc_active {
807 self.lpg_store()
808 .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key))
809 } else {
810 None
811 };
812 #[cfg(feature = "cdc")]
813 let cdc_new_value = if cdc_active {
814 Some(value.clone())
815 } else {
816 None
817 };
818
819 self.lpg_store().set_edge_property(id, key, value);
820
821 #[cfg(feature = "cdc")]
822 if let Some(cdc_new_value) = cdc_new_value {
823 self.cdc_log.record_update(
824 crate::cdc::EntityId::Edge(id),
825 self.lpg_store().current_epoch(),
826 key,
827 cdc_old_value,
828 cdc_new_value,
829 );
830 }
831 }
832
833 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
837 let removed = self.lpg_store().remove_node_property(id, key).is_some();
838
839 #[cfg(feature = "wal")]
840 if removed
841 && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
842 id,
843 key: key.to_string(),
844 })
845 {
846 grafeo_warn!("WAL log for RemoveNodeProperty failed: {e}");
847 }
848
849 #[cfg(feature = "text-index")]
851 if removed && let Some(node) = self.lpg_store().get_node(id) {
852 for label in &node.labels {
853 if let Some(index) = self.lpg_store().get_text_index(label.as_str(), key) {
854 index.write().remove(id);
855 }
856 }
857 }
858
859 removed
860 }
861
862 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
866 let removed = self.lpg_store().remove_edge_property(id, key).is_some();
867
868 #[cfg(feature = "wal")]
869 if removed
870 && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
871 id,
872 key: key.to_string(),
873 })
874 {
875 grafeo_warn!("WAL log for RemoveEdgeProperty failed: {e}");
876 }
877
878 removed
879 }
880
881 pub fn batch_create_nodes(
903 &self,
904 label: &str,
905 property: &str,
906 vectors: Vec<Vec<f32>>,
907 ) -> Vec<grafeo_common::types::NodeId> {
908 use grafeo_common::types::{PropertyKey, Value};
909
910 let prop_key = PropertyKey::new(property);
911 let labels: &[&str] = &[label];
912
913 let ids: Vec<grafeo_common::types::NodeId> = vectors
914 .into_iter()
915 .map(|vec| {
916 let value = Value::Vector(vec.into());
917 let id = self.lpg_store().create_node_with_props(
918 labels,
919 std::iter::once((prop_key.clone(), value.clone())),
920 );
921
922 #[cfg(feature = "wal")]
924 {
925 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
926 id,
927 labels: labels.iter().map(|s| (*s).to_string()).collect(),
928 }) {
929 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
930 }
931 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
932 id,
933 key: property.to_string(),
934 value,
935 }) {
936 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
937 }
938 }
939
940 id
941 })
942 .collect();
943
944 #[cfg(feature = "vector-index")]
946 if let Some(index) = self.lpg_store().get_vector_index(label, property) {
947 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
948 &**self.lpg_store(),
949 property,
950 );
951 for &id in &ids {
952 if let Some(node) = self.lpg_store().get_node(id) {
953 let pk = grafeo_common::types::PropertyKey::new(property);
954 if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk)
955 && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
956 index.insert(id, v, &accessor);
957 }))
958 .is_err()
959 {
960 grafeo_warn!("Vector index insert panicked for node {}", id.as_u64());
961 }
962 }
963 }
964 }
965
966 ids
967 }
968
969 pub fn batch_create_nodes_with_props(
991 &self,
992 label: &str,
993 properties_list: Vec<
994 std::collections::HashMap<
995 grafeo_common::types::PropertyKey,
996 grafeo_common::types::Value,
997 >,
998 >,
999 ) -> Vec<grafeo_common::types::NodeId> {
1000 #[cfg(any(feature = "vector-index", feature = "text-index"))]
1001 use grafeo_common::types::Value;
1002
1003 let labels: &[&str] = &[label];
1004
1005 let ids: Vec<grafeo_common::types::NodeId> = properties_list
1006 .into_iter()
1007 .map(|props| {
1008 let id = self.lpg_store().create_node_with_props(
1009 labels,
1010 props.iter().map(|(k, v)| (k.clone(), v.clone())),
1011 );
1012
1013 #[cfg(feature = "cdc")]
1015 let cdc_props: Option<
1016 std::collections::HashMap<String, grafeo_common::types::Value>,
1017 > = if self.cdc_active() {
1018 Some(
1019 props
1020 .iter()
1021 .map(|(k, v)| (k.to_string(), v.clone()))
1022 .collect(),
1023 )
1024 } else {
1025 None
1026 };
1027
1028 #[cfg(feature = "wal")]
1030 {
1031 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1032 id,
1033 labels: labels.iter().map(|s| (*s).to_string()).collect(),
1034 }) {
1035 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
1036 }
1037 for (key, value) in props {
1038 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1039 id,
1040 key: key.to_string(),
1041 value,
1042 }) {
1043 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
1044 }
1045 }
1046 }
1047
1048 #[cfg(feature = "cdc")]
1049 if let Some(cdc_props) = cdc_props {
1050 self.cdc_log.record_create_node(
1051 id,
1052 self.lpg_store().current_epoch(),
1053 if cdc_props.is_empty() {
1054 None
1055 } else {
1056 Some(cdc_props)
1057 },
1058 Some(labels.iter().map(|s| (*s).to_string()).collect()),
1059 );
1060 }
1061
1062 id
1063 })
1064 .collect();
1065
1066 #[cfg(feature = "vector-index")]
1068 {
1069 for (key, index) in self.lpg_store().vector_index_entries() {
1070 if !key.starts_with(label) || !key[label.len()..].starts_with(':') {
1072 continue;
1073 }
1074 let property = &key[label.len() + 1..];
1075 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1076 &**self.lpg_store(),
1077 property,
1078 );
1079 let pk = grafeo_common::types::PropertyKey::new(property);
1080 for &id in &ids {
1081 if let Some(node) = self.lpg_store().get_node(id) {
1082 match node.properties.get(&pk) {
1083 Some(Value::Vector(v)) => {
1084 if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1085 index.insert(id, v, &accessor);
1086 }))
1087 .is_err()
1088 {
1089 grafeo_warn!(
1090 "Vector index insert panicked for node {}",
1091 id.as_u64()
1092 );
1093 }
1094 }
1095 Some(_other) => {
1096 grafeo_warn!(
1097 "Node {} property '{}' expected Vector, skipping vector index insert",
1098 id.as_u64(),
1099 property
1100 );
1101 }
1102 None => {} }
1104 }
1105 }
1106 }
1107 }
1108
1109 #[cfg(feature = "text-index")]
1111 for &id in &ids {
1112 if let Some(node) = self.lpg_store().get_node(id) {
1113 for (prop_key, prop_val) in &node.properties {
1114 if let Value::String(text) = prop_val
1115 && let Some(index) =
1116 self.lpg_store().get_text_index(label, prop_key.as_ref())
1117 {
1118 index.write().insert(id, text);
1119 }
1120 }
1121 }
1122 }
1123
1124 ids
1125 }
1126}