1use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15#[derive(Debug, Default)]
23pub struct OccReadSet {
24 pub vertices: HashSet<Vid>,
26 pub edges: HashSet<Eid>,
28}
29
30impl OccReadSet {
31 pub fn is_empty(&self) -> bool {
34 self.vertices.is_empty() && self.edges.is_empty()
35 }
36}
37
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
/// `Duration::as_nanos` returns a `u128`; the conversion to `i64` is checked
/// and saturates at `i64::MAX` rather than silently wrapping the way an `as`
/// cast would.
fn now_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX)
        })
}
45
46pub(crate) fn try_as_crdt(v: &Value) -> Option<Crdt> {
58 if !matches!(v, Value::Map(_)) {
59 return None;
60 }
61 serde_json::from_value::<Crdt>(v.clone().into()).ok()
62}
63
64pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
67 let mut buf = label.as_bytes().to_vec();
68 buf.push(0); let mut sorted = key_values.to_vec();
70 sorted.sort_by(|a, b| a.0.cmp(&b.0));
71 for (k, v) in &sorted {
72 buf.extend(k.as_bytes());
73 buf.push(0);
74 buf.extend(serde_json::to_vec(v).unwrap_or_default());
76 buf.push(0);
77 }
78 buf
79}
80
/// Counters describing the mutations applied so far.
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
    /// Number of vertices created.
    pub nodes_created: usize,
    /// Number of vertices deleted.
    pub nodes_deleted: usize,
    /// Number of edges created.
    pub relationships_created: usize,
    /// Number of edges deleted.
    pub relationships_deleted: usize,
    /// Number of properties set.
    pub properties_set: usize,
    /// Number of properties removed.
    pub properties_removed: usize,
    /// Number of labels added.
    pub labels_added: usize,
    /// Number of labels removed.
    pub labels_removed: usize,
}

