1#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5use grafeo_common::grafeo_warn;
6
7impl super::GrafeoDB {
8 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
25 let id = self.store.create_node(labels);
26
27 #[cfg(feature = "wal")]
29 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
30 id,
31 labels: labels.iter().map(|s| (*s).to_string()).collect(),
32 }) {
33 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
34 }
35
36 #[cfg(feature = "cdc")]
37 self.cdc_log.record_create_node(
38 id,
39 self.store.current_epoch(),
40 None,
41 Some(labels.iter().map(|s| (*s).to_string()).collect()),
42 );
43
44 id
45 }
46
47 pub fn create_node_with_props(
51 &self,
52 labels: &[&str],
53 properties: impl IntoIterator<
54 Item = (
55 impl Into<grafeo_common::types::PropertyKey>,
56 impl Into<grafeo_common::types::Value>,
57 ),
58 >,
59 ) -> grafeo_common::types::NodeId {
60 let props: Vec<(
62 grafeo_common::types::PropertyKey,
63 grafeo_common::types::Value,
64 )> = properties
65 .into_iter()
66 .map(|(k, v)| (k.into(), v.into()))
67 .collect();
68
69 let id = self
70 .store
71 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
72
73 #[cfg(feature = "cdc")]
75 let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
76 .iter()
77 .map(|(k, v)| (k.to_string(), v.clone()))
78 .collect();
79
80 #[cfg(feature = "wal")]
82 {
83 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
84 id,
85 labels: labels.iter().map(|s| (*s).to_string()).collect(),
86 }) {
87 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
88 }
89
90 for (key, value) in props {
92 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
93 id,
94 key: key.to_string(),
95 value,
96 }) {
97 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
98 }
99 }
100 }
101
102 #[cfg(feature = "cdc")]
103 self.cdc_log.record_create_node(
104 id,
105 self.store.current_epoch(),
106 if cdc_props.is_empty() {
107 None
108 } else {
109 Some(cdc_props)
110 },
111 Some(labels.iter().map(|s| (*s).to_string()).collect()),
112 );
113
114 #[cfg(feature = "text-index")]
116 if let Some(node) = self.store.get_node(id) {
117 for label in &node.labels {
118 for (prop_key, prop_val) in &node.properties {
119 if let grafeo_common::types::Value::String(text) = prop_val
120 && let Some(index) =
121 self.store.get_text_index(label.as_str(), prop_key.as_ref())
122 {
123 index.write().insert(id, text);
124 }
125 }
126 }
127 }
128
129 id
130 }
131
132 #[must_use]
134 pub fn get_node(
135 &self,
136 id: grafeo_common::types::NodeId,
137 ) -> Option<grafeo_core::graph::lpg::Node> {
138 self.store.get_node(id)
139 }
140
141 #[must_use]
147 pub fn get_node_at_epoch(
148 &self,
149 id: grafeo_common::types::NodeId,
150 epoch: grafeo_common::types::EpochId,
151 ) -> Option<grafeo_core::graph::lpg::Node> {
152 self.store.get_node_at_epoch(id, epoch)
153 }
154
155 #[must_use]
159 pub fn get_edge_at_epoch(
160 &self,
161 id: grafeo_common::types::EdgeId,
162 epoch: grafeo_common::types::EpochId,
163 ) -> Option<grafeo_core::graph::lpg::Edge> {
164 self.store.get_edge_at_epoch(id, epoch)
165 }
166
167 #[must_use]
171 pub fn get_node_history(
172 &self,
173 id: grafeo_common::types::NodeId,
174 ) -> Vec<(
175 grafeo_common::types::EpochId,
176 Option<grafeo_common::types::EpochId>,
177 grafeo_core::graph::lpg::Node,
178 )> {
179 self.store.get_node_history(id)
180 }
181
182 #[must_use]
186 pub fn get_edge_history(
187 &self,
188 id: grafeo_common::types::EdgeId,
189 ) -> Vec<(
190 grafeo_common::types::EpochId,
191 Option<grafeo_common::types::EpochId>,
192 grafeo_core::graph::lpg::Edge,
193 )> {
194 self.store.get_edge_history(id)
195 }
196
197 #[must_use]
199 pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
200 self.store.current_epoch()
201 }
202
203 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
207 #[cfg(feature = "cdc")]
209 let cdc_props = self.store.get_node(id).map(|node| {
210 node.properties
211 .iter()
212 .map(|(k, v)| (k.to_string(), v.clone()))
213 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
214 });
215
216 #[cfg(feature = "vector-index")]
218 let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
219 .store
220 .get_node(id)
221 .map(|node| {
222 let mut indexes = Vec::new();
223 for label in &node.labels {
224 let prefix = format!("{}:", label.as_str());
225 for (key, index) in self.store.vector_index_entries() {
226 if key.starts_with(&prefix) {
227 indexes.push(index);
228 }
229 }
230 }
231 indexes
232 })
233 .unwrap_or_default();
234
235 #[cfg(feature = "text-index")]
237 let text_indexes_to_clean: Vec<
238 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
239 > = self
240 .store
241 .get_node(id)
242 .map(|node| {
243 let mut indexes = Vec::new();
244 for label in &node.labels {
245 let prefix = format!("{}:", label.as_str());
246 for (key, index) in self.store.text_index_entries() {
247 if key.starts_with(&prefix) {
248 indexes.push(index);
249 }
250 }
251 }
252 indexes
253 })
254 .unwrap_or_default();
255
256 let result = self.store.delete_node(id);
257
258 #[cfg(feature = "vector-index")]
260 if result {
261 for index in indexes_to_clean {
262 index.remove(id);
263 }
264 }
265
266 #[cfg(feature = "text-index")]
268 if result {
269 for index in text_indexes_to_clean {
270 index.write().remove(id);
271 }
272 }
273
274 #[cfg(feature = "wal")]
275 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
276 grafeo_warn!("Failed to log DeleteNode to WAL: {}", e);
277 }
278
279 #[cfg(feature = "cdc")]
280 if result {
281 self.cdc_log.record_delete(
282 crate::cdc::EntityId::Node(id),
283 self.store.current_epoch(),
284 cdc_props,
285 );
286 }
287
288 result
289 }
290
291 pub fn set_node_property(
295 &self,
296 id: grafeo_common::types::NodeId,
297 key: &str,
298 value: grafeo_common::types::Value,
299 ) {
300 #[cfg(feature = "vector-index")]
302 let vector_data = match &value {
303 grafeo_common::types::Value::Vector(v) => Some(v.clone()),
304 _ => None,
305 };
306
307 #[cfg(feature = "wal")]
309 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
310 id,
311 key: key.to_string(),
312 value: value.clone(),
313 }) {
314 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
315 }
316
317 #[cfg(feature = "cdc")]
319 let cdc_old_value = self
320 .store
321 .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
322 #[cfg(feature = "cdc")]
323 let cdc_new_value = value.clone();
324
325 self.store.set_node_property(id, key, value);
326
327 #[cfg(feature = "cdc")]
328 self.cdc_log.record_update(
329 crate::cdc::EntityId::Node(id),
330 self.store.current_epoch(),
331 key,
332 cdc_old_value,
333 cdc_new_value,
334 );
335
336 #[cfg(feature = "vector-index")]
338 if let Some(vec) = vector_data
339 && let Some(node) = self.store.get_node(id)
340 {
341 for label in &node.labels {
342 if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
343 let accessor =
344 grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, key);
345 index.insert(id, &vec, &accessor);
346 }
347 }
348 }
349
350 #[cfg(feature = "text-index")]
352 if let Some(node) = self.store.get_node(id) {
353 let text_val = node
354 .properties
355 .get(&grafeo_common::types::PropertyKey::new(key))
356 .and_then(|v| match v {
357 grafeo_common::types::Value::String(s) => Some(s.to_string()),
358 _ => None,
359 });
360 for label in &node.labels {
361 if let Some(index) = self.store.get_text_index(label.as_str(), key) {
362 let mut idx = index.write();
363 if let Some(ref text) = text_val {
364 idx.insert(id, text);
365 } else {
366 idx.remove(id);
367 }
368 }
369 }
370 }
371 }
372
373 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
391 let result = self.store.add_label(id, label);
392
393 #[cfg(feature = "wal")]
394 if result {
395 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
397 id,
398 label: label.to_string(),
399 }) {
400 grafeo_warn!("Failed to log AddNodeLabel to WAL: {}", e);
401 }
402 }
403
404 #[cfg(feature = "vector-index")]
406 if result {
407 let prefix = format!("{label}:");
408 for (key, index) in self.store.vector_index_entries() {
409 if let Some(property) = key.strip_prefix(&prefix)
410 && let Some(node) = self.store.get_node(id)
411 {
412 let prop_key = grafeo_common::types::PropertyKey::new(property);
413 if let Some(grafeo_common::types::Value::Vector(v)) =
414 node.properties.get(&prop_key)
415 {
416 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
417 &*self.store,
418 property,
419 );
420 index.insert(id, v, &accessor);
421 }
422 }
423 }
424 }
425
426 #[cfg(feature = "text-index")]
428 if result && let Some(node) = self.store.get_node(id) {
429 for (prop_key, prop_val) in &node.properties {
430 if let grafeo_common::types::Value::String(text) = prop_val
431 && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
432 {
433 index.write().insert(id, text);
434 }
435 }
436 }
437
438 result
439 }
440
441 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
459 #[cfg(feature = "text-index")]
461 let text_indexes_to_clean: Vec<
462 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
463 > = {
464 let prefix = format!("{label}:");
465 self.store
466 .text_index_entries()
467 .into_iter()
468 .filter(|(key, _)| key.starts_with(&prefix))
469 .map(|(_, index)| index)
470 .collect()
471 };
472
473 let result = self.store.remove_label(id, label);
474
475 #[cfg(feature = "wal")]
476 if result {
477 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
479 id,
480 label: label.to_string(),
481 }) {
482 grafeo_warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
483 }
484 }
485
486 #[cfg(feature = "text-index")]
488 if result {
489 for index in text_indexes_to_clean {
490 index.write().remove(id);
491 }
492 }
493
494 result
495 }
496
497 #[must_use]
514 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
515 self.store
516 .get_node(id)
517 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
518 }
519
520 pub fn create_edge(
540 &self,
541 src: grafeo_common::types::NodeId,
542 dst: grafeo_common::types::NodeId,
543 edge_type: &str,
544 ) -> grafeo_common::types::EdgeId {
545 let id = self.store.create_edge(src, dst, edge_type);
546
547 #[cfg(feature = "wal")]
549 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
550 id,
551 src,
552 dst,
553 edge_type: edge_type.to_string(),
554 }) {
555 grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
556 }
557
558 #[cfg(feature = "cdc")]
559 self.cdc_log.record_create_edge(
560 id,
561 self.store.current_epoch(),
562 None,
563 src.as_u64(),
564 dst.as_u64(),
565 edge_type.to_string(),
566 );
567
568 id
569 }
570
571 pub fn create_edge_with_props(
575 &self,
576 src: grafeo_common::types::NodeId,
577 dst: grafeo_common::types::NodeId,
578 edge_type: &str,
579 properties: impl IntoIterator<
580 Item = (
581 impl Into<grafeo_common::types::PropertyKey>,
582 impl Into<grafeo_common::types::Value>,
583 ),
584 >,
585 ) -> grafeo_common::types::EdgeId {
586 let props: Vec<(
588 grafeo_common::types::PropertyKey,
589 grafeo_common::types::Value,
590 )> = properties
591 .into_iter()
592 .map(|(k, v)| (k.into(), v.into()))
593 .collect();
594
595 let id = self.store.create_edge_with_props(
596 src,
597 dst,
598 edge_type,
599 props.iter().map(|(k, v)| (k.clone(), v.clone())),
600 );
601
602 #[cfg(feature = "cdc")]
604 let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
605 .iter()
606 .map(|(k, v)| (k.to_string(), v.clone()))
607 .collect();
608
609 #[cfg(feature = "wal")]
611 {
612 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
613 id,
614 src,
615 dst,
616 edge_type: edge_type.to_string(),
617 }) {
618 grafeo_warn!("Failed to log CreateEdge to WAL: {}", e);
619 }
620
621 for (key, value) in props {
623 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
624 id,
625 key: key.to_string(),
626 value,
627 }) {
628 grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
629 }
630 }
631 }
632
633 #[cfg(feature = "cdc")]
634 self.cdc_log.record_create_edge(
635 id,
636 self.store.current_epoch(),
637 if cdc_props.is_empty() {
638 None
639 } else {
640 Some(cdc_props)
641 },
642 src.as_u64(),
643 dst.as_u64(),
644 edge_type.to_string(),
645 );
646
647 id
648 }
649
650 #[must_use]
652 pub fn get_edge(
653 &self,
654 id: grafeo_common::types::EdgeId,
655 ) -> Option<grafeo_core::graph::lpg::Edge> {
656 self.store.get_edge(id)
657 }
658
659 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
663 #[cfg(feature = "cdc")]
665 let cdc_props = self.store.get_edge(id).map(|edge| {
666 edge.properties
667 .iter()
668 .map(|(k, v)| (k.to_string(), v.clone()))
669 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
670 });
671
672 let result = self.store.delete_edge(id);
673
674 #[cfg(feature = "wal")]
675 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
676 grafeo_warn!("Failed to log DeleteEdge to WAL: {}", e);
677 }
678
679 #[cfg(feature = "cdc")]
680 if result {
681 self.cdc_log.record_delete(
682 crate::cdc::EntityId::Edge(id),
683 self.store.current_epoch(),
684 cdc_props,
685 );
686 }
687
688 result
689 }
690
691 pub fn set_edge_property(
695 &self,
696 id: grafeo_common::types::EdgeId,
697 key: &str,
698 value: grafeo_common::types::Value,
699 ) {
700 #[cfg(feature = "wal")]
702 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
703 id,
704 key: key.to_string(),
705 value: value.clone(),
706 }) {
707 grafeo_warn!("Failed to log SetEdgeProperty to WAL: {}", e);
708 }
709
710 #[cfg(feature = "cdc")]
712 let cdc_old_value = self
713 .store
714 .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
715 #[cfg(feature = "cdc")]
716 let cdc_new_value = value.clone();
717
718 self.store.set_edge_property(id, key, value);
719
720 #[cfg(feature = "cdc")]
721 self.cdc_log.record_update(
722 crate::cdc::EntityId::Edge(id),
723 self.store.current_epoch(),
724 key,
725 cdc_old_value,
726 cdc_new_value,
727 );
728 }
729
730 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
734 let removed = self.store.remove_node_property(id, key).is_some();
735
736 #[cfg(feature = "wal")]
737 if removed
738 && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
739 id,
740 key: key.to_string(),
741 })
742 {
743 grafeo_warn!("WAL log for RemoveNodeProperty failed: {e}");
744 }
745
746 #[cfg(feature = "text-index")]
748 if removed && let Some(node) = self.store.get_node(id) {
749 for label in &node.labels {
750 if let Some(index) = self.store.get_text_index(label.as_str(), key) {
751 index.write().remove(id);
752 }
753 }
754 }
755
756 removed
757 }
758
759 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
763 let removed = self.store.remove_edge_property(id, key).is_some();
764
765 #[cfg(feature = "wal")]
766 if removed
767 && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
768 id,
769 key: key.to_string(),
770 })
771 {
772 grafeo_warn!("WAL log for RemoveEdgeProperty failed: {e}");
773 }
774
775 removed
776 }
777
778 pub fn batch_create_nodes(
794 &self,
795 label: &str,
796 property: &str,
797 vectors: Vec<Vec<f32>>,
798 ) -> Vec<grafeo_common::types::NodeId> {
799 use grafeo_common::types::{PropertyKey, Value};
800
801 let prop_key = PropertyKey::new(property);
802 let labels: &[&str] = &[label];
803
804 let ids: Vec<grafeo_common::types::NodeId> = vectors
805 .into_iter()
806 .map(|vec| {
807 let value = Value::Vector(vec.into());
808 let id = self.store.create_node_with_props(
809 labels,
810 std::iter::once((prop_key.clone(), value.clone())),
811 );
812
813 #[cfg(feature = "wal")]
815 {
816 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
817 id,
818 labels: labels.iter().map(|s| (*s).to_string()).collect(),
819 }) {
820 grafeo_warn!("Failed to log CreateNode to WAL: {}", e);
821 }
822 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
823 id,
824 key: property.to_string(),
825 value,
826 }) {
827 grafeo_warn!("Failed to log SetNodeProperty to WAL: {}", e);
828 }
829 }
830
831 id
832 })
833 .collect();
834
835 #[cfg(feature = "vector-index")]
837 if let Some(index) = self.store.get_vector_index(label, property) {
838 let accessor =
839 grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, property);
840 for &id in &ids {
841 if let Some(node) = self.store.get_node(id) {
842 let pk = grafeo_common::types::PropertyKey::new(property);
843 if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
844 index.insert(id, v, &accessor);
845 }
846 }
847 }
848 }
849
850 ids
851 }
852}