1use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15#[derive(Debug, Default)]
23pub struct OccReadSet {
24 pub vertices: HashSet<Vid>,
26 pub edges: HashSet<Eid>,
28}
29
30impl OccReadSet {
31 pub fn is_empty(&self) -> bool {
34 self.vertices.is_empty() && self.edges.is_empty()
35 }
36}
37
38fn now_nanos() -> i64 {
40 SystemTime::now()
41 .duration_since(UNIX_EPOCH)
42 .map(|d| d.as_nanos() as i64)
43 .unwrap_or(0)
44}
45
46pub(crate) fn try_as_crdt(v: &Value) -> Option<Crdt> {
58 if !matches!(v, Value::Map(_)) {
59 return None;
60 }
61 serde_json::from_value::<Crdt>(v.clone().into()).ok()
62}
63
64pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
67 let mut buf = label.as_bytes().to_vec();
68 buf.push(0); let mut sorted = key_values.to_vec();
70 sorted.sort_by(|a, b| a.0.cmp(&b.0));
71 for (k, v) in &sorted {
72 buf.extend(k.as_bytes());
73 buf.push(0);
74 buf.extend(serde_json::to_vec(v).unwrap_or_default());
76 buf.push(0);
77 }
78 buf
79}
80
81#[derive(Debug, Clone, Default)]
87pub struct MutationStats {
88 pub nodes_created: usize,
89 pub nodes_deleted: usize,
90 pub relationships_created: usize,
91 pub relationships_deleted: usize,
92 pub properties_set: usize,
93 pub properties_removed: usize,
94 pub labels_added: usize,
95 pub labels_removed: usize,
96}
97
98impl MutationStats {
99 pub fn diff(&self, before: &Self) -> Self {
101 Self {
102 nodes_created: self.nodes_created.saturating_sub(before.nodes_created),
103 nodes_deleted: self.nodes_deleted.saturating_sub(before.nodes_deleted),
104 relationships_created: self
105 .relationships_created
106 .saturating_sub(before.relationships_created),
107 relationships_deleted: self
108 .relationships_deleted
109 .saturating_sub(before.relationships_deleted),
110 properties_set: self.properties_set.saturating_sub(before.properties_set),
111 properties_removed: self
112 .properties_removed
113 .saturating_sub(before.properties_removed),
114 labels_added: self.labels_added.saturating_sub(before.labels_added),
115 labels_removed: self.labels_removed.saturating_sub(before.labels_removed),
116 }
117 }
118}
119
120#[derive(Clone, Debug)]
121pub struct TombstoneEntry {
122 pub eid: Eid,
123 pub src_vid: Vid,
124 pub dst_vid: Vid,
125 pub edge_type: u32,
126}
127
128pub struct L0Buffer {
129 pub graph: SimpleGraph,
131 pub tombstones: HashMap<Eid, TombstoneEntry>,
133 pub vertex_tombstones: HashSet<Vid>,
135 pub edge_versions: HashMap<Eid, u64>,
137 pub vertex_versions: HashMap<Vid, u64>,
139 pub edge_properties: HashMap<Eid, Properties>,
141 pub vertex_properties: HashMap<Vid, Properties>,
143 pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
145 pub vertex_labels: HashMap<Vid, Vec<String>>,
148 pub label_to_vids: HashMap<String, HashSet<Vid>>,
151 pub vertex_label_overwrites: HashSet<Vid>,
159 pub edge_types: HashMap<Eid, String>,
161 pub current_version: u64,
163 pub mutation_count: usize,
165 pub mutation_stats: MutationStats,
167 pub wal: Option<Arc<WriteAheadLog>>,
169 pub wal_lsn_at_flush: u64,
172 pub vertex_created_at: HashMap<Vid, i64>,
174 pub vertex_updated_at: HashMap<Vid, i64>,
176 pub edge_created_at: HashMap<Eid, i64>,
178 pub edge_updated_at: HashMap<Eid, i64>,
180 pub estimated_size: usize,
183 pub constraint_index: HashMap<Vec<u8>, Vid>,
187 pub merge_guard_index: HashMap<Vec<u8>, Vid>,
197 pub extid_index: HashMap<String, Vid>,
204 pub vertex_partial_keys: HashMap<Vid, HashSet<String>>,
210 pub edge_partial_keys: HashMap<Eid, HashSet<String>>,
217 pub pending_embeddings: HashMap<Vid, String>,
224 pub occ_read_seq: u64,
229 pub occ_read_set: Option<Arc<parking_lot::Mutex<OccReadSet>>>,
234}
235
236impl std::fmt::Debug for L0Buffer {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 f.debug_struct("L0Buffer")
239 .field("vertex_count", &self.graph.vertex_count())
240 .field("edge_count", &self.graph.edge_count())
241 .field("tombstones", &self.tombstones.len())
242 .field("vertex_tombstones", &self.vertex_tombstones.len())
243 .field("current_version", &self.current_version)
244 .field("mutation_count", &self.mutation_count)
245 .finish()
246 }
247}
248
249impl Clone for L0Buffer {
250 fn clone(&self) -> Self {
255 Self {
256 graph: self.graph.clone(),
257 tombstones: self.tombstones.clone(),
258 vertex_tombstones: self.vertex_tombstones.clone(),
259 edge_versions: self.edge_versions.clone(),
260 vertex_versions: self.vertex_versions.clone(),
261 edge_properties: self.edge_properties.clone(),
262 vertex_properties: self.vertex_properties.clone(),
263 edge_endpoints: self.edge_endpoints.clone(),
264 vertex_labels: self.vertex_labels.clone(),
265 label_to_vids: self.label_to_vids.clone(),
266 vertex_label_overwrites: self.vertex_label_overwrites.clone(),
267 edge_types: self.edge_types.clone(),
268 current_version: self.current_version,
269 mutation_count: self.mutation_count,
270 mutation_stats: self.mutation_stats.clone(),
271 wal: None, wal_lsn_at_flush: self.wal_lsn_at_flush,
273 vertex_created_at: self.vertex_created_at.clone(),
274 vertex_updated_at: self.vertex_updated_at.clone(),
275 edge_created_at: self.edge_created_at.clone(),
276 edge_updated_at: self.edge_updated_at.clone(),
277 estimated_size: self.estimated_size,
278 constraint_index: self.constraint_index.clone(),
279 merge_guard_index: self.merge_guard_index.clone(),
280 extid_index: self.extid_index.clone(),
281 vertex_partial_keys: self.vertex_partial_keys.clone(),
282 edge_partial_keys: self.edge_partial_keys.clone(),
283 pending_embeddings: self.pending_embeddings.clone(),
284 occ_read_seq: self.occ_read_seq,
285 occ_read_set: None,
287 }
288 }
289}
290
291impl L0Buffer {
292 fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
294 for label in labels {
295 if !existing.contains(label) {
296 existing.push(label.clone());
297 }
298 }
299 }
300
301 fn index_labels_for_vid(&mut self, vid: Vid, labels: &[String]) {
303 for label in labels {
304 self.label_to_vids
305 .entry(label.clone())
306 .or_default()
307 .insert(vid);
308 }
309 }
310
311 fn extid_of(props: &Properties) -> Option<String> {
313 props
314 .get("ext_id")
315 .and_then(|v| v.as_str())
316 .map(str::to_owned)
317 }
318
319 fn sync_extid_index(&mut self, vid: Vid, old: Option<String>, new: Option<String>) {
327 if old == new {
328 return;
329 }
330 if let Some(old) = old
331 && self.extid_index.get(&old) == Some(&vid)
332 {
333 self.extid_index.remove(&old);
334 }
335 if let Some(new) = new {
336 self.extid_index.insert(new, vid);
337 }
338 }
339
340 fn remove_vid_from_label_index(&mut self, vid: Vid) {
342 if let Some(labels) = self.vertex_labels.get(&vid) {
343 for label in labels {
344 if let Some(set) = self.label_to_vids.get_mut(label) {
345 set.remove(&vid);
346 }
347 }
348 }
349 }
350
351 pub fn set_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
362 self.remove_vid_from_label_index(vid);
363 self.vertex_labels.insert(vid, labels.to_vec());
364 self.index_labels_for_vid(vid, labels);
365 self.vertex_label_overwrites.insert(vid);
366 self.current_version += 1;
367 self.mutation_count += 1;
368 }
369
370 fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
379 if entry.is_empty() {
381 *entry = properties;
382 return;
383 }
384
385 for (k, v) in properties {
386 if let Some(mut new_crdt) = try_as_crdt(&v)
390 && let Some(existing_v) = entry.get(&k)
391 && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
392 {
393 if new_crdt.try_merge(&existing_crdt).is_ok()
395 && let Ok(merged_json) = serde_json::to_value(new_crdt)
396 {
397 entry.insert(k, uni_common::Value::from(merged_json));
398 continue;
399 }
400 tracing::warn!(
408 property = %k,
409 existing_variant = existing_crdt.type_name(),
410 "overwriting CRDT property with a different CRDT variant \
411 (last-writer-wins); merged CRDT state is discarded"
412 );
413 } else if try_as_crdt(&v).is_none()
414 && entry.get(&k).is_some_and(|e| try_as_crdt(e).is_some())
415 {
416 tracing::warn!(
423 property = %k,
424 "overwriting CRDT property with non-CRDT value (last-writer-wins); \
425 merged CRDT state is discarded"
426 );
427 }
428 entry.insert(k, v);
430 }
431 }
432
433 fn estimate_properties_size(props: &Properties) -> usize {
435 props.keys().map(|k| k.len() + 32).sum()
436 }
437
438 pub fn size_bytes(&self) -> usize {
441 let mut total = 0;
442
443 total += self.graph.vertex_count() * 8;
445 total += self.graph.edge_count() * 24;
446
447 for props in self.vertex_properties.values() {
449 total += Self::estimate_properties_size(props);
450 }
451 for props in self.edge_properties.values() {
452 total += Self::estimate_properties_size(props);
453 }
454
455 total += self.tombstones.len() * 64;
457 total += self.vertex_tombstones.len() * 8;
458 total += self.edge_versions.len() * 16;
459 total += self.vertex_versions.len() * 16;
460 total += self.edge_endpoints.len() * 28; for labels in self.vertex_labels.values() {
464 total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
465 }
466
467 for (label, vids) in &self.label_to_vids {
469 total += label.len() + 24 + vids.len() * 8 + 48; }
471
472 for type_name in self.edge_types.values() {
474 total += type_name.len() + 24;
475 }
476
477 total += self.vertex_created_at.len() * 16;
479 total += self.vertex_updated_at.len() * 16;
480 total += self.edge_created_at.len() * 16;
481 total += self.edge_updated_at.len() * 16;
482
483 total
484 }
485
486 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
487 Self {
488 graph: SimpleGraph::new(),
489 tombstones: HashMap::new(),
490 vertex_tombstones: HashSet::new(),
491 edge_versions: HashMap::new(),
492 vertex_versions: HashMap::new(),
493 edge_properties: HashMap::new(),
494 vertex_properties: HashMap::new(),
495 edge_endpoints: HashMap::new(),
496 vertex_labels: HashMap::new(),
497 label_to_vids: HashMap::new(),
498 vertex_label_overwrites: HashSet::new(),
499 edge_types: HashMap::new(),
500 current_version: start_version,
501 mutation_count: 0,
502 mutation_stats: MutationStats::default(),
503 wal,
504 wal_lsn_at_flush: 0,
505 vertex_created_at: HashMap::new(),
506 vertex_updated_at: HashMap::new(),
507 edge_created_at: HashMap::new(),
508 edge_updated_at: HashMap::new(),
509 estimated_size: 0,
510 constraint_index: HashMap::new(),
511 merge_guard_index: HashMap::new(),
512 extid_index: HashMap::new(),
513 vertex_partial_keys: HashMap::new(),
514 edge_partial_keys: HashMap::new(),
515 pending_embeddings: HashMap::new(),
516 occ_read_seq: 0,
517 occ_read_set: None,
518 }
519 }
520
521 pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
522 self.insert_vertex_with_labels(vid, properties, &[]);
523 }
524
525 pub fn insert_vertex_with_labels(
527 &mut self,
528 vid: Vid,
529 properties: Properties,
530 labels: &[String],
531 ) {
532 self.insert_vertex_with_labels_impl(vid, properties, labels, false);
533 }
534
535 fn insert_vertex_with_labels_impl(
538 &mut self,
539 vid: Vid,
540 properties: Properties,
541 labels: &[String],
542 skip_wal: bool,
543 ) {
544 self.current_version += 1;
545 let version = self.current_version;
546 let now = now_nanos();
547
548 if !skip_wal && let Some(wal) = &self.wal {
549 let _ = wal.append(Mutation::InsertVertex {
550 vid,
551 properties: properties.clone(),
552 labels: labels.to_vec(),
553 });
554 }
555
556 self.vertex_tombstones.remove(&vid);
557
558 self.vertex_partial_keys.remove(&vid);
561
562 let props_size = Self::estimate_properties_size(&properties);
565 let props_count = properties.len();
566 let tracks_extid = properties.contains_key("ext_id");
567
568 let entry = self.vertex_properties.entry(vid).or_default();
569 let old_extid = if tracks_extid {
570 Self::extid_of(entry)
571 } else {
572 None
573 };
574 Self::merge_crdt_properties(entry, properties);
575 if tracks_extid {
576 let new_extid =
577 Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
578 self.sync_extid_index(vid, old_extid, new_extid);
579 }
580 self.vertex_versions.insert(vid, version);
581
582 self.vertex_created_at.entry(vid).or_insert(now);
584 self.vertex_updated_at.insert(vid, now);
585
586 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
589 let existing = self.vertex_labels.entry(vid).or_default();
590 Self::append_unique_labels(existing, labels);
591 self.index_labels_for_vid(vid, labels);
592
593 self.graph.add_vertex(vid);
594 self.mutation_count += 1;
595 self.mutation_stats.nodes_created += 1;
596 self.mutation_stats.properties_set += props_count;
597 self.mutation_stats.labels_added += labels.len();
598
599 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
600 }
601
602 pub fn insert_vertex_partial_full(
623 &mut self,
624 vid: Vid,
625 props: Properties,
626 touched_keys: HashSet<String>,
627 labels: &[String],
628 ) {
629 self.insert_vertex_with_labels_partial_impl(vid, props, labels, false);
633 self.vertex_partial_keys
634 .entry(vid)
635 .or_default()
636 .extend(touched_keys);
637 }
638
639 pub fn insert_vertex_partial(&mut self, vid: Vid, touched: Properties, labels: &[String]) {
643 let touched_keys: Vec<String> = touched.keys().cloned().collect();
647
648 let already_full = self.vertex_properties.contains_key(&vid)
659 && !self.vertex_partial_keys.contains_key(&vid);
660
661 self.insert_vertex_with_labels_partial_impl(vid, touched, labels, false);
666
667 if !already_full {
668 self.vertex_partial_keys
669 .entry(vid)
670 .or_default()
671 .extend(touched_keys);
672 }
673 }
674
675 fn insert_vertex_with_labels_partial_impl(
679 &mut self,
680 vid: Vid,
681 properties: Properties,
682 labels: &[String],
683 skip_wal: bool,
684 ) {
685 self.current_version += 1;
686 let version = self.current_version;
687 let now = now_nanos();
688
689 if !skip_wal && let Some(wal) = &self.wal {
690 let _ = wal.append(Mutation::InsertVertex {
696 vid,
697 properties: properties.clone(),
698 labels: labels.to_vec(),
699 });
700 }
701
702 self.vertex_tombstones.remove(&vid);
703 let props_size = Self::estimate_properties_size(&properties);
709 let props_count = properties.len();
710 let tracks_extid = properties.contains_key("ext_id");
711
712 let entry = self.vertex_properties.entry(vid).or_default();
713 let old_extid = if tracks_extid {
714 Self::extid_of(entry)
715 } else {
716 None
717 };
718 Self::merge_crdt_properties(entry, properties);
719 if tracks_extid {
720 let new_extid =
721 Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
722 self.sync_extid_index(vid, old_extid, new_extid);
723 }
724 self.vertex_versions.insert(vid, version);
725
726 self.vertex_created_at.entry(vid).or_insert(now);
727 self.vertex_updated_at.insert(vid, now);
728
729 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
730 let existing = self.vertex_labels.entry(vid).or_default();
731 Self::append_unique_labels(existing, labels);
732 self.index_labels_for_vid(vid, labels);
733
734 self.graph.add_vertex(vid);
735 self.mutation_count += 1;
736 self.mutation_stats.properties_set += props_count;
739 self.mutation_stats.labels_added += labels.len();
740
741 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
742 }
743
744 pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
746 let existing = self.vertex_labels.entry(vid).or_default();
747 Self::append_unique_labels(existing, labels);
748 self.index_labels_for_vid(vid, labels);
749 }
750
751 pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
754 if let Some(labels) = self.vertex_labels.get_mut(&vid)
755 && let Some(pos) = labels.iter().position(|l| l == label)
756 {
757 labels.remove(pos);
758 if let Some(set) = self.label_to_vids.get_mut(label) {
759 set.remove(&vid);
760 }
761 self.current_version += 1;
762 self.mutation_count += 1;
763 self.mutation_stats.labels_removed += 1;
764 return true;
767 }
768 false
769 }
770
771 pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
773 self.edge_types.insert(eid, edge_type);
774 }
775
776 pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
777 self.delete_vertex_impl(vid, false)
778 }
779
780 fn delete_vertex_impl(&mut self, vid: Vid, skip_wal: bool) -> Result<()> {
783 self.current_version += 1;
784
785 if !skip_wal && let Some(wal) = &mut self.wal {
786 let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
787 wal.append(Mutation::DeleteVertex { vid, labels })?;
788 }
789
790 self.apply_vertex_deletion(vid);
791 Ok(())
792 }
793
794 fn apply_vertex_deletion(&mut self, vid: Vid) {
798 let version = self.current_version;
799
800 let mut edges_to_remove = HashSet::new();
802
803 for entry in self.graph.neighbors(vid, Direction::Outgoing) {
805 edges_to_remove.insert(entry.eid);
806 }
807
808 for entry in self.graph.neighbors(vid, Direction::Incoming) {
810 edges_to_remove.insert(entry.eid); }
812
813 let cascaded_edges_count = edges_to_remove.len();
814
815 for eid in edges_to_remove {
817 if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
819 self.tombstones.insert(
820 eid,
821 TombstoneEntry {
822 eid,
823 src_vid: *src,
824 dst_vid: *dst,
825 edge_type: *etype,
826 },
827 );
828 self.edge_versions.insert(eid, version);
829 self.edge_endpoints.remove(&eid);
830 self.edge_properties.remove(&eid);
831 self.graph.remove_edge(eid);
832 self.mutation_count += 1;
833 self.mutation_stats.relationships_deleted += 1;
834 }
835 }
836
837 self.remove_vid_from_label_index(vid);
838 self.vertex_tombstones.insert(vid);
839 if let Some(props) = self.vertex_properties.get(&vid)
842 && let Some(ext) = Self::extid_of(props)
843 && self.extid_index.get(&ext) == Some(&vid)
844 {
845 self.extid_index.remove(&ext);
846 }
847 self.vertex_properties.remove(&vid);
848 self.vertex_partial_keys.remove(&vid);
850 self.vertex_versions.insert(vid, version);
851 self.graph.remove_vertex(vid);
852 self.mutation_count += 1;
853 self.mutation_stats.nodes_deleted += 1;
854
855 self.constraint_index.retain(|_, v| *v != vid);
857 self.merge_guard_index.retain(|_, v| *v != vid);
860
861 self.estimated_size += cascaded_edges_count * 72 + 8;
863 }
864
865 pub fn insert_edge(
866 &mut self,
867 src_vid: Vid,
868 dst_vid: Vid,
869 edge_type: u32,
870 eid: Eid,
871 properties: Properties,
872 edge_type_name: Option<String>,
873 ) -> Result<()> {
874 self.insert_edge_impl(
875 src_vid,
876 dst_vid,
877 edge_type,
878 eid,
879 properties,
880 edge_type_name,
881 false,
882 )
883 }
884
885 #[allow(clippy::too_many_arguments)]
888 fn insert_edge_impl(
889 &mut self,
890 src_vid: Vid,
891 dst_vid: Vid,
892 edge_type: u32,
893 eid: Eid,
894 properties: Properties,
895 edge_type_name: Option<String>,
896 skip_wal: bool,
897 ) -> Result<()> {
898 self.current_version += 1;
899 let now = now_nanos();
900
901 if !skip_wal && let Some(wal) = &mut self.wal {
902 wal.append(Mutation::InsertEdge {
903 src_vid,
904 dst_vid,
905 edge_type,
906 eid,
907 version: self.current_version,
908 properties: properties.clone(),
909 edge_type_name: edge_type_name.clone(),
910 })?;
911 }
912
913 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
914
915 let type_name_size = if let Some(ref name) = edge_type_name {
917 let size = name.len() + 24;
918 self.edge_types.insert(eid, name.clone());
919 size
920 } else {
921 0
922 };
923
924 self.edge_created_at.entry(eid).or_insert(now);
926 self.edge_updated_at.insert(eid, now);
927
928 self.edge_partial_keys.remove(&eid);
931
932 self.estimated_size += type_name_size;
933
934 Ok(())
935 }
936
937 #[allow(clippy::too_many_arguments)]
942 pub fn insert_edge_partial_full(
943 &mut self,
944 src_vid: Vid,
945 dst_vid: Vid,
946 edge_type: u32,
947 eid: Eid,
948 properties: Properties,
949 edge_type_name: Option<String>,
950 touched_keys: HashSet<String>,
951 ) -> Result<()> {
952 self.current_version += 1;
953 let now = now_nanos();
954
955 if let Some(wal) = &mut self.wal {
956 wal.append(Mutation::InsertEdge {
957 src_vid,
958 dst_vid,
959 edge_type,
960 eid,
961 version: self.current_version,
962 properties: properties.clone(),
963 edge_type_name: edge_type_name.clone(),
964 })?;
965 }
966
967 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
968
969 self.edge_partial_keys
973 .entry(eid)
974 .or_default()
975 .extend(touched_keys);
976
977 let type_name_size = if let Some(ref name) = edge_type_name {
978 let size = name.len() + 24;
979 self.edge_types.insert(eid, name.clone());
980 size
981 } else {
982 0
983 };
984
985 self.edge_created_at.entry(eid).or_insert(now);
986 self.edge_updated_at.insert(eid, now);
987
988 self.estimated_size += type_name_size;
989
990 Ok(())
991 }
992
993 fn apply_edge_insertion(
1002 &mut self,
1003 src_vid: Vid,
1004 dst_vid: Vid,
1005 edge_type: u32,
1006 eid: Eid,
1007 properties: Properties,
1008 ) -> Result<()> {
1009 let version = self.current_version;
1010
1011 if self.vertex_tombstones.contains(&src_vid) {
1014 anyhow::bail!(
1015 "Cannot insert edge: source vertex {} has been deleted (issue #77)",
1016 src_vid
1017 );
1018 }
1019 if self.vertex_tombstones.contains(&dst_vid) {
1020 anyhow::bail!(
1021 "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
1022 dst_vid
1023 );
1024 }
1025
1026 if !self.graph.contains_vertex(src_vid) {
1031 self.graph.add_vertex(src_vid);
1032 }
1033 if !self.graph.contains_vertex(dst_vid) {
1034 self.graph.add_vertex(dst_vid);
1035 }
1036
1037 self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
1038
1039 let props_size = Self::estimate_properties_size(&properties);
1041 let props_count = properties.len();
1042 if !properties.is_empty() {
1043 let entry = self.edge_properties.entry(eid).or_default();
1044 Self::merge_crdt_properties(entry, properties);
1045 }
1046
1047 self.edge_versions.insert(eid, version);
1048 self.edge_endpoints
1049 .insert(eid, (src_vid, dst_vid, edge_type));
1050 self.tombstones.remove(&eid);
1051 self.mutation_count += 1;
1052 self.mutation_stats.relationships_created += 1;
1053 self.mutation_stats.properties_set += props_count;
1054
1055 self.estimated_size += 24 + props_size + 16 + 28 + 32;
1057
1058 Ok(())
1059 }
1060
1061 pub fn delete_edge(
1062 &mut self,
1063 eid: Eid,
1064 src_vid: Vid,
1065 dst_vid: Vid,
1066 edge_type: u32,
1067 ) -> Result<()> {
1068 self.delete_edge_impl(eid, src_vid, dst_vid, edge_type, false)
1069 }
1070
1071 fn delete_edge_impl(
1074 &mut self,
1075 eid: Eid,
1076 src_vid: Vid,
1077 dst_vid: Vid,
1078 edge_type: u32,
1079 skip_wal: bool,
1080 ) -> Result<()> {
1081 self.current_version += 1;
1082 let now = now_nanos();
1083
1084 if !skip_wal && let Some(wal) = &mut self.wal {
1085 wal.append(Mutation::DeleteEdge {
1086 eid,
1087 src_vid,
1088 dst_vid,
1089 edge_type,
1090 version: self.current_version,
1091 })?;
1092 }
1093
1094 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1095
1096 self.edge_updated_at.insert(eid, now);
1098
1099 Ok(())
1100 }
1101
1102 fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
1106 let version = self.current_version;
1107
1108 self.tombstones.insert(
1109 eid,
1110 TombstoneEntry {
1111 eid,
1112 src_vid,
1113 dst_vid,
1114 edge_type,
1115 },
1116 );
1117 self.edge_versions.insert(eid, version);
1118 self.edge_partial_keys.remove(&eid);
1121 self.graph.remove_edge(eid);
1122 self.mutation_count += 1;
1123 self.mutation_stats.relationships_deleted += 1;
1124
1125 self.estimated_size += 80;
1127 }
1128
1129 pub fn get_neighbors(
1132 &self,
1133 vid: Vid,
1134 edge_type: u32,
1135 direction: Direction,
1136 ) -> Vec<(Vid, Eid, u64)> {
1137 let edges = self.graph.neighbors(vid, direction);
1138
1139 edges
1140 .iter()
1141 .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
1142 .map(|e| {
1143 let neighbor = match direction {
1144 Direction::Outgoing => e.dst_vid,
1145 Direction::Incoming => e.src_vid,
1146 };
1147 let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
1148 (neighbor, e.eid, version)
1149 })
1150 .collect()
1151 }
1152
1153 pub fn is_tombstoned(&self, eid: Eid) -> bool {
1154 self.tombstones.contains_key(&eid)
1155 }
1156
1157 pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
1160 self.label_to_vids
1161 .get(label_name)
1162 .map(|set| set.iter().copied().collect())
1163 .unwrap_or_default()
1164 }
1165
1166 pub fn all_vertex_vids(&self) -> Vec<Vid> {
1170 self.vertex_properties.keys().copied().collect()
1171 }
1172
1173 pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1176 let mut result = HashSet::new();
1177 for label_name in label_names {
1178 if let Some(set) = self.label_to_vids.get(*label_name) {
1179 result.extend(set.iter().copied());
1180 }
1181 }
1182 result.into_iter().collect()
1183 }
1184
1185 pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1188 if label_names.is_empty() {
1189 return Vec::new();
1190 }
1191 let sets: Vec<&HashSet<Vid>> = match label_names
1194 .iter()
1195 .map(|ln| self.label_to_vids.get(*ln))
1196 .collect::<Option<Vec<_>>>()
1197 {
1198 Some(s) => s,
1199 None => return Vec::new(),
1200 };
1201 let smallest = sets.iter().min_by_key(|s| s.len()).unwrap();
1203 smallest
1204 .iter()
1205 .copied()
1206 .filter(|vid| sets.iter().all(|s| s.contains(vid)))
1207 .collect()
1208 }
1209
1210 pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
1212 self.vertex_labels.get(&vid).map(|v| v.as_slice())
1213 }
1214
1215 pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
1217 self.edge_types.get(&eid).map(|s| s.as_str())
1218 }
1219
1220 pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
1223 self.edge_types
1224 .iter()
1225 .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
1226 .map(|(eid, _)| *eid)
1227 .collect()
1228 }
1229
1230 pub fn all_edge_eids(&self) -> Vec<Eid> {
1234 self.edge_endpoints
1235 .keys()
1236 .filter(|eid| !self.tombstones.contains_key(eid))
1237 .copied()
1238 .collect()
1239 }
1240
1241 pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
1243 self.edge_endpoints
1244 .get(&eid)
1245 .map(|(src, dst, _)| (*src, *dst))
1246 }
1247
1248 pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
1250 self.edge_endpoints.get(&eid).copied()
1251 }
1252
1253 pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
1255 self.constraint_index.insert(key, vid);
1256 }
1257
1258 pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1261 self.constraint_index
1262 .get(key)
1263 .is_some_and(|&v| v != exclude_vid)
1264 }
1265
1266 pub fn insert_merge_guard_key(&mut self, key: Vec<u8>, vid: Vid) {
1268 self.merge_guard_index.insert(key, vid);
1269 }
1270
1271 pub fn has_merge_guard_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1274 self.merge_guard_index
1275 .get(key)
1276 .is_some_and(|&v| v != exclude_vid)
1277 }
1278
1279 #[instrument(skip(self, other), level = "trace")]
1280 pub fn validate_merge_edge_endpoints(&self, other: &L0Buffer) -> Result<()> {
1299 let is_deleted = |vid: &Vid| {
1303 (self.vertex_tombstones.contains(vid) || other.vertex_tombstones.contains(vid))
1304 && !other.vertex_properties.contains_key(vid)
1305 };
1306 for (eid, (src_vid, dst_vid, _etype)) in &other.edge_endpoints {
1307 if other.tombstones.contains_key(eid) {
1308 continue; }
1310 if is_deleted(src_vid) {
1311 anyhow::bail!(
1312 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
1313 eid,
1314 src_vid
1315 );
1316 }
1317 if is_deleted(dst_vid) {
1318 anyhow::bail!(
1319 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
1320 eid,
1321 dst_vid
1322 );
1323 }
1324 }
1325 Ok(())
1326 }
1327
1328 pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
1329 self.validate_merge_edge_endpoints(other)?;
1333 self.merge_validated(
1334 other,
1335 other.vertex_properties.clone(),
1336 other.edge_properties.clone(),
1337 )
1338 }
1339
1340 pub fn merge_take(&mut self, other: &mut L0Buffer) -> Result<()> {
1349 self.validate_merge_edge_endpoints(other)?;
1352 let vertex_props = std::mem::take(&mut other.vertex_properties);
1353 let edge_props = std::mem::take(&mut other.edge_properties);
1354 self.merge_validated(other, vertex_props, edge_props)
1355 }
1356
1357 fn merge_validated(
1361 &mut self,
1362 other: &L0Buffer,
1363 vertex_props: HashMap<Vid, Properties>,
1364 mut edge_props: HashMap<Eid, Properties>,
1365 ) -> Result<()> {
1366 trace!(
1367 other_mutation_count = other.mutation_count,
1368 "Merging L0 buffer"
1369 );
1370 for &vid in &other.vertex_tombstones {
1375 self.delete_vertex_impl(vid, true)?;
1376 }
1377
1378 for (vid, props) in vertex_props {
1379 let labels = other.vertex_labels.get(&vid).cloned().unwrap_or_default();
1380 self.insert_vertex_with_labels_impl(vid, props, &labels, true);
1381 }
1382
1383 for (vid, labels) in &other.vertex_labels {
1385 if !self.vertex_labels.contains_key(vid) {
1386 self.vertex_labels.insert(*vid, labels.clone());
1387 for label in labels {
1388 self.label_to_vids
1389 .entry(label.clone())
1390 .or_default()
1391 .insert(*vid);
1392 }
1393 }
1394 }
1395
1396 for vid in &other.vertex_label_overwrites {
1403 if other.vertex_tombstones.contains(vid) {
1404 continue;
1405 }
1406 let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1407 self.remove_vid_from_label_index(*vid);
1408 self.vertex_labels.insert(*vid, labels.clone());
1409 self.index_labels_for_vid(*vid, &labels);
1410 self.vertex_label_overwrites.insert(*vid);
1415 }
1416
1417 for (eid, (src, dst, etype)) in &other.edge_endpoints {
1419 if other.tombstones.contains_key(eid) {
1420 self.delete_edge_impl(*eid, *src, *dst, *etype, true)?;
1421 } else {
1422 let props = edge_props.remove(eid).unwrap_or_default();
1423 let etype_name = other.edge_types.get(eid).cloned();
1424 self.insert_edge_impl(*src, *dst, *etype, *eid, props, etype_name, true)?;
1425 }
1426 }
1427
1428 for (eid, tombstone) in &other.tombstones {
1432 if !other.edge_endpoints.contains_key(eid) {
1433 self.delete_edge_impl(
1434 *eid,
1435 tombstone.src_vid,
1436 tombstone.dst_vid,
1437 tombstone.edge_type,
1438 true,
1439 )?;
1440 }
1441 }
1442
1443 for (vid, ts) in &other.vertex_created_at {
1448 self.vertex_created_at.entry(*vid).or_insert(*ts); }
1450 for (vid, ts) in &other.vertex_updated_at {
1451 self.vertex_updated_at.insert(*vid, *ts); }
1453
1454 for (eid, ts) in &other.edge_created_at {
1455 self.edge_created_at.entry(*eid).or_insert(*ts); }
1457 for (eid, ts) in &other.edge_updated_at {
1458 self.edge_updated_at.insert(*eid, *ts); }
1460
1461 self.estimated_size += other.estimated_size;
1464
1465 for (key, vid) in &other.constraint_index {
1467 self.constraint_index.insert(key.clone(), *vid);
1468 }
1469
1470 for (key, vid) in &other.merge_guard_index {
1473 self.merge_guard_index.insert(key.clone(), *vid);
1474 }
1475
1476 for (vid, label) in &other.pending_embeddings {
1482 self.pending_embeddings.insert(*vid, label.clone());
1483 }
1484
1485 Ok(())
1486 }
1487
1488 #[instrument(skip(self, mutations), level = "debug")]
1492 pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
1493 trace!(count = mutations.len(), "Replaying mutations");
1494 for mutation in mutations {
1495 match mutation {
1496 Mutation::InsertVertex {
1497 vid,
1498 properties,
1499 labels,
1500 } => {
1501 self.current_version += 1;
1503 let version = self.current_version;
1504
1505 self.vertex_tombstones.remove(&vid);
1506 let tracks_extid = properties.contains_key("ext_id");
1507 let entry = self.vertex_properties.entry(vid).or_default();
1508 let old_extid = if tracks_extid {
1509 Self::extid_of(entry)
1510 } else {
1511 None
1512 };
1513 Self::merge_crdt_properties(entry, properties);
1514 if tracks_extid {
1515 let new_extid = Self::extid_of(
1516 self.vertex_properties.get(&vid).expect("just inserted"),
1517 );
1518 self.sync_extid_index(vid, old_extid, new_extid);
1519 }
1520 self.vertex_versions.insert(vid, version);
1521 self.graph.add_vertex(vid);
1522 self.mutation_count += 1;
1523
1524 let existing = self.vertex_labels.entry(vid).or_default();
1526 Self::append_unique_labels(existing, &labels);
1527 for label in &labels {
1528 self.label_to_vids
1529 .entry(label.clone())
1530 .or_default()
1531 .insert(vid);
1532 }
1533 }
1534 Mutation::DeleteVertex { vid, labels } => {
1535 self.current_version += 1;
1536 if !labels.is_empty() {
1538 let existing = self.vertex_labels.entry(vid).or_default();
1539 Self::append_unique_labels(existing, &labels);
1540 for label in &labels {
1541 self.label_to_vids
1542 .entry(label.clone())
1543 .or_default()
1544 .insert(vid);
1545 }
1546 }
1547 self.apply_vertex_deletion(vid);
1548 }
1549 Mutation::SetVertexLabels { vid, labels } => {
1550 self.current_version += 1;
1555 self.remove_vid_from_label_index(vid);
1556 self.vertex_labels.insert(vid, labels.clone());
1557 self.index_labels_for_vid(vid, &labels);
1558 self.mutation_count += 1;
1559 }
1560 Mutation::InsertEdge {
1561 src_vid,
1562 dst_vid,
1563 edge_type,
1564 eid,
1565 version: _,
1566 properties,
1567 edge_type_name,
1568 } => {
1569 self.current_version += 1;
1570 match self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties) {
1575 Ok(()) => {
1576 if let Some(name) = edge_type_name {
1578 self.edge_types.insert(eid, name);
1579 }
1580 }
1581 Err(e) => {
1582 tracing::warn!(
1583 ?eid,
1584 ?src_vid,
1585 ?dst_vid,
1586 error = %e,
1587 "WAL replay: skipping edge insertion to a deleted endpoint (issue #77)"
1588 );
1589 }
1590 }
1591 }
1592 Mutation::DeleteEdge {
1593 eid,
1594 src_vid,
1595 dst_vid,
1596 edge_type,
1597 version: _,
1598 } => {
1599 self.current_version += 1;
1600 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1601 }
1602 }
1603 }
1604 Ok(())
1605 }
1606}
1607
1608#[cfg(test)]
1609mod tests {
1610 use super::*;
1611
1612 #[test]
1613 fn test_l0_buffer_ops() -> Result<()> {
1614 let mut l0 = L0Buffer::new(0, None);
1615 let vid_a = Vid::new(1);
1616 let vid_b = Vid::new(2);
1617 let eid_ab = Eid::new(101);
1618
1619 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1620
1621 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1622 assert_eq!(neighbors.len(), 1);
1623 assert_eq!(neighbors[0].0, vid_b);
1624 assert_eq!(neighbors[0].1, eid_ab);
1625
1626 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1627 assert!(l0.is_tombstoned(eid_ab));
1628
1629 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1631 assert_eq!(neighbors_after.len(), 0);
1632
1633 Ok(())
1634 }
1635
1636 #[test]
1642 fn validate_merge_rejects_edge_to_tombstoned_endpoint() {
1643 let mut main = L0Buffer::new(0, None);
1644 let vid_a = Vid::new(1);
1645 let vid_b = Vid::new(2);
1646 main.insert_vertex(vid_a, HashMap::new());
1647 main.insert_vertex(vid_b, HashMap::new());
1648 main.delete_vertex(vid_b).unwrap(); let mut tx = L0Buffer::new(0, None);
1652 let eid = Eid::new(101);
1653 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1654 .unwrap();
1655
1656 assert!(
1657 main.validate_merge_edge_endpoints(&tx).is_err(),
1658 "edge to a tombstoned endpoint must be rejected before merge"
1659 );
1660 assert!(
1662 main.merge(&tx).is_err(),
1663 "merge must reject, not bail mid-apply"
1664 );
1665 assert!(
1666 !main.edge_endpoints.contains_key(&eid),
1667 "a rejected merge must not have partially applied the edge"
1668 );
1669 }
1670
1671 #[test]
1674 fn validate_merge_allows_edge_when_endpoint_reinserted() {
1675 let mut main = L0Buffer::new(0, None);
1676 let vid_a = Vid::new(1);
1677 let vid_b = Vid::new(2);
1678 main.insert_vertex(vid_a, HashMap::new());
1679 main.insert_vertex(vid_b, HashMap::new());
1680 main.delete_vertex(vid_b).unwrap();
1681
1682 let mut tx = L0Buffer::new(0, None);
1683 tx.insert_vertex(vid_b, HashMap::new()); let eid = Eid::new(101);
1685 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1686 .unwrap();
1687
1688 assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1689 assert!(main.merge(&tx).is_ok());
1690 assert!(main.edge_endpoints.contains_key(&eid));
1691 }
1692
1693 #[test]
1695 fn validate_merge_allows_edge_to_live_endpoints() {
1696 let mut main = L0Buffer::new(0, None);
1697 let vid_a = Vid::new(1);
1698 let vid_b = Vid::new(2);
1699 main.insert_vertex(vid_a, HashMap::new());
1700 main.insert_vertex(vid_b, HashMap::new());
1701
1702 let mut tx = L0Buffer::new(0, None);
1703 let eid = Eid::new(101);
1704 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1705 .unwrap();
1706
1707 assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1708 assert!(main.merge(&tx).is_ok());
1709 assert!(main.edge_endpoints.contains_key(&eid));
1710 }
1711
1712 #[test]
1713 fn test_l0_buffer_multiple_edges() -> Result<()> {
1714 let mut l0 = L0Buffer::new(0, None);
1715 let vid_a = Vid::new(1);
1716 let vid_b = Vid::new(2);
1717 let vid_c = Vid::new(3);
1718 let eid_ab = Eid::new(101);
1719 let eid_ac = Eid::new(102);
1720
1721 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1722 l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
1723
1724 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1725 assert_eq!(neighbors.len(), 2);
1726
1727 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1729
1730 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1732 assert_eq!(neighbors_after.len(), 1);
1733 assert_eq!(neighbors_after[0].0, vid_c);
1734
1735 Ok(())
1736 }
1737
1738 #[test]
1739 fn test_l0_buffer_edge_type_filter() -> Result<()> {
1740 let mut l0 = L0Buffer::new(0, None);
1741 let vid_a = Vid::new(1);
1742 let vid_b = Vid::new(2);
1743 let vid_c = Vid::new(3);
1744 let eid_ab = Eid::new(101);
1745 let eid_ac = Eid::new(201); l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1748 l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
1749
1750 let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1752 assert_eq!(type1_neighbors.len(), 1);
1753 assert_eq!(type1_neighbors[0].0, vid_b);
1754
1755 let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
1757 assert_eq!(type2_neighbors.len(), 1);
1758 assert_eq!(type2_neighbors[0].0, vid_c);
1759
1760 Ok(())
1761 }
1762
1763 #[test]
1764 fn test_l0_buffer_incoming_edges() -> Result<()> {
1765 let mut l0 = L0Buffer::new(0, None);
1766 let vid_a = Vid::new(1);
1767 let vid_b = Vid::new(2);
1768 let vid_c = Vid::new(3);
1769 let eid_ab = Eid::new(101);
1770 let eid_cb = Eid::new(102);
1771
1772 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1774 l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
1775
1776 let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
1778 assert_eq!(incoming.len(), 2);
1779
1780 Ok(())
1781 }
1782
1783 #[test]
1785 fn test_merge_empty_props_edge() -> Result<()> {
1786 let mut main_l0 = L0Buffer::new(0, None);
1787 let mut tx_l0 = L0Buffer::new(0, None);
1788
1789 let vid_a = Vid::new(1);
1790 let vid_b = Vid::new(2);
1791 let eid_ab = Eid::new(101);
1792
1793 tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1795
1796 assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
1798 assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); main_l0.merge(&tx_l0)?;
1802
1803 assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
1805 let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1806 assert_eq!(neighbors.len(), 1);
1807 assert_eq!(neighbors[0].0, vid_b);
1808
1809 Ok(())
1810 }
1811
1812 #[test]
1814 fn test_replay_crdt_merge() -> Result<()> {
1815 use crate::runtime::wal::Mutation;
1816 use serde_json::json;
1817 use uni_common::Value;
1818
1819 let mut l0 = L0Buffer::new(0, None);
1820 let vid = Vid::new(1);
1821
1822 let counter1: Value = json!({
1825 "t": "gc",
1826 "d": {"counts": {"node1": 5}}
1827 })
1828 .into();
1829 let counter2: Value = json!({
1830 "t": "gc",
1831 "d": {"counts": {"node2": 3}}
1832 })
1833 .into();
1834
1835 let mut props1 = HashMap::new();
1837 props1.insert("counter".to_string(), counter1.clone());
1838 l0.replay_mutations(vec![Mutation::InsertVertex {
1839 vid,
1840 properties: props1,
1841 labels: vec![],
1842 }])?;
1843
1844 let mut props2 = HashMap::new();
1846 props2.insert("counter".to_string(), counter2.clone());
1847 l0.replay_mutations(vec![Mutation::InsertVertex {
1848 vid,
1849 properties: props2,
1850 labels: vec![],
1851 }])?;
1852
1853 let stored_props = l0.vertex_properties.get(&vid).unwrap();
1855 let stored_counter = stored_props.get("counter").unwrap();
1856
1857 let stored_json: serde_json::Value = stored_counter.clone().into();
1859 let data = stored_json.get("d").unwrap();
1861 let counts = data.get("counts").unwrap();
1862 assert_eq!(counts.get("node1"), Some(&json!(5)));
1863 assert_eq!(counts.get("node2"), Some(&json!(3)));
1864
1865 Ok(())
1866 }
1867
1868 #[test]
1869 fn test_merge_preserves_vertex_timestamps() -> Result<()> {
1870 let mut l0_main = L0Buffer::new(0, None);
1871 let mut l0_tx = L0Buffer::new(0, None);
1872 let vid = Vid::new(1);
1873
1874 let ts_main_created = 1000;
1876 let ts_main_updated = 1100;
1877 l0_main.insert_vertex(vid, HashMap::new());
1878 l0_main.vertex_created_at.insert(vid, ts_main_created);
1879 l0_main.vertex_updated_at.insert(vid, ts_main_updated);
1880
1881 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_vertex(vid, HashMap::new());
1885 l0_tx.vertex_created_at.insert(vid, ts_tx_created);
1886 l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
1887
1888 l0_main.merge(&l0_tx)?;
1890
1891 assert_eq!(
1893 *l0_main.vertex_created_at.get(&vid).unwrap(),
1894 ts_main_created,
1895 "created_at should preserve oldest timestamp"
1896 );
1897
1898 assert_eq!(
1900 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1901 ts_tx_updated,
1902 "updated_at should use latest timestamp"
1903 );
1904
1905 Ok(())
1906 }
1907
1908 #[test]
1909 fn test_merge_preserves_edge_timestamps() -> Result<()> {
1910 let mut l0_main = L0Buffer::new(0, None);
1911 let mut l0_tx = L0Buffer::new(0, None);
1912 let vid_a = Vid::new(1);
1913 let vid_b = Vid::new(2);
1914 let eid = Eid::new(100);
1915
1916 let ts_main_created = 1000;
1918 let ts_main_updated = 1100;
1919 l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1920 l0_main.edge_created_at.insert(eid, ts_main_created);
1921 l0_main.edge_updated_at.insert(eid, ts_main_updated);
1922
1923 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1927 l0_tx.edge_created_at.insert(eid, ts_tx_created);
1928 l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1929
1930 l0_main.merge(&l0_tx)?;
1932
1933 assert_eq!(
1935 *l0_main.edge_created_at.get(&eid).unwrap(),
1936 ts_main_created,
1937 "edge created_at should preserve oldest timestamp"
1938 );
1939
1940 assert_eq!(
1942 *l0_main.edge_updated_at.get(&eid).unwrap(),
1943 ts_tx_updated,
1944 "edge updated_at should use latest timestamp"
1945 );
1946
1947 Ok(())
1948 }
1949
1950 #[test]
1951 fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1952 use uni_common::Value;
1953
1954 let mut l0_main = L0Buffer::new(0, None);
1955 let mut l0_tx = L0Buffer::new(0, None);
1956 let vid = Vid::new(1);
1957
1958 let ts_original = 1000;
1960 l0_main.insert_vertex(vid, HashMap::new());
1961 l0_main.vertex_created_at.insert(vid, ts_original);
1962 l0_main.vertex_updated_at.insert(vid, ts_original);
1963
1964 let ts_tx = 2000;
1966 let mut props = HashMap::new();
1967 props.insert("updated".to_string(), Value::String("yes".to_string()));
1968 l0_tx.insert_vertex(vid, props);
1969 l0_tx.vertex_created_at.insert(vid, ts_tx);
1970 l0_tx.vertex_updated_at.insert(vid, ts_tx);
1971
1972 l0_main.merge(&l0_tx)?;
1974
1975 assert_eq!(
1977 *l0_main.vertex_created_at.get(&vid).unwrap(),
1978 ts_original,
1979 "created_at must not be overwritten for existing vertex"
1980 );
1981
1982 assert_eq!(
1984 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1985 ts_tx,
1986 "updated_at should reflect transaction timestamp"
1987 );
1988
1989 assert!(
1991 l0_main
1992 .vertex_properties
1993 .get(&vid)
1994 .unwrap()
1995 .contains_key("updated")
1996 );
1997
1998 Ok(())
1999 }
2000
2001 #[test]
2003 fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
2004 use crate::runtime::wal::Mutation;
2005
2006 let mut l0 = L0Buffer::new(0, None);
2007 let vid = Vid::new(42);
2008
2009 let mutations = vec![Mutation::InsertVertex {
2011 vid,
2012 properties: {
2013 let mut props = HashMap::new();
2014 props.insert(
2015 "name".to_string(),
2016 uni_common::Value::String("Alice".to_string()),
2017 );
2018 props
2019 },
2020 labels: vec!["Person".to_string(), "User".to_string()],
2021 }];
2022
2023 l0.replay_mutations(mutations)?;
2025
2026 assert!(l0.vertex_properties.contains_key(&vid));
2028
2029 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2031 assert_eq!(labels.len(), 2);
2032 assert!(labels.contains(&"Person".to_string()));
2033 assert!(labels.contains(&"User".to_string()));
2034
2035 let person_vids = l0.vids_for_label("Person");
2037 assert_eq!(person_vids.len(), 1);
2038 assert_eq!(person_vids[0], vid);
2039
2040 let user_vids = l0.vids_for_label("User");
2041 assert_eq!(user_vids.len(), 1);
2042 assert_eq!(user_vids[0], vid);
2043
2044 Ok(())
2045 }
2046
2047 #[test]
2049 fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
2050 use crate::runtime::wal::Mutation;
2051
2052 let mut l0 = L0Buffer::new(0, None);
2053 let vid = Vid::new(99);
2054
2055 l0.insert_vertex_with_labels(
2057 vid,
2058 HashMap::new(),
2059 &["Person".to_string(), "Admin".to_string()],
2060 );
2061
2062 assert!(l0.vertex_properties.contains_key(&vid));
2064 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2065 assert_eq!(labels.len(), 2);
2066
2067 let mutations = vec![Mutation::DeleteVertex {
2069 vid,
2070 labels: vec!["Person".to_string(), "Admin".to_string()],
2071 }];
2072
2073 l0.replay_mutations(mutations)?;
2075
2076 assert!(l0.vertex_tombstones.contains(&vid));
2078
2079 let labels = l0.get_vertex_labels(vid);
2082 assert!(
2083 labels.is_some(),
2084 "Labels should be preserved even after deletion for tombstone flushing"
2085 );
2086
2087 Ok(())
2088 }
2089
2090 #[test]
2092 fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
2093 use crate::runtime::wal::Mutation;
2094
2095 let mut l0 = L0Buffer::new(0, None);
2096 let src = Vid::new(1);
2097 let dst = Vid::new(2);
2098 let eid = Eid::new(500);
2099 let edge_type = 100;
2100
2101 let mutations = vec![Mutation::InsertEdge {
2103 src_vid: src,
2104 dst_vid: dst,
2105 edge_type,
2106 eid,
2107 version: 1,
2108 properties: {
2109 let mut props = HashMap::new();
2110 props.insert("since".to_string(), uni_common::Value::Int(2020));
2111 props
2112 },
2113 edge_type_name: Some("KNOWS".to_string()),
2114 }];
2115
2116 l0.replay_mutations(mutations)?;
2118
2119 assert!(l0.edge_endpoints.contains_key(&eid));
2121
2122 let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
2124 assert_eq!(type_name, "KNOWS");
2125
2126 let knows_eids = l0.eids_for_type("KNOWS");
2128 assert_eq!(knows_eids.len(), 1);
2129 assert_eq!(knows_eids[0], eid);
2130
2131 Ok(())
2132 }
2133
2134 #[test]
2136 fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
2137 use crate::runtime::wal::Mutation;
2138
2139 let mut l0 = L0Buffer::new(0, None);
2140
2141 let mutations = vec![
2143 Mutation::InsertEdge {
2144 src_vid: Vid::new(1),
2145 dst_vid: Vid::new(2),
2146 edge_type: 100,
2147 eid: Eid::new(1000),
2148 version: 1,
2149 properties: HashMap::new(),
2150 edge_type_name: Some("KNOWS".to_string()),
2151 },
2152 Mutation::InsertEdge {
2153 src_vid: Vid::new(2),
2154 dst_vid: Vid::new(3),
2155 edge_type: 101,
2156 eid: Eid::new(1001),
2157 version: 2,
2158 properties: HashMap::new(),
2159 edge_type_name: Some("LIKES".to_string()),
2160 },
2161 Mutation::InsertEdge {
2162 src_vid: Vid::new(3),
2163 dst_vid: Vid::new(1),
2164 edge_type: 100,
2165 eid: Eid::new(1002),
2166 version: 3,
2167 properties: HashMap::new(),
2168 edge_type_name: Some("KNOWS".to_string()),
2169 },
2170 ];
2171
2172 l0.replay_mutations(mutations)?;
2173
2174 assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
2176 assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
2177 assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
2178
2179 let knows_edges = l0.eids_for_type("KNOWS");
2181 assert_eq!(knows_edges.len(), 2);
2182 assert!(knows_edges.contains(&Eid::new(1000)));
2183 assert!(knows_edges.contains(&Eid::new(1002)));
2184
2185 let likes_edges = l0.eids_for_type("LIKES");
2186 assert_eq!(likes_edges.len(), 1);
2187 assert_eq!(likes_edges[0], Eid::new(1001));
2188
2189 Ok(())
2190 }
2191
2192 #[test]
2194 fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
2195 use crate::runtime::wal::Mutation;
2196
2197 let mut l0 = L0Buffer::new(0, None);
2198 let alice = Vid::new(1);
2199 let bob = Vid::new(2);
2200 let eid = Eid::new(100);
2201
2202 let mutations = vec![
2204 Mutation::InsertVertex {
2206 vid: alice,
2207 properties: {
2208 let mut props = HashMap::new();
2209 props.insert(
2210 "name".to_string(),
2211 uni_common::Value::String("Alice".to_string()),
2212 );
2213 props
2214 },
2215 labels: vec!["Person".to_string()],
2216 },
2217 Mutation::InsertVertex {
2219 vid: bob,
2220 properties: {
2221 let mut props = HashMap::new();
2222 props.insert(
2223 "name".to_string(),
2224 uni_common::Value::String("Bob".to_string()),
2225 );
2226 props
2227 },
2228 labels: vec!["Person".to_string()],
2229 },
2230 Mutation::InsertEdge {
2232 src_vid: alice,
2233 dst_vid: bob,
2234 edge_type: 1,
2235 eid,
2236 version: 3,
2237 properties: HashMap::new(),
2238 edge_type_name: Some("KNOWS".to_string()),
2239 },
2240 ];
2241
2242 l0.replay_mutations(mutations)?;
2244
2245 assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
2247 assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
2248 assert_eq!(l0.vids_for_label("Person").len(), 2);
2249
2250 assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
2252 assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
2253
2254 let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
2256 assert_eq!(alice_neighbors.len(), 1);
2257 assert_eq!(alice_neighbors[0].0, bob);
2258
2259 Ok(())
2260 }
2261
2262 #[test]
2264 fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
2265 use crate::runtime::wal::Mutation;
2266
2267 let mut l0 = L0Buffer::new(0, None);
2268 let vid = Vid::new(1);
2269
2270 let mutations = vec![Mutation::InsertVertex {
2273 vid,
2274 properties: HashMap::new(),
2275 labels: vec![], }];
2277
2278 l0.replay_mutations(mutations)?;
2279
2280 assert!(l0.vertex_properties.contains_key(&vid));
2282
2283 let labels = l0.get_vertex_labels(vid);
2285 assert!(labels.is_some(), "Labels entry should exist even if empty");
2286 assert_eq!(labels.unwrap().len(), 0);
2287
2288 Ok(())
2289 }
2290
2291 #[test]
2292 fn test_now_nanos_returns_nanosecond_range() {
2293 let now = now_nanos();
2297
2298 assert!(
2300 now > 1_700_000_000_000_000_000,
2301 "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
2302 now
2303 );
2304
2305 assert!(
2307 now < 4_100_000_000_000_000_000,
2308 "now_nanos() returned {}, expected < 4.1e18",
2309 now
2310 );
2311 }
2312}