1#[cfg(feature = "wal")]
4use grafeo_adapters::storage::wal::WalRecord;
5
6impl super::GrafeoDB {
7 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
24 let id = self.store.create_node(labels);
25
26 #[cfg(feature = "wal")]
28 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
29 id,
30 labels: labels.iter().map(|s| (*s).to_string()).collect(),
31 }) {
32 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
33 }
34
35 #[cfg(feature = "cdc")]
36 self.cdc_log
37 .record_create_node(id, self.store.current_epoch(), None);
38
39 id
40 }
41
42 pub fn create_node_with_props(
46 &self,
47 labels: &[&str],
48 properties: impl IntoIterator<
49 Item = (
50 impl Into<grafeo_common::types::PropertyKey>,
51 impl Into<grafeo_common::types::Value>,
52 ),
53 >,
54 ) -> grafeo_common::types::NodeId {
55 let props: Vec<(
57 grafeo_common::types::PropertyKey,
58 grafeo_common::types::Value,
59 )> = properties
60 .into_iter()
61 .map(|(k, v)| (k.into(), v.into()))
62 .collect();
63
64 let id = self
65 .store
66 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
67
68 #[cfg(feature = "cdc")]
70 let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
71 .iter()
72 .map(|(k, v)| (k.to_string(), v.clone()))
73 .collect();
74
75 #[cfg(feature = "wal")]
77 {
78 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
79 id,
80 labels: labels.iter().map(|s| (*s).to_string()).collect(),
81 }) {
82 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
83 }
84
85 for (key, value) in props {
87 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
88 id,
89 key: key.to_string(),
90 value,
91 }) {
92 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
93 }
94 }
95 }
96
97 #[cfg(feature = "cdc")]
98 self.cdc_log.record_create_node(
99 id,
100 self.store.current_epoch(),
101 if cdc_props.is_empty() {
102 None
103 } else {
104 Some(cdc_props)
105 },
106 );
107
108 #[cfg(feature = "text-index")]
110 if let Some(node) = self.store.get_node(id) {
111 for label in &node.labels {
112 for (prop_key, prop_val) in &node.properties {
113 if let grafeo_common::types::Value::String(text) = prop_val
114 && let Some(index) =
115 self.store.get_text_index(label.as_str(), prop_key.as_ref())
116 {
117 index.write().insert(id, text);
118 }
119 }
120 }
121 }
122
123 id
124 }
125
126 #[must_use]
128 pub fn get_node(
129 &self,
130 id: grafeo_common::types::NodeId,
131 ) -> Option<grafeo_core::graph::lpg::Node> {
132 self.store.get_node(id)
133 }
134
135 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
139 #[cfg(feature = "cdc")]
141 let cdc_props = self.store.get_node(id).map(|node| {
142 node.properties
143 .iter()
144 .map(|(k, v)| (k.to_string(), v.clone()))
145 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
146 });
147
148 #[cfg(feature = "vector-index")]
150 let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
151 .store
152 .get_node(id)
153 .map(|node| {
154 let mut indexes = Vec::new();
155 for label in &node.labels {
156 let prefix = format!("{}:", label.as_str());
157 for (key, index) in self.store.vector_index_entries() {
158 if key.starts_with(&prefix) {
159 indexes.push(index);
160 }
161 }
162 }
163 indexes
164 })
165 .unwrap_or_default();
166
167 #[cfg(feature = "text-index")]
169 let text_indexes_to_clean: Vec<
170 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
171 > = self
172 .store
173 .get_node(id)
174 .map(|node| {
175 let mut indexes = Vec::new();
176 for label in &node.labels {
177 let prefix = format!("{}:", label.as_str());
178 for (key, index) in self.store.text_index_entries() {
179 if key.starts_with(&prefix) {
180 indexes.push(index);
181 }
182 }
183 }
184 indexes
185 })
186 .unwrap_or_default();
187
188 let result = self.store.delete_node(id);
189
190 #[cfg(feature = "vector-index")]
192 if result {
193 for index in indexes_to_clean {
194 index.remove(id);
195 }
196 }
197
198 #[cfg(feature = "text-index")]
200 if result {
201 for index in text_indexes_to_clean {
202 index.write().remove(id);
203 }
204 }
205
206 #[cfg(feature = "wal")]
207 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
208 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
209 }
210
211 #[cfg(feature = "cdc")]
212 if result {
213 self.cdc_log.record_delete(
214 crate::cdc::EntityId::Node(id),
215 self.store.current_epoch(),
216 cdc_props,
217 );
218 }
219
220 result
221 }
222
223 pub fn set_node_property(
227 &self,
228 id: grafeo_common::types::NodeId,
229 key: &str,
230 value: grafeo_common::types::Value,
231 ) {
232 #[cfg(feature = "vector-index")]
234 let vector_data = match &value {
235 grafeo_common::types::Value::Vector(v) => Some(v.clone()),
236 _ => None,
237 };
238
239 #[cfg(feature = "wal")]
241 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
242 id,
243 key: key.to_string(),
244 value: value.clone(),
245 }) {
246 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
247 }
248
249 #[cfg(feature = "cdc")]
251 let cdc_old_value = self
252 .store
253 .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
254 #[cfg(feature = "cdc")]
255 let cdc_new_value = value.clone();
256
257 self.store.set_node_property(id, key, value);
258
259 #[cfg(feature = "cdc")]
260 self.cdc_log.record_update(
261 crate::cdc::EntityId::Node(id),
262 self.store.current_epoch(),
263 key,
264 cdc_old_value,
265 cdc_new_value,
266 );
267
268 #[cfg(feature = "vector-index")]
270 if let Some(vec) = vector_data
271 && let Some(node) = self.store.get_node(id)
272 {
273 for label in &node.labels {
274 if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
275 let accessor =
276 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, key);
277 index.insert(id, &vec, &accessor);
278 }
279 }
280 }
281
282 #[cfg(feature = "text-index")]
284 if let Some(node) = self.store.get_node(id) {
285 let text_val = node
286 .properties
287 .get(&grafeo_common::types::PropertyKey::new(key))
288 .and_then(|v| match v {
289 grafeo_common::types::Value::String(s) => Some(s.to_string()),
290 _ => None,
291 });
292 for label in &node.labels {
293 if let Some(index) = self.store.get_text_index(label.as_str(), key) {
294 let mut idx = index.write();
295 if let Some(ref text) = text_val {
296 idx.insert(id, text);
297 } else {
298 idx.remove(id);
299 }
300 }
301 }
302 }
303 }
304
305 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
323 let result = self.store.add_label(id, label);
324
325 #[cfg(feature = "wal")]
326 if result {
327 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
329 id,
330 label: label.to_string(),
331 }) {
332 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
333 }
334 }
335
336 #[cfg(feature = "vector-index")]
338 if result {
339 let prefix = format!("{label}:");
340 for (key, index) in self.store.vector_index_entries() {
341 if let Some(property) = key.strip_prefix(&prefix)
342 && let Some(node) = self.store.get_node(id)
343 {
344 let prop_key = grafeo_common::types::PropertyKey::new(property);
345 if let Some(grafeo_common::types::Value::Vector(v)) =
346 node.properties.get(&prop_key)
347 {
348 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
349 &self.store,
350 property,
351 );
352 index.insert(id, v, &accessor);
353 }
354 }
355 }
356 }
357
358 #[cfg(feature = "text-index")]
360 if result && let Some(node) = self.store.get_node(id) {
361 for (prop_key, prop_val) in &node.properties {
362 if let grafeo_common::types::Value::String(text) = prop_val
363 && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
364 {
365 index.write().insert(id, text);
366 }
367 }
368 }
369
370 result
371 }
372
373 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
391 #[cfg(feature = "text-index")]
393 let text_indexes_to_clean: Vec<
394 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
395 > = {
396 let prefix = format!("{label}:");
397 self.store
398 .text_index_entries()
399 .into_iter()
400 .filter(|(key, _)| key.starts_with(&prefix))
401 .map(|(_, index)| index)
402 .collect()
403 };
404
405 let result = self.store.remove_label(id, label);
406
407 #[cfg(feature = "wal")]
408 if result {
409 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
411 id,
412 label: label.to_string(),
413 }) {
414 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
415 }
416 }
417
418 #[cfg(feature = "text-index")]
420 if result {
421 for index in text_indexes_to_clean {
422 index.write().remove(id);
423 }
424 }
425
426 result
427 }
428
429 #[must_use]
446 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
447 self.store
448 .get_node(id)
449 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
450 }
451
452 pub fn create_edge(
472 &self,
473 src: grafeo_common::types::NodeId,
474 dst: grafeo_common::types::NodeId,
475 edge_type: &str,
476 ) -> grafeo_common::types::EdgeId {
477 let id = self.store.create_edge(src, dst, edge_type);
478
479 #[cfg(feature = "wal")]
481 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
482 id,
483 src,
484 dst,
485 edge_type: edge_type.to_string(),
486 }) {
487 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
488 }
489
490 #[cfg(feature = "cdc")]
491 self.cdc_log
492 .record_create_edge(id, self.store.current_epoch(), None);
493
494 id
495 }
496
497 pub fn create_edge_with_props(
501 &self,
502 src: grafeo_common::types::NodeId,
503 dst: grafeo_common::types::NodeId,
504 edge_type: &str,
505 properties: impl IntoIterator<
506 Item = (
507 impl Into<grafeo_common::types::PropertyKey>,
508 impl Into<grafeo_common::types::Value>,
509 ),
510 >,
511 ) -> grafeo_common::types::EdgeId {
512 let props: Vec<(
514 grafeo_common::types::PropertyKey,
515 grafeo_common::types::Value,
516 )> = properties
517 .into_iter()
518 .map(|(k, v)| (k.into(), v.into()))
519 .collect();
520
521 let id = self.store.create_edge_with_props(
522 src,
523 dst,
524 edge_type,
525 props.iter().map(|(k, v)| (k.clone(), v.clone())),
526 );
527
528 #[cfg(feature = "cdc")]
530 let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
531 .iter()
532 .map(|(k, v)| (k.to_string(), v.clone()))
533 .collect();
534
535 #[cfg(feature = "wal")]
537 {
538 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
539 id,
540 src,
541 dst,
542 edge_type: edge_type.to_string(),
543 }) {
544 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
545 }
546
547 for (key, value) in props {
549 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
550 id,
551 key: key.to_string(),
552 value,
553 }) {
554 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
555 }
556 }
557 }
558
559 #[cfg(feature = "cdc")]
560 self.cdc_log.record_create_edge(
561 id,
562 self.store.current_epoch(),
563 if cdc_props.is_empty() {
564 None
565 } else {
566 Some(cdc_props)
567 },
568 );
569
570 id
571 }
572
573 #[must_use]
575 pub fn get_edge(
576 &self,
577 id: grafeo_common::types::EdgeId,
578 ) -> Option<grafeo_core::graph::lpg::Edge> {
579 self.store.get_edge(id)
580 }
581
582 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
586 #[cfg(feature = "cdc")]
588 let cdc_props = self.store.get_edge(id).map(|edge| {
589 edge.properties
590 .iter()
591 .map(|(k, v)| (k.to_string(), v.clone()))
592 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
593 });
594
595 let result = self.store.delete_edge(id);
596
597 #[cfg(feature = "wal")]
598 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
599 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
600 }
601
602 #[cfg(feature = "cdc")]
603 if result {
604 self.cdc_log.record_delete(
605 crate::cdc::EntityId::Edge(id),
606 self.store.current_epoch(),
607 cdc_props,
608 );
609 }
610
611 result
612 }
613
614 pub fn set_edge_property(
618 &self,
619 id: grafeo_common::types::EdgeId,
620 key: &str,
621 value: grafeo_common::types::Value,
622 ) {
623 #[cfg(feature = "wal")]
625 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
626 id,
627 key: key.to_string(),
628 value: value.clone(),
629 }) {
630 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
631 }
632
633 #[cfg(feature = "cdc")]
635 let cdc_old_value = self
636 .store
637 .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
638 #[cfg(feature = "cdc")]
639 let cdc_new_value = value.clone();
640
641 self.store.set_edge_property(id, key, value);
642
643 #[cfg(feature = "cdc")]
644 self.cdc_log.record_update(
645 crate::cdc::EntityId::Edge(id),
646 self.store.current_epoch(),
647 key,
648 cdc_old_value,
649 cdc_new_value,
650 );
651 }
652
653 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
657 let removed = self.store.remove_node_property(id, key).is_some();
659
660 #[cfg(feature = "text-index")]
662 if removed && let Some(node) = self.store.get_node(id) {
663 for label in &node.labels {
664 if let Some(index) = self.store.get_text_index(label.as_str(), key) {
665 index.write().remove(id);
666 }
667 }
668 }
669
670 removed
671 }
672
673 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
677 self.store.remove_edge_property(id, key).is_some()
679 }
680
681 pub fn batch_create_nodes(
697 &self,
698 label: &str,
699 property: &str,
700 vectors: Vec<Vec<f32>>,
701 ) -> Vec<grafeo_common::types::NodeId> {
702 use grafeo_common::types::{PropertyKey, Value};
703
704 let prop_key = PropertyKey::new(property);
705 let labels: &[&str] = &[label];
706
707 let ids: Vec<grafeo_common::types::NodeId> = vectors
708 .into_iter()
709 .map(|vec| {
710 let value = Value::Vector(vec.into());
711 let id = self.store.create_node_with_props(
712 labels,
713 std::iter::once((prop_key.clone(), value.clone())),
714 );
715
716 #[cfg(feature = "wal")]
718 {
719 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
720 id,
721 labels: labels.iter().map(|s| (*s).to_string()).collect(),
722 }) {
723 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
724 }
725 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
726 id,
727 key: property.to_string(),
728 value,
729 }) {
730 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
731 }
732 }
733
734 id
735 })
736 .collect();
737
738 #[cfg(feature = "vector-index")]
740 if let Some(index) = self.store.get_vector_index(label, property) {
741 let accessor =
742 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
743 for &id in &ids {
744 if let Some(node) = self.store.get_node(id) {
745 let pk = grafeo_common::types::PropertyKey::new(property);
746 if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
747 index.insert(id, v, &accessor);
748 }
749 }
750 }
751 }
752
753 ids
754 }
755}