impl MutationStats {
    /// Returns the per-counter difference `self - before`.
    ///
    /// Each subtraction saturates at zero, so a counter that is smaller in
    /// `self` than in `before` yields `0` instead of underflowing.
    pub fn diff(&self, before: &Self) -> Self {
        let mut delta = self.clone();
        delta.nodes_created = delta.nodes_created.saturating_sub(before.nodes_created);
        delta.nodes_deleted = delta.nodes_deleted.saturating_sub(before.nodes_deleted);
        delta.relationships_created = delta
            .relationships_created
            .saturating_sub(before.relationships_created);
        delta.relationships_deleted = delta
            .relationships_deleted
            .saturating_sub(before.relationships_deleted);
        delta.properties_set = delta.properties_set.saturating_sub(before.properties_set);
        delta.properties_removed = delta
            .properties_removed
            .saturating_sub(before.properties_removed);
        delta.labels_added = delta.labels_added.saturating_sub(before.labels_added);
        delta.labels_removed = delta.labels_removed.saturating_sub(before.labels_removed);
        delta
    }
}
119
120#[derive(Clone, Debug)]
121pub struct TombstoneEntry {
122 pub eid: Eid,
123 pub src_vid: Vid,
124 pub dst_vid: Vid,
125 pub edge_type: u32,
126}
127
128pub struct L0Buffer {
129 pub graph: SimpleGraph,
131 pub tombstones: HashMap<Eid, TombstoneEntry>,
133 pub vertex_tombstones: HashSet<Vid>,
135 pub edge_versions: HashMap<Eid, u64>,
137 pub vertex_versions: HashMap<Vid, u64>,
139 pub edge_properties: HashMap<Eid, Properties>,
141 pub vertex_properties: HashMap<Vid, Properties>,
143 pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
145 pub vertex_labels: HashMap<Vid, Vec<String>>,
148 pub label_to_vids: HashMap<String, HashSet<Vid>>,
151 pub vertex_label_overwrites: HashSet<Vid>,
159 pub edge_types: HashMap<Eid, String>,
161 pub current_version: u64,
163 pub mutation_count: usize,
165 pub mutation_stats: MutationStats,
167 pub wal: Option<Arc<WriteAheadLog>>,
169 pub wal_lsn_at_flush: u64,
172 pub vertex_created_at: HashMap<Vid, i64>,
174 pub vertex_updated_at: HashMap<Vid, i64>,
176 pub edge_created_at: HashMap<Eid, i64>,
178 pub edge_updated_at: HashMap<Eid, i64>,
180 pub estimated_size: usize,
183 pub constraint_index: HashMap<Vec<u8>, Vid>,
187 pub merge_guard_index: HashMap<Vec<u8>, Vid>,
197 pub extid_index: HashMap<String, Vid>,
204 pub vertex_partial_keys: HashMap<Vid, HashSet<String>>,
210 pub edge_partial_keys: HashMap<Eid, HashSet<String>>,
217 pub pending_embeddings: HashMap<Vid, String>,
224 pub occ_read_seq: u64,
229 pub occ_read_set: Option<Arc<parking_lot::Mutex<OccReadSet>>>,
234}
235
236impl std::fmt::Debug for L0Buffer {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 f.debug_struct("L0Buffer")
239 .field("vertex_count", &self.graph.vertex_count())
240 .field("edge_count", &self.graph.edge_count())
241 .field("tombstones", &self.tombstones.len())
242 .field("vertex_tombstones", &self.vertex_tombstones.len())
243 .field("current_version", &self.current_version)
244 .field("mutation_count", &self.mutation_count)
245 .finish()
246 }
247}
248
249impl Clone for L0Buffer {
250 fn clone(&self) -> Self {
255 Self {
256 graph: self.graph.clone(),
257 tombstones: self.tombstones.clone(),
258 vertex_tombstones: self.vertex_tombstones.clone(),
259 edge_versions: self.edge_versions.clone(),
260 vertex_versions: self.vertex_versions.clone(),
261 edge_properties: self.edge_properties.clone(),
262 vertex_properties: self.vertex_properties.clone(),
263 edge_endpoints: self.edge_endpoints.clone(),
264 vertex_labels: self.vertex_labels.clone(),
265 label_to_vids: self.label_to_vids.clone(),
266 vertex_label_overwrites: self.vertex_label_overwrites.clone(),
267 edge_types: self.edge_types.clone(),
268 current_version: self.current_version,
269 mutation_count: self.mutation_count,
270 mutation_stats: self.mutation_stats.clone(),
271 wal: None, wal_lsn_at_flush: self.wal_lsn_at_flush,
273 vertex_created_at: self.vertex_created_at.clone(),
274 vertex_updated_at: self.vertex_updated_at.clone(),
275 edge_created_at: self.edge_created_at.clone(),
276 edge_updated_at: self.edge_updated_at.clone(),
277 estimated_size: self.estimated_size,
278 constraint_index: self.constraint_index.clone(),
279 merge_guard_index: self.merge_guard_index.clone(),
280 extid_index: self.extid_index.clone(),
281 vertex_partial_keys: self.vertex_partial_keys.clone(),
282 edge_partial_keys: self.edge_partial_keys.clone(),
283 pending_embeddings: self.pending_embeddings.clone(),
284 occ_read_seq: self.occ_read_seq,
285 occ_read_set: None,
287 }
288 }
289}
290
291impl L0Buffer {
/// Appends each entry of `labels` to `existing` unless an equal label is
/// already present, preserving the order of first appearance.
fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
    for candidate in labels {
        let already_present = existing.iter().any(|have| have == candidate);
        if already_present {
            continue;
        }
        existing.push(candidate.clone());
    }
}
300
301 fn index_labels_for_vid(&mut self, vid: Vid, labels: &[String]) {
303 for label in labels {
304 self.label_to_vids
305 .entry(label.clone())
306 .or_default()
307 .insert(vid);
308 }
309 }
310
311 fn extid_of(props: &Properties) -> Option<String> {
313 props
314 .get("ext_id")
315 .and_then(|v| v.as_str())
316 .map(str::to_owned)
317 }
318
319 fn sync_extid_index(&mut self, vid: Vid, old: Option<String>, new: Option<String>) {
327 if old == new {
328 return;
329 }
330 if let Some(old) = old
331 && self.extid_index.get(&old) == Some(&vid)
332 {
333 self.extid_index.remove(&old);
334 }
335 if let Some(new) = new {
336 self.extid_index.insert(new, vid);
337 }
338 }
339
340 fn remove_vid_from_label_index(&mut self, vid: Vid) {
342 if let Some(labels) = self.vertex_labels.get(&vid) {
343 for label in labels {
344 if let Some(set) = self.label_to_vids.get_mut(label) {
345 set.remove(&vid);
346 }
347 }
348 }
349 }
350
351 pub fn set_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
362 self.remove_vid_from_label_index(vid);
363 self.vertex_labels.insert(vid, labels.to_vec());
364 self.index_labels_for_vid(vid, labels);
365 self.vertex_label_overwrites.insert(vid);
366 self.current_version += 1;
367 self.mutation_count += 1;
368 }
369
370 fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
379 if entry.is_empty() {
381 *entry = properties;
382 return;
383 }
384
385 for (k, v) in properties {
386 if let Some(mut new_crdt) = try_as_crdt(&v)
390 && let Some(existing_v) = entry.get(&k)
391 && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
392 {
393 if new_crdt.try_merge(&existing_crdt).is_ok()
395 && let Ok(merged_json) = serde_json::to_value(new_crdt)
396 {
397 entry.insert(k, uni_common::Value::from(merged_json));
398 continue;
399 }
400 tracing::warn!(
408 property = %k,
409 existing_variant = existing_crdt.type_name(),
410 "overwriting CRDT property with a different CRDT variant \
411 (last-writer-wins); merged CRDT state is discarded"
412 );
413 } else if try_as_crdt(&v).is_none()
414 && entry.get(&k).is_some_and(|e| try_as_crdt(e).is_some())
415 {
416 tracing::warn!(
423 property = %k,
424 "overwriting CRDT property with non-CRDT value (last-writer-wins); \
425 merged CRDT state is discarded"
426 );
427 }
428 entry.insert(k, v);
430 }
431 }
432
433 fn estimate_properties_size(props: &Properties) -> usize {
435 props.keys().map(|k| k.len() + 32).sum()
436 }
437
438 pub fn size_bytes(&self) -> usize {
441 let mut total = 0;
442
443 total += self.graph.vertex_count() * 8;
445 total += self.graph.edge_count() * 24;
446
447 for props in self.vertex_properties.values() {
449 total += Self::estimate_properties_size(props);
450 }
451 for props in self.edge_properties.values() {
452 total += Self::estimate_properties_size(props);
453 }
454
455 total += self.tombstones.len() * 64;
457 total += self.vertex_tombstones.len() * 8;
458 total += self.edge_versions.len() * 16;
459 total += self.vertex_versions.len() * 16;
460 total += self.edge_endpoints.len() * 28; for labels in self.vertex_labels.values() {
464 total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
465 }
466
467 for (label, vids) in &self.label_to_vids {
469 total += label.len() + 24 + vids.len() * 8 + 48; }
471
472 for type_name in self.edge_types.values() {
474 total += type_name.len() + 24;
475 }
476
477 total += self.vertex_created_at.len() * 16;
479 total += self.vertex_updated_at.len() * 16;
480 total += self.edge_created_at.len() * 16;
481 total += self.edge_updated_at.len() * 16;
482
483 total
484 }
485
486 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
487 Self {
488 graph: SimpleGraph::new(),
489 tombstones: HashMap::new(),
490 vertex_tombstones: HashSet::new(),
491 edge_versions: HashMap::new(),
492 vertex_versions: HashMap::new(),
493 edge_properties: HashMap::new(),
494 vertex_properties: HashMap::new(),
495 edge_endpoints: HashMap::new(),
496 vertex_labels: HashMap::new(),
497 label_to_vids: HashMap::new(),
498 vertex_label_overwrites: HashSet::new(),
499 edge_types: HashMap::new(),
500 current_version: start_version,
501 mutation_count: 0,
502 mutation_stats: MutationStats::default(),
503 wal,
504 wal_lsn_at_flush: 0,
505 vertex_created_at: HashMap::new(),
506 vertex_updated_at: HashMap::new(),
507 edge_created_at: HashMap::new(),
508 edge_updated_at: HashMap::new(),
509 estimated_size: 0,
510 constraint_index: HashMap::new(),
511 merge_guard_index: HashMap::new(),
512 extid_index: HashMap::new(),
513 vertex_partial_keys: HashMap::new(),
514 edge_partial_keys: HashMap::new(),
515 pending_embeddings: HashMap::new(),
516 occ_read_seq: 0,
517 occ_read_set: None,
518 }
519 }
520
521 pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
522 self.insert_vertex_with_labels(vid, properties, &[]);
523 }
524
525 pub fn insert_vertex_with_labels(
527 &mut self,
528 vid: Vid,
529 properties: Properties,
530 labels: &[String],
531 ) {
532 self.insert_vertex_with_labels_impl(vid, properties, labels, false);
533 }
534
535 fn insert_vertex_with_labels_impl(
538 &mut self,
539 vid: Vid,
540 properties: Properties,
541 labels: &[String],
542 skip_wal: bool,
543 ) {
544 self.current_version += 1;
545 let version = self.current_version;
546 let now = now_nanos();
547
548 if !skip_wal && let Some(wal) = &self.wal {
549 let _ = wal.append(Mutation::InsertVertex {
550 vid,
551 properties: properties.clone(),
552 labels: labels.to_vec(),
553 });
554 }
555
556 self.vertex_tombstones.remove(&vid);
557
558 self.vertex_partial_keys.remove(&vid);
561
562 let props_size = Self::estimate_properties_size(&properties);
565 let props_count = properties.len();
566 let tracks_extid = properties.contains_key("ext_id");
567
568 let entry = self.vertex_properties.entry(vid).or_default();
569 let old_extid = if tracks_extid {
570 Self::extid_of(entry)
571 } else {
572 None
573 };
574 Self::merge_crdt_properties(entry, properties);
575 if tracks_extid {
576 let new_extid =
577 Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
578 self.sync_extid_index(vid, old_extid, new_extid);
579 }
580 self.vertex_versions.insert(vid, version);
581
582 self.vertex_created_at.entry(vid).or_insert(now);
584 self.vertex_updated_at.insert(vid, now);
585
586 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
589 let existing = self.vertex_labels.entry(vid).or_default();
590 Self::append_unique_labels(existing, labels);
591 self.index_labels_for_vid(vid, labels);
592
593 self.graph.add_vertex(vid);
594 self.mutation_count += 1;
595 self.mutation_stats.nodes_created += 1;
596 self.mutation_stats.properties_set += props_count;
597 self.mutation_stats.labels_added += labels.len();
598
599 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
600 }
601
602 pub fn insert_vertex_partial_full(
623 &mut self,
624 vid: Vid,
625 props: Properties,
626 touched_keys: HashSet<String>,
627 labels: &[String],
628 ) {
629 self.insert_vertex_with_labels_partial_impl(vid, props, labels, false);
633 self.vertex_partial_keys
634 .entry(vid)
635 .or_default()
636 .extend(touched_keys);
637 }
638
639 pub fn insert_vertex_partial(&mut self, vid: Vid, touched: Properties, labels: &[String]) {
643 let touched_keys: Vec<String> = touched.keys().cloned().collect();
647
648 let already_full = self.vertex_properties.contains_key(&vid)
659 && !self.vertex_partial_keys.contains_key(&vid);
660
661 self.insert_vertex_with_labels_partial_impl(vid, touched, labels, false);
666
667 if !already_full {
668 self.vertex_partial_keys
669 .entry(vid)
670 .or_default()
671 .extend(touched_keys);
672 }
673 }
674
675 fn insert_vertex_with_labels_partial_impl(
679 &mut self,
680 vid: Vid,
681 properties: Properties,
682 labels: &[String],
683 skip_wal: bool,
684 ) {
685 self.current_version += 1;
686 let version = self.current_version;
687 let now = now_nanos();
688
689 if !skip_wal && let Some(wal) = &self.wal {
690 let _ = wal.append(Mutation::InsertVertex {
696 vid,
697 properties: properties.clone(),
698 labels: labels.to_vec(),
699 });
700 }
701
702 self.vertex_tombstones.remove(&vid);
703 let props_size = Self::estimate_properties_size(&properties);
709 let props_count = properties.len();
710 let tracks_extid = properties.contains_key("ext_id");
711
712 let entry = self.vertex_properties.entry(vid).or_default();
713 let old_extid = if tracks_extid {
714 Self::extid_of(entry)
715 } else {
716 None
717 };
718 Self::merge_crdt_properties(entry, properties);
719 if tracks_extid {
720 let new_extid =
721 Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
722 self.sync_extid_index(vid, old_extid, new_extid);
723 }
724 self.vertex_versions.insert(vid, version);
725
726 self.vertex_created_at.entry(vid).or_insert(now);
727 self.vertex_updated_at.insert(vid, now);
728
729 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
730 let existing = self.vertex_labels.entry(vid).or_default();
731 Self::append_unique_labels(existing, labels);
732 self.index_labels_for_vid(vid, labels);
733
734 self.graph.add_vertex(vid);
735 self.mutation_count += 1;
736 self.mutation_stats.properties_set += props_count;
739 self.mutation_stats.labels_added += labels.len();
740
741 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
742 }
743
744 pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
746 let existing = self.vertex_labels.entry(vid).or_default();
747 Self::append_unique_labels(existing, labels);
748 self.index_labels_for_vid(vid, labels);
749 }
750
751 pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
754 if let Some(labels) = self.vertex_labels.get_mut(&vid)
755 && let Some(pos) = labels.iter().position(|l| l == label)
756 {
757 labels.remove(pos);
758 if let Some(set) = self.label_to_vids.get_mut(label) {
759 set.remove(&vid);
760 }
761 self.current_version += 1;
762 self.mutation_count += 1;
763 self.mutation_stats.labels_removed += 1;
764 return true;
767 }
768 false
769 }
770
771 pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
773 self.edge_types.insert(eid, edge_type);
774 }
775
776 pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
777 self.delete_vertex_impl(vid, false)
778 }
779
780 fn delete_vertex_impl(&mut self, vid: Vid, skip_wal: bool) -> Result<()> {
783 self.current_version += 1;
784
785 if !skip_wal && let Some(wal) = &mut self.wal {
786 let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
787 wal.append(Mutation::DeleteVertex { vid, labels })?;
788 }
789
790 self.apply_vertex_deletion(vid);
791 Ok(())
792 }
793
794 fn apply_vertex_deletion(&mut self, vid: Vid) {
798 let version = self.current_version;
799
800 let mut edges_to_remove = HashSet::new();
802
803 for entry in self.graph.neighbors(vid, Direction::Outgoing) {
805 edges_to_remove.insert(entry.eid);
806 }
807
808 for entry in self.graph.neighbors(vid, Direction::Incoming) {
810 edges_to_remove.insert(entry.eid); }
812
813 let cascaded_edges_count = edges_to_remove.len();
814
815 for eid in edges_to_remove {
817 if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
819 self.tombstones.insert(
820 eid,
821 TombstoneEntry {
822 eid,
823 src_vid: *src,
824 dst_vid: *dst,
825 edge_type: *etype,
826 },
827 );
828 self.edge_versions.insert(eid, version);
829 self.edge_endpoints.remove(&eid);
830 self.edge_properties.remove(&eid);
831 self.graph.remove_edge(eid);
832 self.mutation_count += 1;
833 self.mutation_stats.relationships_deleted += 1;
834 }
835 }
836
837 self.remove_vid_from_label_index(vid);
838 self.vertex_tombstones.insert(vid);
839 if let Some(props) = self.vertex_properties.get(&vid)
842 && let Some(ext) = Self::extid_of(props)
843 && self.extid_index.get(&ext) == Some(&vid)
844 {
845 self.extid_index.remove(&ext);
846 }
847 self.vertex_properties.remove(&vid);
848 self.vertex_partial_keys.remove(&vid);
850 self.vertex_versions.insert(vid, version);
851 self.graph.remove_vertex(vid);
852 self.mutation_count += 1;
853 self.mutation_stats.nodes_deleted += 1;
854
855 self.constraint_index.retain(|_, v| *v != vid);
857 self.merge_guard_index.retain(|_, v| *v != vid);
860
861 self.estimated_size += cascaded_edges_count * 72 + 8;
863 }
864
865 pub fn insert_edge(
866 &mut self,
867 src_vid: Vid,
868 dst_vid: Vid,
869 edge_type: u32,
870 eid: Eid,
871 properties: Properties,
872 edge_type_name: Option<String>,
873 ) -> Result<()> {
874 self.insert_edge_impl(
875 src_vid,
876 dst_vid,
877 edge_type,
878 eid,
879 properties,
880 edge_type_name,
881 false,
882 )
883 }
884
885 #[allow(clippy::too_many_arguments)]
888 fn insert_edge_impl(
889 &mut self,
890 src_vid: Vid,
891 dst_vid: Vid,
892 edge_type: u32,
893 eid: Eid,
894 properties: Properties,
895 edge_type_name: Option<String>,
896 skip_wal: bool,
897 ) -> Result<()> {
898 self.current_version += 1;
899 let now = now_nanos();
900
901 if !skip_wal && let Some(wal) = &mut self.wal {
902 wal.append(Mutation::InsertEdge {
903 src_vid,
904 dst_vid,
905 edge_type,
906 eid,
907 version: self.current_version,
908 properties: properties.clone(),
909 edge_type_name: edge_type_name.clone(),
910 })?;
911 }
912
913 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
914
915 let type_name_size = if let Some(ref name) = edge_type_name {
917 let size = name.len() + 24;
918 self.edge_types.insert(eid, name.clone());
919 size
920 } else {
921 0
922 };
923
924 self.edge_created_at.entry(eid).or_insert(now);
926 self.edge_updated_at.insert(eid, now);
927
928 self.edge_partial_keys.remove(&eid);
931
932 self.estimated_size += type_name_size;
933
934 Ok(())
935 }
936
937 #[allow(clippy::too_many_arguments)]
942 pub fn insert_edge_partial_full(
943 &mut self,
944 src_vid: Vid,
945 dst_vid: Vid,
946 edge_type: u32,
947 eid: Eid,
948 properties: Properties,
949 edge_type_name: Option<String>,
950 touched_keys: HashSet<String>,
951 ) -> Result<()> {
952 self.current_version += 1;
953 let now = now_nanos();
954
955 if let Some(wal) = &mut self.wal {
956 wal.append(Mutation::InsertEdge {
957 src_vid,
958 dst_vid,
959 edge_type,
960 eid,
961 version: self.current_version,
962 properties: properties.clone(),
963 edge_type_name: edge_type_name.clone(),
964 })?;
965 }
966
967 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
968
969 self.edge_partial_keys
973 .entry(eid)
974 .or_default()
975 .extend(touched_keys);
976
977 let type_name_size = if let Some(ref name) = edge_type_name {
978 let size = name.len() + 24;
979 self.edge_types.insert(eid, name.clone());
980 size
981 } else {
982 0
983 };
984
985 self.edge_created_at.entry(eid).or_insert(now);
986 self.edge_updated_at.insert(eid, now);
987
988 self.estimated_size += type_name_size;
989
990 Ok(())
991 }
992
993 fn apply_edge_insertion(
1002 &mut self,
1003 src_vid: Vid,
1004 dst_vid: Vid,
1005 edge_type: u32,
1006 eid: Eid,
1007 properties: Properties,
1008 ) -> Result<()> {
1009 let version = self.current_version;
1010
1011 if self.vertex_tombstones.contains(&src_vid) {
1014 anyhow::bail!(
1015 "Cannot insert edge: source vertex {} has been deleted (issue #77)",
1016 src_vid
1017 );
1018 }
1019 if self.vertex_tombstones.contains(&dst_vid) {
1020 anyhow::bail!(
1021 "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
1022 dst_vid
1023 );
1024 }
1025
1026 if !self.graph.contains_vertex(src_vid) {
1031 self.graph.add_vertex(src_vid);
1032 }
1033 if !self.graph.contains_vertex(dst_vid) {
1034 self.graph.add_vertex(dst_vid);
1035 }
1036
1037 self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
1038
1039 let props_size = Self::estimate_properties_size(&properties);
1041 let props_count = properties.len();
1042 if !properties.is_empty() {
1043 let entry = self.edge_properties.entry(eid).or_default();
1044 Self::merge_crdt_properties(entry, properties);
1045 }
1046
1047 self.edge_versions.insert(eid, version);
1048 self.edge_endpoints
1049 .insert(eid, (src_vid, dst_vid, edge_type));
1050 self.tombstones.remove(&eid);
1051 self.mutation_count += 1;
1052 self.mutation_stats.relationships_created += 1;
1053 self.mutation_stats.properties_set += props_count;
1054
1055 self.estimated_size += 24 + props_size + 16 + 28 + 32;
1057
1058 Ok(())
1059 }
1060
1061 pub fn delete_edge(
1062 &mut self,
1063 eid: Eid,
1064 src_vid: Vid,
1065 dst_vid: Vid,
1066 edge_type: u32,
1067 ) -> Result<()> {
1068 self.delete_edge_impl(eid, src_vid, dst_vid, edge_type, false)
1069 }
1070
1071 fn delete_edge_impl(
1074 &mut self,
1075 eid: Eid,
1076 src_vid: Vid,
1077 dst_vid: Vid,
1078 edge_type: u32,
1079 skip_wal: bool,
1080 ) -> Result<()> {
1081 self.current_version += 1;
1082 let now = now_nanos();
1083
1084 if !skip_wal && let Some(wal) = &mut self.wal {
1085 wal.append(Mutation::DeleteEdge {
1086 eid,
1087 src_vid,
1088 dst_vid,
1089 edge_type,
1090 version: self.current_version,
1091 })?;
1092 }
1093
1094 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1095
1096 self.edge_updated_at.insert(eid, now);
1098
1099 Ok(())
1100 }
1101
1102 fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
1106 let version = self.current_version;
1107
1108 self.tombstones.insert(
1109 eid,
1110 TombstoneEntry {
1111 eid,
1112 src_vid,
1113 dst_vid,
1114 edge_type,
1115 },
1116 );
1117 self.edge_versions.insert(eid, version);
1118 self.edge_partial_keys.remove(&eid);
1121 self.graph.remove_edge(eid);
1122 self.mutation_count += 1;
1123 self.mutation_stats.relationships_deleted += 1;
1124
1125 self.estimated_size += 80;
1127 }
1128
1129 pub fn get_neighbors(
1132 &self,
1133 vid: Vid,
1134 edge_type: u32,
1135 direction: Direction,
1136 ) -> Vec<(Vid, Eid, u64)> {
1137 let edges = self.graph.neighbors(vid, direction);
1138
1139 edges
1140 .iter()
1141 .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
1142 .map(|e| {
1143 let neighbor = match direction {
1144 Direction::Outgoing => e.dst_vid,
1145 Direction::Incoming => e.src_vid,
1146 };
1147 let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
1148 (neighbor, e.eid, version)
1149 })
1150 .collect()
1151 }
1152
1153 pub fn is_tombstoned(&self, eid: Eid) -> bool {
1154 self.tombstones.contains_key(&eid)
1155 }
1156
1157 pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
1160 self.label_to_vids
1161 .get(label_name)
1162 .map(|set| set.iter().copied().collect())
1163 .unwrap_or_default()
1164 }
1165
1166 pub fn all_vertex_vids(&self) -> Vec<Vid> {
1170 self.vertex_properties.keys().copied().collect()
1171 }
1172
1173 pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1176 let mut result = HashSet::new();
1177 for label_name in label_names {
1178 if let Some(set) = self.label_to_vids.get(*label_name) {
1179 result.extend(set.iter().copied());
1180 }
1181 }
1182 result.into_iter().collect()
1183 }
1184
1185 pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1188 if label_names.is_empty() {
1189 return Vec::new();
1190 }
1191 let sets: Vec<&HashSet<Vid>> = match label_names
1194 .iter()
1195 .map(|ln| self.label_to_vids.get(*ln))
1196 .collect::<Option<Vec<_>>>()
1197 {
1198 Some(s) => s,
1199 None => return Vec::new(),
1200 };
1201 let smallest = sets.iter().min_by_key(|s| s.len()).unwrap();
1203 smallest
1204 .iter()
1205 .copied()
1206 .filter(|vid| sets.iter().all(|s| s.contains(vid)))
1207 .collect()
1208 }
1209
1210 pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
1212 self.vertex_labels.get(&vid).map(|v| v.as_slice())
1213 }
1214
1215 pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
1217 self.edge_types.get(&eid).map(|s| s.as_str())
1218 }
1219
1220 pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
1223 self.edge_types
1224 .iter()
1225 .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
1226 .map(|(eid, _)| *eid)
1227 .collect()
1228 }
1229
1230 pub fn all_edge_eids(&self) -> Vec<Eid> {
1234 self.edge_endpoints
1235 .keys()
1236 .filter(|eid| !self.tombstones.contains_key(eid))
1237 .copied()
1238 .collect()
1239 }
1240
1241 pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
1243 self.edge_endpoints
1244 .get(&eid)
1245 .map(|(src, dst, _)| (*src, *dst))
1246 }
1247
1248 pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
1250 self.edge_endpoints.get(&eid).copied()
1251 }
1252
1253 pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
1255 self.constraint_index.insert(key, vid);
1256 }
1257
1258 pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1261 self.constraint_index
1262 .get(key)
1263 .is_some_and(|&v| v != exclude_vid)
1264 }
1265
1266 pub fn insert_merge_guard_key(&mut self, key: Vec<u8>, vid: Vid) {
1268 self.merge_guard_index.insert(key, vid);
1269 }
1270
1271 pub fn has_merge_guard_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1274 self.merge_guard_index
1275 .get(key)
1276 .is_some_and(|&v| v != exclude_vid)
1277 }
1278
1279 #[instrument(skip(self, other), level = "trace")]
1280 pub fn validate_merge_edge_endpoints(&self, other: &L0Buffer) -> Result<()> {
1299 let is_deleted = |vid: &Vid| {
1303 (self.vertex_tombstones.contains(vid) || other.vertex_tombstones.contains(vid))
1304 && !other.vertex_properties.contains_key(vid)
1305 };
1306 for (eid, (src_vid, dst_vid, _etype)) in &other.edge_endpoints {
1307 if other.tombstones.contains_key(eid) {
1308 continue; }
1310 if is_deleted(src_vid) {
1311 anyhow::bail!(
1312 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
1313 eid,
1314 src_vid
1315 );
1316 }
1317 if is_deleted(dst_vid) {
1318 anyhow::bail!(
1319 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
1320 eid,
1321 dst_vid
1322 );
1323 }
1324 }
1325 Ok(())
1326 }
1327
1328 pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
1329 self.validate_merge_edge_endpoints(other)?;
1333 self.merge_validated(
1334 other,
1335 other.vertex_properties.clone(),
1336 other.edge_properties.clone(),
1337 )
1338 }
1339
1340 pub fn merge_take(&mut self, other: &mut L0Buffer) -> Result<()> {
1349 self.validate_merge_edge_endpoints(other)?;
1352 let vertex_props = std::mem::take(&mut other.vertex_properties);
1353 let edge_props = std::mem::take(&mut other.edge_properties);
1354 self.merge_validated(other, vertex_props, edge_props)
1355 }
1356
1357 fn merge_validated(
1361 &mut self,
1362 other: &L0Buffer,
1363 vertex_props: HashMap<Vid, Properties>,
1364 mut edge_props: HashMap<Eid, Properties>,
1365 ) -> Result<()> {
1366 trace!(
1367 other_mutation_count = other.mutation_count,
1368 "Merging L0 buffer"
1369 );
1370 for &vid in &other.vertex_tombstones {
1375 self.delete_vertex_impl(vid, true)?;
1376 }
1377
1378 for (vid, props) in vertex_props {
1379 let labels = other.vertex_labels.get(&vid).cloned().unwrap_or_default();
1380 self.insert_vertex_with_labels_impl(vid, props, &labels, true);
1381 }
1382
1383 for (vid, labels) in &other.vertex_labels {
1385 if !self.vertex_labels.contains_key(vid) {
1386 self.vertex_labels.insert(*vid, labels.clone());
1387 for label in labels {
1388 self.label_to_vids
1389 .entry(label.clone())
1390 .or_default()
1391 .insert(*vid);
1392 }
1393 }
1394 }
1395
1396 for vid in &other.vertex_label_overwrites {
1403 if other.vertex_tombstones.contains(vid) {
1404 continue;
1405 }
1406 let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1407 self.remove_vid_from_label_index(*vid);
1408 self.vertex_labels.insert(*vid, labels.clone());
1409 self.index_labels_for_vid(*vid, &labels);
1410 }
1411
1412 for (eid, (src, dst, etype)) in &other.edge_endpoints {
1414 if other.tombstones.contains_key(eid) {
1415 self.delete_edge_impl(*eid, *src, *dst, *etype, true)?;
1416 } else {
1417 let props = edge_props.remove(eid).unwrap_or_default();
1418 let etype_name = other.edge_types.get(eid).cloned();
1419 self.insert_edge_impl(*src, *dst, *etype, *eid, props, etype_name, true)?;
1420 }
1421 }
1422
1423 for (eid, tombstone) in &other.tombstones {
1427 if !other.edge_endpoints.contains_key(eid) {
1428 self.delete_edge_impl(
1429 *eid,
1430 tombstone.src_vid,
1431 tombstone.dst_vid,
1432 tombstone.edge_type,
1433 true,
1434 )?;
1435 }
1436 }
1437
1438 for (vid, ts) in &other.vertex_created_at {
1443 self.vertex_created_at.entry(*vid).or_insert(*ts); }
1445 for (vid, ts) in &other.vertex_updated_at {
1446 self.vertex_updated_at.insert(*vid, *ts); }
1448
1449 for (eid, ts) in &other.edge_created_at {
1450 self.edge_created_at.entry(*eid).or_insert(*ts); }
1452 for (eid, ts) in &other.edge_updated_at {
1453 self.edge_updated_at.insert(*eid, *ts); }
1455
1456 self.estimated_size += other.estimated_size;
1459
1460 for (key, vid) in &other.constraint_index {
1462 self.constraint_index.insert(key.clone(), *vid);
1463 }
1464
1465 for (key, vid) in &other.merge_guard_index {
1468 self.merge_guard_index.insert(key.clone(), *vid);
1469 }
1470
1471 Ok(())
1472 }
1473
1474 #[instrument(skip(self, mutations), level = "debug")]
1478 pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
1479 trace!(count = mutations.len(), "Replaying mutations");
1480 for mutation in mutations {
1481 match mutation {
1482 Mutation::InsertVertex {
1483 vid,
1484 properties,
1485 labels,
1486 } => {
1487 self.current_version += 1;
1489 let version = self.current_version;
1490
1491 self.vertex_tombstones.remove(&vid);
1492 let tracks_extid = properties.contains_key("ext_id");
1493 let entry = self.vertex_properties.entry(vid).or_default();
1494 let old_extid = if tracks_extid {
1495 Self::extid_of(entry)
1496 } else {
1497 None
1498 };
1499 Self::merge_crdt_properties(entry, properties);
1500 if tracks_extid {
1501 let new_extid = Self::extid_of(
1502 self.vertex_properties.get(&vid).expect("just inserted"),
1503 );
1504 self.sync_extid_index(vid, old_extid, new_extid);
1505 }
1506 self.vertex_versions.insert(vid, version);
1507 self.graph.add_vertex(vid);
1508 self.mutation_count += 1;
1509
1510 let existing = self.vertex_labels.entry(vid).or_default();
1512 Self::append_unique_labels(existing, &labels);
1513 for label in &labels {
1514 self.label_to_vids
1515 .entry(label.clone())
1516 .or_default()
1517 .insert(vid);
1518 }
1519 }
1520 Mutation::DeleteVertex { vid, labels } => {
1521 self.current_version += 1;
1522 if !labels.is_empty() {
1524 let existing = self.vertex_labels.entry(vid).or_default();
1525 Self::append_unique_labels(existing, &labels);
1526 for label in &labels {
1527 self.label_to_vids
1528 .entry(label.clone())
1529 .or_default()
1530 .insert(vid);
1531 }
1532 }
1533 self.apply_vertex_deletion(vid);
1534 }
1535 Mutation::SetVertexLabels { vid, labels } => {
1536 self.current_version += 1;
1541 self.remove_vid_from_label_index(vid);
1542 self.vertex_labels.insert(vid, labels.clone());
1543 self.index_labels_for_vid(vid, &labels);
1544 self.mutation_count += 1;
1545 }
1546 Mutation::InsertEdge {
1547 src_vid,
1548 dst_vid,
1549 edge_type,
1550 eid,
1551 version: _,
1552 properties,
1553 edge_type_name,
1554 } => {
1555 self.current_version += 1;
1556 match self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties) {
1561 Ok(()) => {
1562 if let Some(name) = edge_type_name {
1564 self.edge_types.insert(eid, name);
1565 }
1566 }
1567 Err(e) => {
1568 tracing::warn!(
1569 ?eid,
1570 ?src_vid,
1571 ?dst_vid,
1572 error = %e,
1573 "WAL replay: skipping edge insertion to a deleted endpoint (issue #77)"
1574 );
1575 }
1576 }
1577 }
1578 Mutation::DeleteEdge {
1579 eid,
1580 src_vid,
1581 dst_vid,
1582 edge_type,
1583 version: _,
1584 } => {
1585 self.current_version += 1;
1586 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1587 }
1588 }
1589 }
1590 Ok(())
1591 }
1592}
1593
1594#[cfg(test)]
1595mod tests {
1596 use super::*;
1597
1598 #[test]
1599 fn test_l0_buffer_ops() -> Result<()> {
1600 let mut l0 = L0Buffer::new(0, None);
1601 let vid_a = Vid::new(1);
1602 let vid_b = Vid::new(2);
1603 let eid_ab = Eid::new(101);
1604
1605 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1606
1607 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1608 assert_eq!(neighbors.len(), 1);
1609 assert_eq!(neighbors[0].0, vid_b);
1610 assert_eq!(neighbors[0].1, eid_ab);
1611
1612 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1613 assert!(l0.is_tombstoned(eid_ab));
1614
1615 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1617 assert_eq!(neighbors_after.len(), 0);
1618
1619 Ok(())
1620 }
1621
1622 #[test]
1628 fn validate_merge_rejects_edge_to_tombstoned_endpoint() {
1629 let mut main = L0Buffer::new(0, None);
1630 let vid_a = Vid::new(1);
1631 let vid_b = Vid::new(2);
1632 main.insert_vertex(vid_a, HashMap::new());
1633 main.insert_vertex(vid_b, HashMap::new());
1634 main.delete_vertex(vid_b).unwrap(); let mut tx = L0Buffer::new(0, None);
1638 let eid = Eid::new(101);
1639 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1640 .unwrap();
1641
1642 assert!(
1643 main.validate_merge_edge_endpoints(&tx).is_err(),
1644 "edge to a tombstoned endpoint must be rejected before merge"
1645 );
1646 assert!(
1648 main.merge(&tx).is_err(),
1649 "merge must reject, not bail mid-apply"
1650 );
1651 assert!(
1652 !main.edge_endpoints.contains_key(&eid),
1653 "a rejected merge must not have partially applied the edge"
1654 );
1655 }
1656
1657 #[test]
1660 fn validate_merge_allows_edge_when_endpoint_reinserted() {
1661 let mut main = L0Buffer::new(0, None);
1662 let vid_a = Vid::new(1);
1663 let vid_b = Vid::new(2);
1664 main.insert_vertex(vid_a, HashMap::new());
1665 main.insert_vertex(vid_b, HashMap::new());
1666 main.delete_vertex(vid_b).unwrap();
1667
1668 let mut tx = L0Buffer::new(0, None);
1669 tx.insert_vertex(vid_b, HashMap::new()); let eid = Eid::new(101);
1671 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1672 .unwrap();
1673
1674 assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1675 assert!(main.merge(&tx).is_ok());
1676 assert!(main.edge_endpoints.contains_key(&eid));
1677 }
1678
1679 #[test]
1681 fn validate_merge_allows_edge_to_live_endpoints() {
1682 let mut main = L0Buffer::new(0, None);
1683 let vid_a = Vid::new(1);
1684 let vid_b = Vid::new(2);
1685 main.insert_vertex(vid_a, HashMap::new());
1686 main.insert_vertex(vid_b, HashMap::new());
1687
1688 let mut tx = L0Buffer::new(0, None);
1689 let eid = Eid::new(101);
1690 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1691 .unwrap();
1692
1693 assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1694 assert!(main.merge(&tx).is_ok());
1695 assert!(main.edge_endpoints.contains_key(&eid));
1696 }
1697
1698 #[test]
1699 fn test_l0_buffer_multiple_edges() -> Result<()> {
1700 let mut l0 = L0Buffer::new(0, None);
1701 let vid_a = Vid::new(1);
1702 let vid_b = Vid::new(2);
1703 let vid_c = Vid::new(3);
1704 let eid_ab = Eid::new(101);
1705 let eid_ac = Eid::new(102);
1706
1707 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1708 l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
1709
1710 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1711 assert_eq!(neighbors.len(), 2);
1712
1713 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1715
1716 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1718 assert_eq!(neighbors_after.len(), 1);
1719 assert_eq!(neighbors_after[0].0, vid_c);
1720
1721 Ok(())
1722 }
1723
1724 #[test]
1725 fn test_l0_buffer_edge_type_filter() -> Result<()> {
1726 let mut l0 = L0Buffer::new(0, None);
1727 let vid_a = Vid::new(1);
1728 let vid_b = Vid::new(2);
1729 let vid_c = Vid::new(3);
1730 let eid_ab = Eid::new(101);
1731 let eid_ac = Eid::new(201); l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1734 l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
1735
1736 let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1738 assert_eq!(type1_neighbors.len(), 1);
1739 assert_eq!(type1_neighbors[0].0, vid_b);
1740
1741 let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
1743 assert_eq!(type2_neighbors.len(), 1);
1744 assert_eq!(type2_neighbors[0].0, vid_c);
1745
1746 Ok(())
1747 }
1748
1749 #[test]
1750 fn test_l0_buffer_incoming_edges() -> Result<()> {
1751 let mut l0 = L0Buffer::new(0, None);
1752 let vid_a = Vid::new(1);
1753 let vid_b = Vid::new(2);
1754 let vid_c = Vid::new(3);
1755 let eid_ab = Eid::new(101);
1756 let eid_cb = Eid::new(102);
1757
1758 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1760 l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
1761
1762 let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
1764 assert_eq!(incoming.len(), 2);
1765
1766 Ok(())
1767 }
1768
1769 #[test]
1771 fn test_merge_empty_props_edge() -> Result<()> {
1772 let mut main_l0 = L0Buffer::new(0, None);
1773 let mut tx_l0 = L0Buffer::new(0, None);
1774
1775 let vid_a = Vid::new(1);
1776 let vid_b = Vid::new(2);
1777 let eid_ab = Eid::new(101);
1778
1779 tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1781
1782 assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
1784 assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); main_l0.merge(&tx_l0)?;
1788
1789 assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
1791 let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1792 assert_eq!(neighbors.len(), 1);
1793 assert_eq!(neighbors[0].0, vid_b);
1794
1795 Ok(())
1796 }
1797
1798 #[test]
1800 fn test_replay_crdt_merge() -> Result<()> {
1801 use crate::runtime::wal::Mutation;
1802 use serde_json::json;
1803 use uni_common::Value;
1804
1805 let mut l0 = L0Buffer::new(0, None);
1806 let vid = Vid::new(1);
1807
1808 let counter1: Value = json!({
1811 "t": "gc",
1812 "d": {"counts": {"node1": 5}}
1813 })
1814 .into();
1815 let counter2: Value = json!({
1816 "t": "gc",
1817 "d": {"counts": {"node2": 3}}
1818 })
1819 .into();
1820
1821 let mut props1 = HashMap::new();
1823 props1.insert("counter".to_string(), counter1.clone());
1824 l0.replay_mutations(vec![Mutation::InsertVertex {
1825 vid,
1826 properties: props1,
1827 labels: vec![],
1828 }])?;
1829
1830 let mut props2 = HashMap::new();
1832 props2.insert("counter".to_string(), counter2.clone());
1833 l0.replay_mutations(vec![Mutation::InsertVertex {
1834 vid,
1835 properties: props2,
1836 labels: vec![],
1837 }])?;
1838
1839 let stored_props = l0.vertex_properties.get(&vid).unwrap();
1841 let stored_counter = stored_props.get("counter").unwrap();
1842
1843 let stored_json: serde_json::Value = stored_counter.clone().into();
1845 let data = stored_json.get("d").unwrap();
1847 let counts = data.get("counts").unwrap();
1848 assert_eq!(counts.get("node1"), Some(&json!(5)));
1849 assert_eq!(counts.get("node2"), Some(&json!(3)));
1850
1851 Ok(())
1852 }
1853
1854 #[test]
1855 fn test_merge_preserves_vertex_timestamps() -> Result<()> {
1856 let mut l0_main = L0Buffer::new(0, None);
1857 let mut l0_tx = L0Buffer::new(0, None);
1858 let vid = Vid::new(1);
1859
1860 let ts_main_created = 1000;
1862 let ts_main_updated = 1100;
1863 l0_main.insert_vertex(vid, HashMap::new());
1864 l0_main.vertex_created_at.insert(vid, ts_main_created);
1865 l0_main.vertex_updated_at.insert(vid, ts_main_updated);
1866
1867 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_vertex(vid, HashMap::new());
1871 l0_tx.vertex_created_at.insert(vid, ts_tx_created);
1872 l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
1873
1874 l0_main.merge(&l0_tx)?;
1876
1877 assert_eq!(
1879 *l0_main.vertex_created_at.get(&vid).unwrap(),
1880 ts_main_created,
1881 "created_at should preserve oldest timestamp"
1882 );
1883
1884 assert_eq!(
1886 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1887 ts_tx_updated,
1888 "updated_at should use latest timestamp"
1889 );
1890
1891 Ok(())
1892 }
1893
1894 #[test]
1895 fn test_merge_preserves_edge_timestamps() -> Result<()> {
1896 let mut l0_main = L0Buffer::new(0, None);
1897 let mut l0_tx = L0Buffer::new(0, None);
1898 let vid_a = Vid::new(1);
1899 let vid_b = Vid::new(2);
1900 let eid = Eid::new(100);
1901
1902 let ts_main_created = 1000;
1904 let ts_main_updated = 1100;
1905 l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1906 l0_main.edge_created_at.insert(eid, ts_main_created);
1907 l0_main.edge_updated_at.insert(eid, ts_main_updated);
1908
1909 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1913 l0_tx.edge_created_at.insert(eid, ts_tx_created);
1914 l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1915
1916 l0_main.merge(&l0_tx)?;
1918
1919 assert_eq!(
1921 *l0_main.edge_created_at.get(&eid).unwrap(),
1922 ts_main_created,
1923 "edge created_at should preserve oldest timestamp"
1924 );
1925
1926 assert_eq!(
1928 *l0_main.edge_updated_at.get(&eid).unwrap(),
1929 ts_tx_updated,
1930 "edge updated_at should use latest timestamp"
1931 );
1932
1933 Ok(())
1934 }
1935
1936 #[test]
1937 fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1938 use uni_common::Value;
1939
1940 let mut l0_main = L0Buffer::new(0, None);
1941 let mut l0_tx = L0Buffer::new(0, None);
1942 let vid = Vid::new(1);
1943
1944 let ts_original = 1000;
1946 l0_main.insert_vertex(vid, HashMap::new());
1947 l0_main.vertex_created_at.insert(vid, ts_original);
1948 l0_main.vertex_updated_at.insert(vid, ts_original);
1949
1950 let ts_tx = 2000;
1952 let mut props = HashMap::new();
1953 props.insert("updated".to_string(), Value::String("yes".to_string()));
1954 l0_tx.insert_vertex(vid, props);
1955 l0_tx.vertex_created_at.insert(vid, ts_tx);
1956 l0_tx.vertex_updated_at.insert(vid, ts_tx);
1957
1958 l0_main.merge(&l0_tx)?;
1960
1961 assert_eq!(
1963 *l0_main.vertex_created_at.get(&vid).unwrap(),
1964 ts_original,
1965 "created_at must not be overwritten for existing vertex"
1966 );
1967
1968 assert_eq!(
1970 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1971 ts_tx,
1972 "updated_at should reflect transaction timestamp"
1973 );
1974
1975 assert!(
1977 l0_main
1978 .vertex_properties
1979 .get(&vid)
1980 .unwrap()
1981 .contains_key("updated")
1982 );
1983
1984 Ok(())
1985 }
1986
1987 #[test]
1989 fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
1990 use crate::runtime::wal::Mutation;
1991
1992 let mut l0 = L0Buffer::new(0, None);
1993 let vid = Vid::new(42);
1994
1995 let mutations = vec![Mutation::InsertVertex {
1997 vid,
1998 properties: {
1999 let mut props = HashMap::new();
2000 props.insert(
2001 "name".to_string(),
2002 uni_common::Value::String("Alice".to_string()),
2003 );
2004 props
2005 },
2006 labels: vec!["Person".to_string(), "User".to_string()],
2007 }];
2008
2009 l0.replay_mutations(mutations)?;
2011
2012 assert!(l0.vertex_properties.contains_key(&vid));
2014
2015 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2017 assert_eq!(labels.len(), 2);
2018 assert!(labels.contains(&"Person".to_string()));
2019 assert!(labels.contains(&"User".to_string()));
2020
2021 let person_vids = l0.vids_for_label("Person");
2023 assert_eq!(person_vids.len(), 1);
2024 assert_eq!(person_vids[0], vid);
2025
2026 let user_vids = l0.vids_for_label("User");
2027 assert_eq!(user_vids.len(), 1);
2028 assert_eq!(user_vids[0], vid);
2029
2030 Ok(())
2031 }
2032
2033 #[test]
2035 fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
2036 use crate::runtime::wal::Mutation;
2037
2038 let mut l0 = L0Buffer::new(0, None);
2039 let vid = Vid::new(99);
2040
2041 l0.insert_vertex_with_labels(
2043 vid,
2044 HashMap::new(),
2045 &["Person".to_string(), "Admin".to_string()],
2046 );
2047
2048 assert!(l0.vertex_properties.contains_key(&vid));
2050 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2051 assert_eq!(labels.len(), 2);
2052
2053 let mutations = vec![Mutation::DeleteVertex {
2055 vid,
2056 labels: vec!["Person".to_string(), "Admin".to_string()],
2057 }];
2058
2059 l0.replay_mutations(mutations)?;
2061
2062 assert!(l0.vertex_tombstones.contains(&vid));
2064
2065 let labels = l0.get_vertex_labels(vid);
2068 assert!(
2069 labels.is_some(),
2070 "Labels should be preserved even after deletion for tombstone flushing"
2071 );
2072
2073 Ok(())
2074 }
2075
2076 #[test]
2078 fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
2079 use crate::runtime::wal::Mutation;
2080
2081 let mut l0 = L0Buffer::new(0, None);
2082 let src = Vid::new(1);
2083 let dst = Vid::new(2);
2084 let eid = Eid::new(500);
2085 let edge_type = 100;
2086
2087 let mutations = vec![Mutation::InsertEdge {
2089 src_vid: src,
2090 dst_vid: dst,
2091 edge_type,
2092 eid,
2093 version: 1,
2094 properties: {
2095 let mut props = HashMap::new();
2096 props.insert("since".to_string(), uni_common::Value::Int(2020));
2097 props
2098 },
2099 edge_type_name: Some("KNOWS".to_string()),
2100 }];
2101
2102 l0.replay_mutations(mutations)?;
2104
2105 assert!(l0.edge_endpoints.contains_key(&eid));
2107
2108 let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
2110 assert_eq!(type_name, "KNOWS");
2111
2112 let knows_eids = l0.eids_for_type("KNOWS");
2114 assert_eq!(knows_eids.len(), 1);
2115 assert_eq!(knows_eids[0], eid);
2116
2117 Ok(())
2118 }
2119
2120 #[test]
2122 fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
2123 use crate::runtime::wal::Mutation;
2124
2125 let mut l0 = L0Buffer::new(0, None);
2126
2127 let mutations = vec![
2129 Mutation::InsertEdge {
2130 src_vid: Vid::new(1),
2131 dst_vid: Vid::new(2),
2132 edge_type: 100,
2133 eid: Eid::new(1000),
2134 version: 1,
2135 properties: HashMap::new(),
2136 edge_type_name: Some("KNOWS".to_string()),
2137 },
2138 Mutation::InsertEdge {
2139 src_vid: Vid::new(2),
2140 dst_vid: Vid::new(3),
2141 edge_type: 101,
2142 eid: Eid::new(1001),
2143 version: 2,
2144 properties: HashMap::new(),
2145 edge_type_name: Some("LIKES".to_string()),
2146 },
2147 Mutation::InsertEdge {
2148 src_vid: Vid::new(3),
2149 dst_vid: Vid::new(1),
2150 edge_type: 100,
2151 eid: Eid::new(1002),
2152 version: 3,
2153 properties: HashMap::new(),
2154 edge_type_name: Some("KNOWS".to_string()),
2155 },
2156 ];
2157
2158 l0.replay_mutations(mutations)?;
2159
2160 assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
2162 assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
2163 assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
2164
2165 let knows_edges = l0.eids_for_type("KNOWS");
2167 assert_eq!(knows_edges.len(), 2);
2168 assert!(knows_edges.contains(&Eid::new(1000)));
2169 assert!(knows_edges.contains(&Eid::new(1002)));
2170
2171 let likes_edges = l0.eids_for_type("LIKES");
2172 assert_eq!(likes_edges.len(), 1);
2173 assert_eq!(likes_edges[0], Eid::new(1001));
2174
2175 Ok(())
2176 }
2177
2178 #[test]
2180 fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
2181 use crate::runtime::wal::Mutation;
2182
2183 let mut l0 = L0Buffer::new(0, None);
2184 let alice = Vid::new(1);
2185 let bob = Vid::new(2);
2186 let eid = Eid::new(100);
2187
2188 let mutations = vec![
2190 Mutation::InsertVertex {
2192 vid: alice,
2193 properties: {
2194 let mut props = HashMap::new();
2195 props.insert(
2196 "name".to_string(),
2197 uni_common::Value::String("Alice".to_string()),
2198 );
2199 props
2200 },
2201 labels: vec!["Person".to_string()],
2202 },
2203 Mutation::InsertVertex {
2205 vid: bob,
2206 properties: {
2207 let mut props = HashMap::new();
2208 props.insert(
2209 "name".to_string(),
2210 uni_common::Value::String("Bob".to_string()),
2211 );
2212 props
2213 },
2214 labels: vec!["Person".to_string()],
2215 },
2216 Mutation::InsertEdge {
2218 src_vid: alice,
2219 dst_vid: bob,
2220 edge_type: 1,
2221 eid,
2222 version: 3,
2223 properties: HashMap::new(),
2224 edge_type_name: Some("KNOWS".to_string()),
2225 },
2226 ];
2227
2228 l0.replay_mutations(mutations)?;
2230
2231 assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
2233 assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
2234 assert_eq!(l0.vids_for_label("Person").len(), 2);
2235
2236 assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
2238 assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
2239
2240 let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
2242 assert_eq!(alice_neighbors.len(), 1);
2243 assert_eq!(alice_neighbors[0].0, bob);
2244
2245 Ok(())
2246 }
2247
2248 #[test]
2250 fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
2251 use crate::runtime::wal::Mutation;
2252
2253 let mut l0 = L0Buffer::new(0, None);
2254 let vid = Vid::new(1);
2255
2256 let mutations = vec![Mutation::InsertVertex {
2259 vid,
2260 properties: HashMap::new(),
2261 labels: vec![], }];
2263
2264 l0.replay_mutations(mutations)?;
2265
2266 assert!(l0.vertex_properties.contains_key(&vid));
2268
2269 let labels = l0.get_vertex_labels(vid);
2271 assert!(labels.is_some(), "Labels entry should exist even if empty");
2272 assert_eq!(labels.unwrap().len(), 0);
2273
2274 Ok(())
2275 }
2276
2277 #[test]
2278 fn test_now_nanos_returns_nanosecond_range() {
2279 let now = now_nanos();
2283
2284 assert!(
2286 now > 1_700_000_000_000_000_000,
2287 "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
2288 now
2289 );
2290
2291 assert!(
2293 now < 4_100_000_000_000_000_000,
2294 "now_nanos() returned {}, expected < 4.1e18",
2295 now
2296 );
2297 }
2298}