1#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5
6impl super::GrafeoDB {
7 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
24 let id = self.store.create_node(labels);
25
26 #[cfg(feature = "wal")]
28 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
29 id,
30 labels: labels.iter().map(|s| (*s).to_string()).collect(),
31 }) {
32 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
33 }
34
35 #[cfg(feature = "cdc")]
36 self.cdc_log
37 .record_create_node(id, self.store.current_epoch(), None);
38
39 id
40 }
41
42 pub fn create_node_with_props(
46 &self,
47 labels: &[&str],
48 properties: impl IntoIterator<
49 Item = (
50 impl Into<grafeo_common::types::PropertyKey>,
51 impl Into<grafeo_common::types::Value>,
52 ),
53 >,
54 ) -> grafeo_common::types::NodeId {
55 let props: Vec<(
57 grafeo_common::types::PropertyKey,
58 grafeo_common::types::Value,
59 )> = properties
60 .into_iter()
61 .map(|(k, v)| (k.into(), v.into()))
62 .collect();
63
64 let id = self
65 .store
66 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
67
68 #[cfg(feature = "cdc")]
70 let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
71 .iter()
72 .map(|(k, v)| (k.to_string(), v.clone()))
73 .collect();
74
75 #[cfg(feature = "wal")]
77 {
78 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
79 id,
80 labels: labels.iter().map(|s| (*s).to_string()).collect(),
81 }) {
82 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
83 }
84
85 for (key, value) in props {
87 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
88 id,
89 key: key.to_string(),
90 value,
91 }) {
92 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
93 }
94 }
95 }
96
97 #[cfg(feature = "cdc")]
98 self.cdc_log.record_create_node(
99 id,
100 self.store.current_epoch(),
101 if cdc_props.is_empty() {
102 None
103 } else {
104 Some(cdc_props)
105 },
106 );
107
108 #[cfg(feature = "text-index")]
110 if let Some(node) = self.store.get_node(id) {
111 for label in &node.labels {
112 for (prop_key, prop_val) in &node.properties {
113 if let grafeo_common::types::Value::String(text) = prop_val
114 && let Some(index) =
115 self.store.get_text_index(label.as_str(), prop_key.as_ref())
116 {
117 index.write().insert(id, text);
118 }
119 }
120 }
121 }
122
123 id
124 }
125
126 #[must_use]
128 pub fn get_node(
129 &self,
130 id: grafeo_common::types::NodeId,
131 ) -> Option<grafeo_core::graph::lpg::Node> {
132 self.store.get_node(id)
133 }
134
135 #[must_use]
141 pub fn get_node_at_epoch(
142 &self,
143 id: grafeo_common::types::NodeId,
144 epoch: grafeo_common::types::EpochId,
145 ) -> Option<grafeo_core::graph::lpg::Node> {
146 self.store.get_node_at_epoch(id, epoch)
147 }
148
149 #[must_use]
153 pub fn get_edge_at_epoch(
154 &self,
155 id: grafeo_common::types::EdgeId,
156 epoch: grafeo_common::types::EpochId,
157 ) -> Option<grafeo_core::graph::lpg::Edge> {
158 self.store.get_edge_at_epoch(id, epoch)
159 }
160
161 #[must_use]
165 pub fn get_node_history(
166 &self,
167 id: grafeo_common::types::NodeId,
168 ) -> Vec<(
169 grafeo_common::types::EpochId,
170 Option<grafeo_common::types::EpochId>,
171 grafeo_core::graph::lpg::Node,
172 )> {
173 self.store.get_node_history(id)
174 }
175
176 #[must_use]
180 pub fn get_edge_history(
181 &self,
182 id: grafeo_common::types::EdgeId,
183 ) -> Vec<(
184 grafeo_common::types::EpochId,
185 Option<grafeo_common::types::EpochId>,
186 grafeo_core::graph::lpg::Edge,
187 )> {
188 self.store.get_edge_history(id)
189 }
190
191 #[must_use]
193 pub fn current_epoch(&self) -> grafeo_common::types::EpochId {
194 self.store.current_epoch()
195 }
196
197 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
201 #[cfg(feature = "cdc")]
203 let cdc_props = self.store.get_node(id).map(|node| {
204 node.properties
205 .iter()
206 .map(|(k, v)| (k.to_string(), v.clone()))
207 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
208 });
209
210 #[cfg(feature = "vector-index")]
212 let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
213 .store
214 .get_node(id)
215 .map(|node| {
216 let mut indexes = Vec::new();
217 for label in &node.labels {
218 let prefix = format!("{}:", label.as_str());
219 for (key, index) in self.store.vector_index_entries() {
220 if key.starts_with(&prefix) {
221 indexes.push(index);
222 }
223 }
224 }
225 indexes
226 })
227 .unwrap_or_default();
228
229 #[cfg(feature = "text-index")]
231 let text_indexes_to_clean: Vec<
232 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
233 > = self
234 .store
235 .get_node(id)
236 .map(|node| {
237 let mut indexes = Vec::new();
238 for label in &node.labels {
239 let prefix = format!("{}:", label.as_str());
240 for (key, index) in self.store.text_index_entries() {
241 if key.starts_with(&prefix) {
242 indexes.push(index);
243 }
244 }
245 }
246 indexes
247 })
248 .unwrap_or_default();
249
250 let result = self.store.delete_node(id);
251
252 #[cfg(feature = "vector-index")]
254 if result {
255 for index in indexes_to_clean {
256 index.remove(id);
257 }
258 }
259
260 #[cfg(feature = "text-index")]
262 if result {
263 for index in text_indexes_to_clean {
264 index.write().remove(id);
265 }
266 }
267
268 #[cfg(feature = "wal")]
269 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
270 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
271 }
272
273 #[cfg(feature = "cdc")]
274 if result {
275 self.cdc_log.record_delete(
276 crate::cdc::EntityId::Node(id),
277 self.store.current_epoch(),
278 cdc_props,
279 );
280 }
281
282 result
283 }
284
285 pub fn set_node_property(
289 &self,
290 id: grafeo_common::types::NodeId,
291 key: &str,
292 value: grafeo_common::types::Value,
293 ) {
294 #[cfg(feature = "vector-index")]
296 let vector_data = match &value {
297 grafeo_common::types::Value::Vector(v) => Some(v.clone()),
298 _ => None,
299 };
300
301 #[cfg(feature = "wal")]
303 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
304 id,
305 key: key.to_string(),
306 value: value.clone(),
307 }) {
308 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
309 }
310
311 #[cfg(feature = "cdc")]
313 let cdc_old_value = self
314 .store
315 .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
316 #[cfg(feature = "cdc")]
317 let cdc_new_value = value.clone();
318
319 self.store.set_node_property(id, key, value);
320
321 #[cfg(feature = "cdc")]
322 self.cdc_log.record_update(
323 crate::cdc::EntityId::Node(id),
324 self.store.current_epoch(),
325 key,
326 cdc_old_value,
327 cdc_new_value,
328 );
329
330 #[cfg(feature = "vector-index")]
332 if let Some(vec) = vector_data
333 && let Some(node) = self.store.get_node(id)
334 {
335 for label in &node.labels {
336 if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
337 let accessor =
338 grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, key);
339 index.insert(id, &vec, &accessor);
340 }
341 }
342 }
343
344 #[cfg(feature = "text-index")]
346 if let Some(node) = self.store.get_node(id) {
347 let text_val = node
348 .properties
349 .get(&grafeo_common::types::PropertyKey::new(key))
350 .and_then(|v| match v {
351 grafeo_common::types::Value::String(s) => Some(s.to_string()),
352 _ => None,
353 });
354 for label in &node.labels {
355 if let Some(index) = self.store.get_text_index(label.as_str(), key) {
356 let mut idx = index.write();
357 if let Some(ref text) = text_val {
358 idx.insert(id, text);
359 } else {
360 idx.remove(id);
361 }
362 }
363 }
364 }
365 }
366
367 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
385 let result = self.store.add_label(id, label);
386
387 #[cfg(feature = "wal")]
388 if result {
389 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
391 id,
392 label: label.to_string(),
393 }) {
394 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
395 }
396 }
397
398 #[cfg(feature = "vector-index")]
400 if result {
401 let prefix = format!("{label}:");
402 for (key, index) in self.store.vector_index_entries() {
403 if let Some(property) = key.strip_prefix(&prefix)
404 && let Some(node) = self.store.get_node(id)
405 {
406 let prop_key = grafeo_common::types::PropertyKey::new(property);
407 if let Some(grafeo_common::types::Value::Vector(v)) =
408 node.properties.get(&prop_key)
409 {
410 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
411 &*self.store,
412 property,
413 );
414 index.insert(id, v, &accessor);
415 }
416 }
417 }
418 }
419
420 #[cfg(feature = "text-index")]
422 if result && let Some(node) = self.store.get_node(id) {
423 for (prop_key, prop_val) in &node.properties {
424 if let grafeo_common::types::Value::String(text) = prop_val
425 && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
426 {
427 index.write().insert(id, text);
428 }
429 }
430 }
431
432 result
433 }
434
435 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
453 #[cfg(feature = "text-index")]
455 let text_indexes_to_clean: Vec<
456 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
457 > = {
458 let prefix = format!("{label}:");
459 self.store
460 .text_index_entries()
461 .into_iter()
462 .filter(|(key, _)| key.starts_with(&prefix))
463 .map(|(_, index)| index)
464 .collect()
465 };
466
467 let result = self.store.remove_label(id, label);
468
469 #[cfg(feature = "wal")]
470 if result {
471 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
473 id,
474 label: label.to_string(),
475 }) {
476 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
477 }
478 }
479
480 #[cfg(feature = "text-index")]
482 if result {
483 for index in text_indexes_to_clean {
484 index.write().remove(id);
485 }
486 }
487
488 result
489 }
490
491 #[must_use]
508 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
509 self.store
510 .get_node(id)
511 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
512 }
513
514 pub fn create_edge(
534 &self,
535 src: grafeo_common::types::NodeId,
536 dst: grafeo_common::types::NodeId,
537 edge_type: &str,
538 ) -> grafeo_common::types::EdgeId {
539 let id = self.store.create_edge(src, dst, edge_type);
540
541 #[cfg(feature = "wal")]
543 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
544 id,
545 src,
546 dst,
547 edge_type: edge_type.to_string(),
548 }) {
549 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
550 }
551
552 #[cfg(feature = "cdc")]
553 self.cdc_log
554 .record_create_edge(id, self.store.current_epoch(), None);
555
556 id
557 }
558
559 pub fn create_edge_with_props(
563 &self,
564 src: grafeo_common::types::NodeId,
565 dst: grafeo_common::types::NodeId,
566 edge_type: &str,
567 properties: impl IntoIterator<
568 Item = (
569 impl Into<grafeo_common::types::PropertyKey>,
570 impl Into<grafeo_common::types::Value>,
571 ),
572 >,
573 ) -> grafeo_common::types::EdgeId {
574 let props: Vec<(
576 grafeo_common::types::PropertyKey,
577 grafeo_common::types::Value,
578 )> = properties
579 .into_iter()
580 .map(|(k, v)| (k.into(), v.into()))
581 .collect();
582
583 let id = self.store.create_edge_with_props(
584 src,
585 dst,
586 edge_type,
587 props.iter().map(|(k, v)| (k.clone(), v.clone())),
588 );
589
590 #[cfg(feature = "cdc")]
592 let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
593 .iter()
594 .map(|(k, v)| (k.to_string(), v.clone()))
595 .collect();
596
597 #[cfg(feature = "wal")]
599 {
600 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
601 id,
602 src,
603 dst,
604 edge_type: edge_type.to_string(),
605 }) {
606 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
607 }
608
609 for (key, value) in props {
611 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
612 id,
613 key: key.to_string(),
614 value,
615 }) {
616 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
617 }
618 }
619 }
620
621 #[cfg(feature = "cdc")]
622 self.cdc_log.record_create_edge(
623 id,
624 self.store.current_epoch(),
625 if cdc_props.is_empty() {
626 None
627 } else {
628 Some(cdc_props)
629 },
630 );
631
632 id
633 }
634
635 #[must_use]
637 pub fn get_edge(
638 &self,
639 id: grafeo_common::types::EdgeId,
640 ) -> Option<grafeo_core::graph::lpg::Edge> {
641 self.store.get_edge(id)
642 }
643
644 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
648 #[cfg(feature = "cdc")]
650 let cdc_props = self.store.get_edge(id).map(|edge| {
651 edge.properties
652 .iter()
653 .map(|(k, v)| (k.to_string(), v.clone()))
654 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
655 });
656
657 let result = self.store.delete_edge(id);
658
659 #[cfg(feature = "wal")]
660 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
661 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
662 }
663
664 #[cfg(feature = "cdc")]
665 if result {
666 self.cdc_log.record_delete(
667 crate::cdc::EntityId::Edge(id),
668 self.store.current_epoch(),
669 cdc_props,
670 );
671 }
672
673 result
674 }
675
676 pub fn set_edge_property(
680 &self,
681 id: grafeo_common::types::EdgeId,
682 key: &str,
683 value: grafeo_common::types::Value,
684 ) {
685 #[cfg(feature = "wal")]
687 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
688 id,
689 key: key.to_string(),
690 value: value.clone(),
691 }) {
692 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
693 }
694
695 #[cfg(feature = "cdc")]
697 let cdc_old_value = self
698 .store
699 .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
700 #[cfg(feature = "cdc")]
701 let cdc_new_value = value.clone();
702
703 self.store.set_edge_property(id, key, value);
704
705 #[cfg(feature = "cdc")]
706 self.cdc_log.record_update(
707 crate::cdc::EntityId::Edge(id),
708 self.store.current_epoch(),
709 key,
710 cdc_old_value,
711 cdc_new_value,
712 );
713 }
714
715 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
719 let removed = self.store.remove_node_property(id, key).is_some();
720
721 #[cfg(feature = "wal")]
722 if removed
723 && let Err(e) = self.log_wal(&WalRecord::RemoveNodeProperty {
724 id,
725 key: key.to_string(),
726 })
727 {
728 tracing::warn!("WAL log for RemoveNodeProperty failed: {e}");
729 }
730
731 #[cfg(feature = "text-index")]
733 if removed && let Some(node) = self.store.get_node(id) {
734 for label in &node.labels {
735 if let Some(index) = self.store.get_text_index(label.as_str(), key) {
736 index.write().remove(id);
737 }
738 }
739 }
740
741 removed
742 }
743
744 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
748 let removed = self.store.remove_edge_property(id, key).is_some();
749
750 #[cfg(feature = "wal")]
751 if removed
752 && let Err(e) = self.log_wal(&WalRecord::RemoveEdgeProperty {
753 id,
754 key: key.to_string(),
755 })
756 {
757 tracing::warn!("WAL log for RemoveEdgeProperty failed: {e}");
758 }
759
760 removed
761 }
762
763 pub fn batch_create_nodes(
779 &self,
780 label: &str,
781 property: &str,
782 vectors: Vec<Vec<f32>>,
783 ) -> Vec<grafeo_common::types::NodeId> {
784 use grafeo_common::types::{PropertyKey, Value};
785
786 let prop_key = PropertyKey::new(property);
787 let labels: &[&str] = &[label];
788
789 let ids: Vec<grafeo_common::types::NodeId> = vectors
790 .into_iter()
791 .map(|vec| {
792 let value = Value::Vector(vec.into());
793 let id = self.store.create_node_with_props(
794 labels,
795 std::iter::once((prop_key.clone(), value.clone())),
796 );
797
798 #[cfg(feature = "wal")]
800 {
801 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
802 id,
803 labels: labels.iter().map(|s| (*s).to_string()).collect(),
804 }) {
805 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
806 }
807 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
808 id,
809 key: property.to_string(),
810 value,
811 }) {
812 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
813 }
814 }
815
816 id
817 })
818 .collect();
819
820 #[cfg(feature = "vector-index")]
822 if let Some(index) = self.store.get_vector_index(label, property) {
823 let accessor =
824 grafeo_core::index::vector::PropertyVectorAccessor::new(&*self.store, property);
825 for &id in &ids {
826 if let Some(node) = self.store.get_node(id) {
827 let pk = grafeo_common::types::PropertyKey::new(property);
828 if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
829 index.insert(id, v, &accessor);
830 }
831 }
832 }
833 }
834
835 ids
836 }
837}