1use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15#[derive(Debug, Default)]
23pub struct OccReadSet {
24 pub vertices: HashSet<Vid>,
26 pub edges: HashSet<Eid>,
28}
29
30impl OccReadSet {
31 pub fn is_empty(&self) -> bool {
34 self.vertices.is_empty() && self.edges.is_empty()
35 }
36}
37
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. The
/// nanosecond count is a `u128`; it is converted with a checked conversion and
/// saturates at `i64::MAX` instead of silently wrapping the way an `as` cast
/// would.
fn now_nanos() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX),
        // Clock is set before 1970: keep the original fallback value.
        Err(_) => 0,
    }
}
45
46pub(crate) fn try_as_crdt(v: &Value) -> Option<Crdt> {
58 if !matches!(v, Value::Map(_)) {
59 return None;
60 }
61 serde_json::from_value::<Crdt>(v.clone().into()).ok()
62}
63
64pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
67 let mut buf = label.as_bytes().to_vec();
68 buf.push(0); let mut sorted = key_values.to_vec();
70 sorted.sort_by(|a, b| a.0.cmp(&b.0));
71 for (k, v) in &sorted {
72 buf.extend(k.as_bytes());
73 buf.push(0);
74 buf.extend(serde_json::to_vec(v).unwrap_or_default());
76 buf.push(0);
77 }
78 buf
79}
80
/// Counters describing the mutations applied to a buffer.
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
    /// Number of vertex insertions counted.
    pub nodes_created: usize,
    /// Number of vertex deletions counted.
    pub nodes_deleted: usize,
    /// Number of edge insertions counted.
    pub relationships_created: usize,
    /// Number of edge deletions counted.
    pub relationships_deleted: usize,
    /// Number of property writes counted.
    pub properties_set: usize,
    /// Number of property removals counted.
    pub properties_removed: usize,
    /// Number of labels added.
    pub labels_added: usize,
    /// Number of labels removed.
    pub labels_removed: usize,
}

impl MutationStats {
    /// Returns the per-counter difference `self - before`.
    ///
    /// Each counter is subtracted with saturation, so a counter that is smaller
    /// in `self` than in `before` yields `0` rather than underflowing.
    pub fn diff(&self, before: &Self) -> Self {
        let delta = |current: usize, earlier: usize| current.saturating_sub(earlier);
        Self {
            nodes_created: delta(self.nodes_created, before.nodes_created),
            nodes_deleted: delta(self.nodes_deleted, before.nodes_deleted),
            relationships_created: delta(self.relationships_created, before.relationships_created),
            relationships_deleted: delta(self.relationships_deleted, before.relationships_deleted),
            properties_set: delta(self.properties_set, before.properties_set),
            properties_removed: delta(self.properties_removed, before.properties_removed),
            labels_added: delta(self.labels_added, before.labels_added),
            labels_removed: delta(self.labels_removed, before.labels_removed),
        }
    }
}
119
/// Record kept for a deleted edge: its id plus the endpoints and numeric type
/// it had when it was deleted.
#[derive(Clone, Debug)]
pub struct TombstoneEntry {
    /// Id of the deleted edge.
    pub eid: Eid,
    /// Source vertex of the deleted edge.
    pub src_vid: Vid,
    /// Destination vertex of the deleted edge.
    pub dst_vid: Vid,
    /// Numeric edge-type id of the deleted edge.
    pub edge_type: u32,
}
127
/// In-memory buffer of graph mutations: topology, properties, labels,
/// tombstones, and the auxiliary indexes kept in sync with them.
pub struct L0Buffer {
    /// Adjacency structure holding the vertices and edges present in the buffer.
    pub graph: SimpleGraph,
    /// Edges deleted in this buffer, keyed by edge id.
    pub tombstones: HashMap<Eid, TombstoneEntry>,
    /// Vertices deleted in this buffer. A later insert of the same vid clears it.
    pub vertex_tombstones: HashSet<Vid>,
    /// Buffer version at which each edge was last inserted or deleted.
    pub edge_versions: HashMap<Eid, u64>,
    /// Buffer version at which each vertex was last inserted or deleted.
    pub vertex_versions: HashMap<Vid, u64>,
    /// Properties of each edge that has any.
    pub edge_properties: HashMap<Eid, Properties>,
    /// Properties of each inserted vertex.
    pub vertex_properties: HashMap<Vid, Properties>,
    /// `(source, destination, numeric edge type)` for each live edge.
    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
    /// Labels of each vertex, without duplicates.
    pub vertex_labels: HashMap<Vid, Vec<String>>,
    /// Reverse index of `vertex_labels`: label name to the vertices carrying it.
    pub label_to_vids: HashMap<String, HashSet<Vid>>,
    /// Vertices whose label list was replaced wholesale via `set_vertex_labels`;
    /// on merge their labels are replaced instead of unioned.
    pub vertex_label_overwrites: HashSet<Vid>,
    /// Edge type name for each edge that was given one.
    pub edge_types: HashMap<Eid, String>,
    /// Monotonic version counter; most mutations increment it.
    pub current_version: u64,
    /// Number of mutations applied to this buffer.
    pub mutation_count: usize,
    /// Per-kind mutation counters.
    pub mutation_stats: MutationStats,
    /// Optional write-ahead log that mutations are appended to. Not carried over
    /// by `Clone`.
    pub wal: Option<Arc<WriteAheadLog>>,
    /// NOTE(review): presumably the WAL sequence number at the last flush — it is
    /// only initialised and copied in the visible code; confirm with its users.
    pub wal_lsn_at_flush: u64,
    /// NOTE(review): presumably the WAL sequence number when this buffer was
    /// started — only initialised and copied in the visible code; confirm.
    pub wal_lsn_at_start: u64,
    /// First-insert time of each vertex, in nanoseconds since the Unix epoch.
    pub vertex_created_at: HashMap<Vid, i64>,
    /// Last-write time of each vertex, in nanoseconds since the Unix epoch.
    pub vertex_updated_at: HashMap<Vid, i64>,
    /// First-insert time of each edge, in nanoseconds since the Unix epoch.
    pub edge_created_at: HashMap<Eid, i64>,
    /// Last-write time of each edge, in nanoseconds since the Unix epoch.
    pub edge_updated_at: HashMap<Eid, i64>,
    /// Running size estimate in bytes, bumped incrementally by each mutation.
    pub estimated_size: usize,
    /// Serialized constraint key to the vertex that owns it.
    pub constraint_index: HashMap<Vec<u8>, Vid>,
    /// Serialized merge-guard key to the vertex that owns it.
    pub merge_guard_index: HashMap<Vec<u8>, Vid>,
    /// Value of the `"ext_id"` string property to the vertex carrying it.
    pub extid_index: HashMap<String, Vid>,
    /// Property keys touched by partial vertex inserts; the entry is dropped by
    /// a full insert or a delete of the vertex.
    pub vertex_partial_keys: HashMap<Vid, HashSet<String>>,
    /// Property keys touched by partial edge inserts; the entry is dropped by a
    /// full insert or a delete of the edge.
    pub edge_partial_keys: HashMap<Eid, HashSet<String>>,
    /// NOTE(review): presumably vertices awaiting embedding generation, mapped
    /// to a label — not populated in the visible code; confirm.
    pub pending_embeddings: HashMap<Vid, String>,
    /// NOTE(review): presumably the sequence number the OCC read set was opened
    /// at — not used in the visible code; confirm.
    pub occ_read_seq: u64,
    /// Optional shared read set. Not carried over by `Clone`.
    pub occ_read_set: Option<Arc<parking_lot::Mutex<OccReadSet>>>,
}
245
246impl std::fmt::Debug for L0Buffer {
247 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248 f.debug_struct("L0Buffer")
249 .field("vertex_count", &self.graph.vertex_count())
250 .field("edge_count", &self.graph.edge_count())
251 .field("tombstones", &self.tombstones.len())
252 .field("vertex_tombstones", &self.vertex_tombstones.len())
253 .field("current_version", &self.current_version)
254 .field("mutation_count", &self.mutation_count)
255 .finish()
256 }
257}
258
/// Deep-copies the buffer's data. The clone is detached from the write-ahead
/// log (`wal`) and from the shared read set (`occ_read_set`): both are `None`
/// in the copy, while every other field is cloned or copied.
impl Clone for L0Buffer {
    fn clone(&self) -> Self {
        Self {
            graph: self.graph.clone(),
            tombstones: self.tombstones.clone(),
            vertex_tombstones: self.vertex_tombstones.clone(),
            edge_versions: self.edge_versions.clone(),
            vertex_versions: self.vertex_versions.clone(),
            edge_properties: self.edge_properties.clone(),
            vertex_properties: self.vertex_properties.clone(),
            edge_endpoints: self.edge_endpoints.clone(),
            vertex_labels: self.vertex_labels.clone(),
            label_to_vids: self.label_to_vids.clone(),
            vertex_label_overwrites: self.vertex_label_overwrites.clone(),
            edge_types: self.edge_types.clone(),
            current_version: self.current_version,
            mutation_count: self.mutation_count,
            mutation_stats: self.mutation_stats.clone(),
            // The WAL handle is deliberately not shared with the copy, so
            // mutations applied to the clone are not appended to the log.
            wal: None,
            wal_lsn_at_flush: self.wal_lsn_at_flush,
            wal_lsn_at_start: self.wal_lsn_at_start,
            vertex_created_at: self.vertex_created_at.clone(),
            vertex_updated_at: self.vertex_updated_at.clone(),
            edge_created_at: self.edge_created_at.clone(),
            edge_updated_at: self.edge_updated_at.clone(),
            estimated_size: self.estimated_size,
            constraint_index: self.constraint_index.clone(),
            merge_guard_index: self.merge_guard_index.clone(),
            extid_index: self.extid_index.clone(),
            vertex_partial_keys: self.vertex_partial_keys.clone(),
            edge_partial_keys: self.edge_partial_keys.clone(),
            pending_embeddings: self.pending_embeddings.clone(),
            occ_read_seq: self.occ_read_seq,
            // The read set is likewise not shared with the copy.
            occ_read_set: None,
        }
    }
}
301
302impl L0Buffer {
303 fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
305 for label in labels {
306 if !existing.contains(label) {
307 existing.push(label.clone());
308 }
309 }
310 }
311
312 fn index_labels_for_vid(&mut self, vid: Vid, labels: &[String]) {
314 for label in labels {
315 self.label_to_vids
316 .entry(label.clone())
317 .or_default()
318 .insert(vid);
319 }
320 }
321
322 fn extid_of(props: &Properties) -> Option<String> {
324 props
325 .get("ext_id")
326 .and_then(|v| v.as_str())
327 .map(str::to_owned)
328 }
329
330 fn sync_extid_index(&mut self, vid: Vid, old: Option<String>, new: Option<String>) {
338 if old == new {
339 return;
340 }
341 if let Some(old) = old
342 && self.extid_index.get(&old) == Some(&vid)
343 {
344 self.extid_index.remove(&old);
345 }
346 if let Some(new) = new {
347 self.extid_index.insert(new, vid);
348 }
349 }
350
351 fn remove_vid_from_label_index(&mut self, vid: Vid) {
353 if let Some(labels) = self.vertex_labels.get(&vid) {
354 for label in labels {
355 if let Some(set) = self.label_to_vids.get_mut(label) {
356 set.remove(&vid);
357 }
358 }
359 }
360 }
361
362 pub fn set_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
373 self.remove_vid_from_label_index(vid);
374 self.vertex_labels.insert(vid, labels.to_vec());
375 self.index_labels_for_vid(vid, labels);
376 self.vertex_label_overwrites.insert(vid);
377 self.current_version += 1;
378 self.mutation_count += 1;
379 }
380
    /// Merges `properties` into `entry`, key by key.
    ///
    /// * If `entry` is empty it is simply replaced by `properties`.
    /// * If the incoming value and the stored value both deserialize as a
    ///   [`Crdt`] and `try_merge` succeeds, the merged CRDT is stored.
    /// * In every other case the incoming value overwrites the stored one
    ///   (last-writer-wins). A warning is logged when that discards a stored
    ///   CRDT.
    fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
        // Nothing to merge against: take the incoming map wholesale.
        if entry.is_empty() {
            *entry = properties;
            return;
        }

        for (k, v) in properties {
            // Both sides must be CRDTs for a merge to be attempted.
            if let Some(mut new_crdt) = try_as_crdt(&v)
                && let Some(existing_v) = entry.get(&k)
                && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
            {
                // The stored state is merged into the incoming one; the result
                // is stored only if it also serializes back to a value.
                if new_crdt.try_merge(&existing_crdt).is_ok()
                    && let Ok(merged_json) = serde_json::to_value(new_crdt)
                {
                    entry.insert(k, uni_common::Value::from(merged_json));
                    continue;
                }
                // Reached when `try_merge` fails or when the merged CRDT cannot
                // be serialized; the code falls through to the overwrite below.
                tracing::warn!(
                    property = %k,
                    existing_variant = existing_crdt.type_name(),
                    "overwriting CRDT property with a different CRDT variant \
                     (last-writer-wins); merged CRDT state is discarded"
                );
            } else if try_as_crdt(&v).is_none()
                && entry.get(&k).is_some_and(|e| try_as_crdt(e).is_some())
            {
                // A plain value is about to replace a stored CRDT.
                tracing::warn!(
                    property = %k,
                    "overwriting CRDT property with non-CRDT value (last-writer-wins); \
                     merged CRDT state is discarded"
                );
            }
            // Last-writer-wins for everything that was not merged above.
            entry.insert(k, v);
        }
    }
443
444 fn estimate_properties_size(props: &Properties) -> usize {
446 props.keys().map(|k| k.len() + 32).sum()
447 }
448
449 pub fn size_bytes(&self) -> usize {
452 let mut total = 0;
453
454 total += self.graph.vertex_count() * 8;
456 total += self.graph.edge_count() * 24;
457
458 for props in self.vertex_properties.values() {
460 total += Self::estimate_properties_size(props);
461 }
462 for props in self.edge_properties.values() {
463 total += Self::estimate_properties_size(props);
464 }
465
466 total += self.tombstones.len() * 64;
468 total += self.vertex_tombstones.len() * 8;
469 total += self.edge_versions.len() * 16;
470 total += self.vertex_versions.len() * 16;
471 total += self.edge_endpoints.len() * 28; for labels in self.vertex_labels.values() {
475 total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
476 }
477
478 for (label, vids) in &self.label_to_vids {
480 total += label.len() + 24 + vids.len() * 8 + 48; }
482
483 for type_name in self.edge_types.values() {
485 total += type_name.len() + 24;
486 }
487
488 total += self.vertex_created_at.len() * 16;
490 total += self.vertex_updated_at.len() * 16;
491 total += self.edge_created_at.len() * 16;
492 total += self.edge_updated_at.len() * 16;
493
494 total
495 }
496
497 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
498 Self {
499 graph: SimpleGraph::new(),
500 tombstones: HashMap::new(),
501 vertex_tombstones: HashSet::new(),
502 edge_versions: HashMap::new(),
503 vertex_versions: HashMap::new(),
504 edge_properties: HashMap::new(),
505 vertex_properties: HashMap::new(),
506 edge_endpoints: HashMap::new(),
507 vertex_labels: HashMap::new(),
508 label_to_vids: HashMap::new(),
509 vertex_label_overwrites: HashSet::new(),
510 edge_types: HashMap::new(),
511 current_version: start_version,
512 mutation_count: 0,
513 mutation_stats: MutationStats::default(),
514 wal,
515 wal_lsn_at_flush: 0,
516 wal_lsn_at_start: 0,
517 vertex_created_at: HashMap::new(),
518 vertex_updated_at: HashMap::new(),
519 edge_created_at: HashMap::new(),
520 edge_updated_at: HashMap::new(),
521 estimated_size: 0,
522 constraint_index: HashMap::new(),
523 merge_guard_index: HashMap::new(),
524 extid_index: HashMap::new(),
525 vertex_partial_keys: HashMap::new(),
526 edge_partial_keys: HashMap::new(),
527 pending_embeddings: HashMap::new(),
528 occ_read_seq: 0,
529 occ_read_set: None,
530 }
531 }
532
533 pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
534 self.insert_vertex_with_labels(vid, properties, &[]);
535 }
536
537 pub fn insert_vertex_with_labels(
539 &mut self,
540 vid: Vid,
541 properties: Properties,
542 labels: &[String],
543 ) {
544 self.insert_vertex_with_labels_impl(vid, properties, labels, false);
545 }
546
547 fn insert_vertex_with_labels_impl(
550 &mut self,
551 vid: Vid,
552 properties: Properties,
553 labels: &[String],
554 skip_wal: bool,
555 ) {
556 self.current_version += 1;
557 let version = self.current_version;
558 let now = now_nanos();
559
560 if !skip_wal && let Some(wal) = &self.wal {
561 let _ = wal.append(Mutation::InsertVertex {
562 vid,
563 properties: properties.clone(),
564 labels: labels.to_vec(),
565 });
566 }
567
568 self.vertex_tombstones.remove(&vid);
569
570 self.vertex_partial_keys.remove(&vid);
573
574 let props_size = Self::estimate_properties_size(&properties);
577 let props_count = properties.len();
578 let tracks_extid = properties.contains_key("ext_id");
579
580 let entry = self.vertex_properties.entry(vid).or_default();
581 let old_extid = if tracks_extid {
582 Self::extid_of(entry)
583 } else {
584 None
585 };
586 Self::merge_crdt_properties(entry, properties);
587 if tracks_extid {
588 let new_extid =
589 Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
590 self.sync_extid_index(vid, old_extid, new_extid);
591 }
592 self.vertex_versions.insert(vid, version);
593
594 self.vertex_created_at.entry(vid).or_insert(now);
596 self.vertex_updated_at.insert(vid, now);
597
598 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
601 let existing = self.vertex_labels.entry(vid).or_default();
602 Self::append_unique_labels(existing, labels);
603 self.index_labels_for_vid(vid, labels);
604
605 self.graph.add_vertex(vid);
606 self.mutation_count += 1;
607 self.mutation_stats.nodes_created += 1;
608 self.mutation_stats.properties_set += props_count;
609 self.mutation_stats.labels_added += labels.len();
610
611 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
612 }
613
614 pub fn insert_vertex_partial_full(
635 &mut self,
636 vid: Vid,
637 props: Properties,
638 touched_keys: HashSet<String>,
639 labels: &[String],
640 ) {
641 self.insert_vertex_with_labels_partial_impl(vid, props, labels, false);
645 self.vertex_partial_keys
646 .entry(vid)
647 .or_default()
648 .extend(touched_keys);
649 }
650
651 pub fn insert_vertex_partial(&mut self, vid: Vid, touched: Properties, labels: &[String]) {
655 let touched_keys: Vec<String> = touched.keys().cloned().collect();
659
660 let already_full = self.vertex_properties.contains_key(&vid)
671 && !self.vertex_partial_keys.contains_key(&vid);
672
673 self.insert_vertex_with_labels_partial_impl(vid, touched, labels, false);
678
679 if !already_full {
680 self.vertex_partial_keys
681 .entry(vid)
682 .or_default()
683 .extend(touched_keys);
684 }
685 }
686
687 fn insert_vertex_with_labels_partial_impl(
691 &mut self,
692 vid: Vid,
693 properties: Properties,
694 labels: &[String],
695 skip_wal: bool,
696 ) {
697 self.current_version += 1;
698 let version = self.current_version;
699 let now = now_nanos();
700
701 if !skip_wal && let Some(wal) = &self.wal {
702 let _ = wal.append(Mutation::InsertVertex {
708 vid,
709 properties: properties.clone(),
710 labels: labels.to_vec(),
711 });
712 }
713
714 self.vertex_tombstones.remove(&vid);
715 let props_size = Self::estimate_properties_size(&properties);
721 let props_count = properties.len();
722 let tracks_extid = properties.contains_key("ext_id");
723
724 let entry = self.vertex_properties.entry(vid).or_default();
725 let old_extid = if tracks_extid {
726 Self::extid_of(entry)
727 } else {
728 None
729 };
730 Self::merge_crdt_properties(entry, properties);
731 if tracks_extid {
732 let new_extid =
733 Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
734 self.sync_extid_index(vid, old_extid, new_extid);
735 }
736 self.vertex_versions.insert(vid, version);
737
738 self.vertex_created_at.entry(vid).or_insert(now);
739 self.vertex_updated_at.insert(vid, now);
740
741 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
742 let existing = self.vertex_labels.entry(vid).or_default();
743 Self::append_unique_labels(existing, labels);
744 self.index_labels_for_vid(vid, labels);
745
746 self.graph.add_vertex(vid);
747 self.mutation_count += 1;
748 self.mutation_stats.properties_set += props_count;
751 self.mutation_stats.labels_added += labels.len();
752
753 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
754 }
755
756 pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
758 let existing = self.vertex_labels.entry(vid).or_default();
759 Self::append_unique_labels(existing, labels);
760 self.index_labels_for_vid(vid, labels);
761 }
762
763 pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
766 if let Some(labels) = self.vertex_labels.get_mut(&vid)
767 && let Some(pos) = labels.iter().position(|l| l == label)
768 {
769 labels.remove(pos);
770 if let Some(set) = self.label_to_vids.get_mut(label) {
771 set.remove(&vid);
772 }
773 self.current_version += 1;
774 self.mutation_count += 1;
775 self.mutation_stats.labels_removed += 1;
776 return true;
779 }
780 false
781 }
782
783 pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
785 self.edge_types.insert(eid, edge_type);
786 }
787
788 pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
789 self.delete_vertex_impl(vid, false)
790 }
791
792 fn delete_vertex_impl(&mut self, vid: Vid, skip_wal: bool) -> Result<()> {
795 self.current_version += 1;
796
797 if !skip_wal && let Some(wal) = &mut self.wal {
798 let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
799 wal.append(Mutation::DeleteVertex { vid, labels })?;
800 }
801
802 self.apply_vertex_deletion(vid);
803 Ok(())
804 }
805
806 fn apply_vertex_deletion(&mut self, vid: Vid) {
810 let version = self.current_version;
811
812 let mut edges_to_remove = HashSet::new();
814
815 for entry in self.graph.neighbors(vid, Direction::Outgoing) {
817 edges_to_remove.insert(entry.eid);
818 }
819
820 for entry in self.graph.neighbors(vid, Direction::Incoming) {
822 edges_to_remove.insert(entry.eid); }
824
825 let cascaded_edges_count = edges_to_remove.len();
826
827 for eid in edges_to_remove {
829 if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
831 self.tombstones.insert(
832 eid,
833 TombstoneEntry {
834 eid,
835 src_vid: *src,
836 dst_vid: *dst,
837 edge_type: *etype,
838 },
839 );
840 self.edge_versions.insert(eid, version);
841 self.edge_endpoints.remove(&eid);
842 self.edge_properties.remove(&eid);
843 self.graph.remove_edge(eid);
844 self.mutation_count += 1;
845 self.mutation_stats.relationships_deleted += 1;
846 }
847 }
848
849 self.remove_vid_from_label_index(vid);
850 self.vertex_tombstones.insert(vid);
851 if let Some(props) = self.vertex_properties.get(&vid)
854 && let Some(ext) = Self::extid_of(props)
855 && self.extid_index.get(&ext) == Some(&vid)
856 {
857 self.extid_index.remove(&ext);
858 }
859 self.vertex_properties.remove(&vid);
860 self.vertex_partial_keys.remove(&vid);
862 self.vertex_versions.insert(vid, version);
863 self.graph.remove_vertex(vid);
864 self.mutation_count += 1;
865 self.mutation_stats.nodes_deleted += 1;
866
867 self.constraint_index.retain(|_, v| *v != vid);
869 self.merge_guard_index.retain(|_, v| *v != vid);
872
873 self.estimated_size += cascaded_edges_count * 72 + 8;
875 }
876
877 pub fn insert_edge(
878 &mut self,
879 src_vid: Vid,
880 dst_vid: Vid,
881 edge_type: u32,
882 eid: Eid,
883 properties: Properties,
884 edge_type_name: Option<String>,
885 ) -> Result<()> {
886 self.insert_edge_impl(
887 src_vid,
888 dst_vid,
889 edge_type,
890 eid,
891 properties,
892 edge_type_name,
893 false,
894 )
895 }
896
897 #[allow(clippy::too_many_arguments)]
900 fn insert_edge_impl(
901 &mut self,
902 src_vid: Vid,
903 dst_vid: Vid,
904 edge_type: u32,
905 eid: Eid,
906 properties: Properties,
907 edge_type_name: Option<String>,
908 skip_wal: bool,
909 ) -> Result<()> {
910 self.current_version += 1;
911 let now = now_nanos();
912
913 if !skip_wal && let Some(wal) = &mut self.wal {
914 wal.append(Mutation::InsertEdge {
915 src_vid,
916 dst_vid,
917 edge_type,
918 eid,
919 version: self.current_version,
920 properties: properties.clone(),
921 edge_type_name: edge_type_name.clone(),
922 })?;
923 }
924
925 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
926
927 let type_name_size = if let Some(ref name) = edge_type_name {
929 let size = name.len() + 24;
930 self.edge_types.insert(eid, name.clone());
931 size
932 } else {
933 0
934 };
935
936 self.edge_created_at.entry(eid).or_insert(now);
938 self.edge_updated_at.insert(eid, now);
939
940 self.edge_partial_keys.remove(&eid);
943
944 self.estimated_size += type_name_size;
945
946 Ok(())
947 }
948
949 #[allow(clippy::too_many_arguments)]
954 pub fn insert_edge_partial_full(
955 &mut self,
956 src_vid: Vid,
957 dst_vid: Vid,
958 edge_type: u32,
959 eid: Eid,
960 properties: Properties,
961 edge_type_name: Option<String>,
962 touched_keys: HashSet<String>,
963 ) -> Result<()> {
964 self.current_version += 1;
965 let now = now_nanos();
966
967 if let Some(wal) = &mut self.wal {
968 wal.append(Mutation::InsertEdge {
969 src_vid,
970 dst_vid,
971 edge_type,
972 eid,
973 version: self.current_version,
974 properties: properties.clone(),
975 edge_type_name: edge_type_name.clone(),
976 })?;
977 }
978
979 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
980
981 self.edge_partial_keys
985 .entry(eid)
986 .or_default()
987 .extend(touched_keys);
988
989 let type_name_size = if let Some(ref name) = edge_type_name {
990 let size = name.len() + 24;
991 self.edge_types.insert(eid, name.clone());
992 size
993 } else {
994 0
995 };
996
997 self.edge_created_at.entry(eid).or_insert(now);
998 self.edge_updated_at.insert(eid, now);
999
1000 self.estimated_size += type_name_size;
1001
1002 Ok(())
1003 }
1004
1005 fn apply_edge_insertion(
1014 &mut self,
1015 src_vid: Vid,
1016 dst_vid: Vid,
1017 edge_type: u32,
1018 eid: Eid,
1019 properties: Properties,
1020 ) -> Result<()> {
1021 let version = self.current_version;
1022
1023 if self.vertex_tombstones.contains(&src_vid) {
1026 anyhow::bail!(
1027 "Cannot insert edge: source vertex {} has been deleted (issue #77)",
1028 src_vid
1029 );
1030 }
1031 if self.vertex_tombstones.contains(&dst_vid) {
1032 anyhow::bail!(
1033 "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
1034 dst_vid
1035 );
1036 }
1037
1038 if !self.graph.contains_vertex(src_vid) {
1043 self.graph.add_vertex(src_vid);
1044 }
1045 if !self.graph.contains_vertex(dst_vid) {
1046 self.graph.add_vertex(dst_vid);
1047 }
1048
1049 self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
1050
1051 let props_size = Self::estimate_properties_size(&properties);
1053 let props_count = properties.len();
1054 if !properties.is_empty() {
1055 let entry = self.edge_properties.entry(eid).or_default();
1056 Self::merge_crdt_properties(entry, properties);
1057 }
1058
1059 self.edge_versions.insert(eid, version);
1060 self.edge_endpoints
1061 .insert(eid, (src_vid, dst_vid, edge_type));
1062 self.tombstones.remove(&eid);
1063 self.mutation_count += 1;
1064 self.mutation_stats.relationships_created += 1;
1065 self.mutation_stats.properties_set += props_count;
1066
1067 self.estimated_size += 24 + props_size + 16 + 28 + 32;
1069
1070 Ok(())
1071 }
1072
1073 pub fn delete_edge(
1074 &mut self,
1075 eid: Eid,
1076 src_vid: Vid,
1077 dst_vid: Vid,
1078 edge_type: u32,
1079 ) -> Result<()> {
1080 self.delete_edge_impl(eid, src_vid, dst_vid, edge_type, false)
1081 }
1082
1083 fn delete_edge_impl(
1086 &mut self,
1087 eid: Eid,
1088 src_vid: Vid,
1089 dst_vid: Vid,
1090 edge_type: u32,
1091 skip_wal: bool,
1092 ) -> Result<()> {
1093 self.current_version += 1;
1094 let now = now_nanos();
1095
1096 if !skip_wal && let Some(wal) = &mut self.wal {
1097 wal.append(Mutation::DeleteEdge {
1098 eid,
1099 src_vid,
1100 dst_vid,
1101 edge_type,
1102 version: self.current_version,
1103 })?;
1104 }
1105
1106 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1107
1108 self.edge_updated_at.insert(eid, now);
1110
1111 Ok(())
1112 }
1113
1114 fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
1118 let version = self.current_version;
1119
1120 self.tombstones.insert(
1121 eid,
1122 TombstoneEntry {
1123 eid,
1124 src_vid,
1125 dst_vid,
1126 edge_type,
1127 },
1128 );
1129 self.edge_versions.insert(eid, version);
1130 self.edge_partial_keys.remove(&eid);
1133 self.graph.remove_edge(eid);
1134 self.mutation_count += 1;
1135 self.mutation_stats.relationships_deleted += 1;
1136
1137 self.estimated_size += 80;
1139 }
1140
1141 pub fn get_neighbors(
1144 &self,
1145 vid: Vid,
1146 edge_type: u32,
1147 direction: Direction,
1148 ) -> Vec<(Vid, Eid, u64)> {
1149 let edges = self.graph.neighbors(vid, direction);
1150
1151 edges
1152 .iter()
1153 .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
1154 .map(|e| {
1155 let neighbor = match direction {
1156 Direction::Outgoing => e.dst_vid,
1157 Direction::Incoming => e.src_vid,
1158 };
1159 let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
1160 (neighbor, e.eid, version)
1161 })
1162 .collect()
1163 }
1164
1165 pub fn is_tombstoned(&self, eid: Eid) -> bool {
1166 self.tombstones.contains_key(&eid)
1167 }
1168
1169 pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
1172 self.label_to_vids
1173 .get(label_name)
1174 .map(|set| set.iter().copied().collect())
1175 .unwrap_or_default()
1176 }
1177
1178 pub fn all_vertex_vids(&self) -> Vec<Vid> {
1182 self.vertex_properties.keys().copied().collect()
1183 }
1184
1185 pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1188 let mut result = HashSet::new();
1189 for label_name in label_names {
1190 if let Some(set) = self.label_to_vids.get(*label_name) {
1191 result.extend(set.iter().copied());
1192 }
1193 }
1194 result.into_iter().collect()
1195 }
1196
1197 pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1200 if label_names.is_empty() {
1201 return Vec::new();
1202 }
1203 let sets: Vec<&HashSet<Vid>> = match label_names
1206 .iter()
1207 .map(|ln| self.label_to_vids.get(*ln))
1208 .collect::<Option<Vec<_>>>()
1209 {
1210 Some(s) => s,
1211 None => return Vec::new(),
1212 };
1213 let smallest = sets.iter().min_by_key(|s| s.len()).unwrap();
1215 smallest
1216 .iter()
1217 .copied()
1218 .filter(|vid| sets.iter().all(|s| s.contains(vid)))
1219 .collect()
1220 }
1221
1222 pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
1224 self.vertex_labels.get(&vid).map(|v| v.as_slice())
1225 }
1226
1227 pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
1229 self.edge_types.get(&eid).map(|s| s.as_str())
1230 }
1231
1232 pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
1235 self.edge_types
1236 .iter()
1237 .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
1238 .map(|(eid, _)| *eid)
1239 .collect()
1240 }
1241
1242 pub fn all_edge_eids(&self) -> Vec<Eid> {
1246 self.edge_endpoints
1247 .keys()
1248 .filter(|eid| !self.tombstones.contains_key(eid))
1249 .copied()
1250 .collect()
1251 }
1252
1253 pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
1255 self.edge_endpoints
1256 .get(&eid)
1257 .map(|(src, dst, _)| (*src, *dst))
1258 }
1259
1260 pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
1262 self.edge_endpoints.get(&eid).copied()
1263 }
1264
1265 pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
1267 self.constraint_index.insert(key, vid);
1268 }
1269
1270 pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1273 self.constraint_index
1274 .get(key)
1275 .is_some_and(|&v| v != exclude_vid)
1276 }
1277
1278 pub fn insert_merge_guard_key(&mut self, key: Vec<u8>, vid: Vid) {
1280 self.merge_guard_index.insert(key, vid);
1281 }
1282
1283 pub fn has_merge_guard_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1286 self.merge_guard_index
1287 .get(key)
1288 .is_some_and(|&v| v != exclude_vid)
1289 }
1290
1291 #[instrument(skip(self, other), level = "trace")]
1292 pub fn validate_merge_edge_endpoints(&self, other: &L0Buffer) -> Result<()> {
1311 let is_deleted = |vid: &Vid| {
1315 (self.vertex_tombstones.contains(vid) || other.vertex_tombstones.contains(vid))
1316 && !other.vertex_properties.contains_key(vid)
1317 };
1318 for (eid, (src_vid, dst_vid, _etype)) in &other.edge_endpoints {
1319 if other.tombstones.contains_key(eid) {
1320 continue; }
1322 if is_deleted(src_vid) {
1323 anyhow::bail!(
1324 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
1325 eid,
1326 src_vid
1327 );
1328 }
1329 if is_deleted(dst_vid) {
1330 anyhow::bail!(
1331 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
1332 eid,
1333 dst_vid
1334 );
1335 }
1336 }
1337 Ok(())
1338 }
1339
1340 pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
1341 self.validate_merge_edge_endpoints(other)?;
1345 self.merge_validated(
1346 other,
1347 other.vertex_properties.clone(),
1348 other.edge_properties.clone(),
1349 )
1350 }
1351
1352 pub fn merge_take(&mut self, other: &mut L0Buffer) -> Result<()> {
1361 self.validate_merge_edge_endpoints(other)?;
1364 let vertex_props = std::mem::take(&mut other.vertex_properties);
1365 let edge_props = std::mem::take(&mut other.edge_properties);
1366 self.merge_validated(other, vertex_props, edge_props)
1367 }
1368
    /// Applies the contents of `other` to `self`. Callers run
    /// `validate_merge_edge_endpoints` first.
    ///
    /// `vertex_props` and `edge_props` are the property maps to merge, passed
    /// separately so the caller can either clone or move them out of `other`.
    /// Every mutation is replayed through the `*_impl` helpers with
    /// `skip_wal = true`, so nothing is appended to the WAL here.
    ///
    /// The order of the steps matters: vertex deletions are applied before
    /// vertex inserts (so a delete-then-reinsert in `other` ends up alive), and
    /// vertices are processed before edges.
    ///
    /// # Errors
    /// Propagates errors from the delete/insert helpers, e.g. an edge insert
    /// whose endpoint is tombstoned in `self`. State applied before the failing
    /// step is not rolled back.
    fn merge_validated(
        &mut self,
        other: &L0Buffer,
        vertex_props: HashMap<Vid, Properties>,
        mut edge_props: HashMap<Eid, Properties>,
    ) -> Result<()> {
        trace!(
            other_mutation_count = other.mutation_count,
            "Merging L0 buffer"
        );
        // 1. Vertex deletions (these also cascade to attached edges in `self`).
        for &vid in &other.vertex_tombstones {
            self.delete_vertex_impl(vid, true)?;
        }

        // 2. Vertex inserts, with the labels `other` holds for each vertex.
        for (vid, props) in vertex_props {
            let labels = other.vertex_labels.get(&vid).cloned().unwrap_or_default();
            self.insert_vertex_with_labels_impl(vid, props, &labels, true);
        }

        // 3. Labels for vertices `self` has no label entry for yet (vertices
        //    that were not covered by the property loop above).
        for (vid, labels) in &other.vertex_labels {
            if !self.vertex_labels.contains_key(vid) {
                self.vertex_labels.insert(*vid, labels.clone());
                for label in labels {
                    self.label_to_vids
                        .entry(label.clone())
                        .or_default()
                        .insert(*vid);
                }
            }
        }

        // 4. Wholesale label replacements: the label list from `other` replaces
        //    the one in `self` instead of being unioned. Deleted vertices are
        //    skipped.
        for vid in &other.vertex_label_overwrites {
            if other.vertex_tombstones.contains(vid) {
                continue;
            }
            let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
            self.remove_vid_from_label_index(*vid);
            self.vertex_labels.insert(*vid, labels.clone());
            self.index_labels_for_vid(*vid, &labels);
            self.vertex_label_overwrites.insert(*vid);
        }

        // 5. Edges that still have an endpoint entry in `other`: deleted if
        //    tombstoned there, inserted otherwise.
        for (eid, (src, dst, etype)) in &other.edge_endpoints {
            if other.tombstones.contains_key(eid) {
                self.delete_edge_impl(*eid, *src, *dst, *etype, true)?;
            } else {
                let props = edge_props.remove(eid).unwrap_or_default();
                let etype_name = other.edge_types.get(eid).cloned();
                self.insert_edge_impl(*src, *dst, *etype, *eid, props, etype_name, true)?;
            }
        }

        // 6. Tombstones without an endpoint entry in `other` (not handled by the
        //    loop above): replay the delete from the tombstone's own data.
        for (eid, tombstone) in &other.tombstones {
            if !other.edge_endpoints.contains_key(eid) {
                self.delete_edge_impl(
                    *eid,
                    tombstone.src_vid,
                    tombstone.dst_vid,
                    tombstone.edge_type,
                    true,
                )?;
            }
        }

        // 7. Timestamps: a creation time already present in `self` is kept;
        //    update times from `other` always win. This overrides the "now"
        //    stamps written by the helper calls above.
        for (vid, ts) in &other.vertex_created_at {
            self.vertex_created_at.entry(*vid).or_insert(*ts);
        }
        for (vid, ts) in &other.vertex_updated_at {
            self.vertex_updated_at.insert(*vid, *ts);
        }

        for (eid, ts) in &other.edge_created_at {
            self.edge_created_at.entry(*eid).or_insert(*ts);
        }
        for (eid, ts) in &other.edge_updated_at {
            self.edge_updated_at.insert(*eid, *ts);
        }

        // NOTE(review): the helper calls above already grow `estimated_size`
        // per mutation, so adding `other.estimated_size` on top may count the
        // same data twice — confirm this is the intended (conservative) estimate.
        self.estimated_size += other.estimated_size;

        // 8. Uniqueness indexes: entries from `other` overwrite same-key entries.
        for (key, vid) in &other.constraint_index {
            self.constraint_index.insert(key.clone(), *vid);
        }

        for (key, vid) in &other.merge_guard_index {
            self.merge_guard_index.insert(key.clone(), *vid);
        }

        // 9. Pending embeddings: entries from `other` overwrite same-vid entries.
        for (vid, label) in &other.pending_embeddings {
            self.pending_embeddings.insert(*vid, label.clone());
        }

        Ok(())
    }
1499
1500 #[instrument(skip(self, mutations), level = "debug")]
1504 pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
1505 trace!(count = mutations.len(), "Replaying mutations");
1506 for mutation in mutations {
1507 match mutation {
1508 Mutation::InsertVertex {
1509 vid,
1510 properties,
1511 labels,
1512 } => {
1513 self.current_version += 1;
1515 let version = self.current_version;
1516
1517 self.vertex_tombstones.remove(&vid);
1518 let tracks_extid = properties.contains_key("ext_id");
1519 let entry = self.vertex_properties.entry(vid).or_default();
1520 let old_extid = if tracks_extid {
1521 Self::extid_of(entry)
1522 } else {
1523 None
1524 };
1525 Self::merge_crdt_properties(entry, properties);
1526 if tracks_extid {
1527 let new_extid = Self::extid_of(
1528 self.vertex_properties.get(&vid).expect("just inserted"),
1529 );
1530 self.sync_extid_index(vid, old_extid, new_extid);
1531 }
1532 self.vertex_versions.insert(vid, version);
1533 self.graph.add_vertex(vid);
1534 self.mutation_count += 1;
1535
1536 let existing = self.vertex_labels.entry(vid).or_default();
1538 Self::append_unique_labels(existing, &labels);
1539 for label in &labels {
1540 self.label_to_vids
1541 .entry(label.clone())
1542 .or_default()
1543 .insert(vid);
1544 }
1545 }
1546 Mutation::DeleteVertex { vid, labels } => {
1547 self.current_version += 1;
1548 if !labels.is_empty() {
1550 let existing = self.vertex_labels.entry(vid).or_default();
1551 Self::append_unique_labels(existing, &labels);
1552 for label in &labels {
1553 self.label_to_vids
1554 .entry(label.clone())
1555 .or_default()
1556 .insert(vid);
1557 }
1558 }
1559 self.apply_vertex_deletion(vid);
1560 }
1561 Mutation::SetVertexLabels { vid, labels } => {
1562 self.current_version += 1;
1567 self.remove_vid_from_label_index(vid);
1568 self.vertex_labels.insert(vid, labels.clone());
1569 self.index_labels_for_vid(vid, &labels);
1570 self.mutation_count += 1;
1571 }
1572 Mutation::InsertEdge {
1573 src_vid,
1574 dst_vid,
1575 edge_type,
1576 eid,
1577 version: _,
1578 properties,
1579 edge_type_name,
1580 } => {
1581 self.current_version += 1;
1582 match self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties) {
1587 Ok(()) => {
1588 if let Some(name) = edge_type_name {
1590 self.edge_types.insert(eid, name);
1591 }
1592 }
1593 Err(e) => {
1594 tracing::warn!(
1595 ?eid,
1596 ?src_vid,
1597 ?dst_vid,
1598 error = %e,
1599 "WAL replay: skipping edge insertion to a deleted endpoint (issue #77)"
1600 );
1601 }
1602 }
1603 }
1604 Mutation::DeleteEdge {
1605 eid,
1606 src_vid,
1607 dst_vid,
1608 edge_type,
1609 version: _,
1610 } => {
1611 self.current_version += 1;
1612 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1613 }
1614 }
1615 }
1616 Ok(())
1617 }
1618}
1619
1620#[cfg(test)]
1621mod tests {
1622 use super::*;
1623
1624 #[test]
1625 fn test_l0_buffer_ops() -> Result<()> {
1626 let mut l0 = L0Buffer::new(0, None);
1627 let vid_a = Vid::new(1);
1628 let vid_b = Vid::new(2);
1629 let eid_ab = Eid::new(101);
1630
1631 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1632
1633 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1634 assert_eq!(neighbors.len(), 1);
1635 assert_eq!(neighbors[0].0, vid_b);
1636 assert_eq!(neighbors[0].1, eid_ab);
1637
1638 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1639 assert!(l0.is_tombstoned(eid_ab));
1640
1641 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1643 assert_eq!(neighbors_after.len(), 0);
1644
1645 Ok(())
1646 }
1647
1648 #[test]
1654 fn validate_merge_rejects_edge_to_tombstoned_endpoint() {
1655 let mut main = L0Buffer::new(0, None);
1656 let vid_a = Vid::new(1);
1657 let vid_b = Vid::new(2);
1658 main.insert_vertex(vid_a, HashMap::new());
1659 main.insert_vertex(vid_b, HashMap::new());
1660 main.delete_vertex(vid_b).unwrap(); let mut tx = L0Buffer::new(0, None);
1664 let eid = Eid::new(101);
1665 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1666 .unwrap();
1667
1668 assert!(
1669 main.validate_merge_edge_endpoints(&tx).is_err(),
1670 "edge to a tombstoned endpoint must be rejected before merge"
1671 );
1672 assert!(
1674 main.merge(&tx).is_err(),
1675 "merge must reject, not bail mid-apply"
1676 );
1677 assert!(
1678 !main.edge_endpoints.contains_key(&eid),
1679 "a rejected merge must not have partially applied the edge"
1680 );
1681 }
1682
1683 #[test]
1686 fn validate_merge_allows_edge_when_endpoint_reinserted() {
1687 let mut main = L0Buffer::new(0, None);
1688 let vid_a = Vid::new(1);
1689 let vid_b = Vid::new(2);
1690 main.insert_vertex(vid_a, HashMap::new());
1691 main.insert_vertex(vid_b, HashMap::new());
1692 main.delete_vertex(vid_b).unwrap();
1693
1694 let mut tx = L0Buffer::new(0, None);
1695 tx.insert_vertex(vid_b, HashMap::new()); let eid = Eid::new(101);
1697 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1698 .unwrap();
1699
1700 assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1701 assert!(main.merge(&tx).is_ok());
1702 assert!(main.edge_endpoints.contains_key(&eid));
1703 }
1704
1705 #[test]
1707 fn validate_merge_allows_edge_to_live_endpoints() {
1708 let mut main = L0Buffer::new(0, None);
1709 let vid_a = Vid::new(1);
1710 let vid_b = Vid::new(2);
1711 main.insert_vertex(vid_a, HashMap::new());
1712 main.insert_vertex(vid_b, HashMap::new());
1713
1714 let mut tx = L0Buffer::new(0, None);
1715 let eid = Eid::new(101);
1716 tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1717 .unwrap();
1718
1719 assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1720 assert!(main.merge(&tx).is_ok());
1721 assert!(main.edge_endpoints.contains_key(&eid));
1722 }
1723
1724 #[test]
1725 fn test_l0_buffer_multiple_edges() -> Result<()> {
1726 let mut l0 = L0Buffer::new(0, None);
1727 let vid_a = Vid::new(1);
1728 let vid_b = Vid::new(2);
1729 let vid_c = Vid::new(3);
1730 let eid_ab = Eid::new(101);
1731 let eid_ac = Eid::new(102);
1732
1733 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1734 l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
1735
1736 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1737 assert_eq!(neighbors.len(), 2);
1738
1739 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1741
1742 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1744 assert_eq!(neighbors_after.len(), 1);
1745 assert_eq!(neighbors_after[0].0, vid_c);
1746
1747 Ok(())
1748 }
1749
1750 #[test]
1751 fn test_l0_buffer_edge_type_filter() -> Result<()> {
1752 let mut l0 = L0Buffer::new(0, None);
1753 let vid_a = Vid::new(1);
1754 let vid_b = Vid::new(2);
1755 let vid_c = Vid::new(3);
1756 let eid_ab = Eid::new(101);
1757 let eid_ac = Eid::new(201); l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1760 l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
1761
1762 let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1764 assert_eq!(type1_neighbors.len(), 1);
1765 assert_eq!(type1_neighbors[0].0, vid_b);
1766
1767 let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
1769 assert_eq!(type2_neighbors.len(), 1);
1770 assert_eq!(type2_neighbors[0].0, vid_c);
1771
1772 Ok(())
1773 }
1774
1775 #[test]
1776 fn test_l0_buffer_incoming_edges() -> Result<()> {
1777 let mut l0 = L0Buffer::new(0, None);
1778 let vid_a = Vid::new(1);
1779 let vid_b = Vid::new(2);
1780 let vid_c = Vid::new(3);
1781 let eid_ab = Eid::new(101);
1782 let eid_cb = Eid::new(102);
1783
1784 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1786 l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
1787
1788 let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
1790 assert_eq!(incoming.len(), 2);
1791
1792 Ok(())
1793 }
1794
1795 #[test]
1797 fn test_merge_empty_props_edge() -> Result<()> {
1798 let mut main_l0 = L0Buffer::new(0, None);
1799 let mut tx_l0 = L0Buffer::new(0, None);
1800
1801 let vid_a = Vid::new(1);
1802 let vid_b = Vid::new(2);
1803 let eid_ab = Eid::new(101);
1804
1805 tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1807
1808 assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
1810 assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); main_l0.merge(&tx_l0)?;
1814
1815 assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
1817 let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1818 assert_eq!(neighbors.len(), 1);
1819 assert_eq!(neighbors[0].0, vid_b);
1820
1821 Ok(())
1822 }
1823
1824 #[test]
1826 fn test_replay_crdt_merge() -> Result<()> {
1827 use crate::runtime::wal::Mutation;
1828 use serde_json::json;
1829 use uni_common::Value;
1830
1831 let mut l0 = L0Buffer::new(0, None);
1832 let vid = Vid::new(1);
1833
1834 let counter1: Value = json!({
1837 "t": "gc",
1838 "d": {"counts": {"node1": 5}}
1839 })
1840 .into();
1841 let counter2: Value = json!({
1842 "t": "gc",
1843 "d": {"counts": {"node2": 3}}
1844 })
1845 .into();
1846
1847 let mut props1 = HashMap::new();
1849 props1.insert("counter".to_string(), counter1.clone());
1850 l0.replay_mutations(vec![Mutation::InsertVertex {
1851 vid,
1852 properties: props1,
1853 labels: vec![],
1854 }])?;
1855
1856 let mut props2 = HashMap::new();
1858 props2.insert("counter".to_string(), counter2.clone());
1859 l0.replay_mutations(vec![Mutation::InsertVertex {
1860 vid,
1861 properties: props2,
1862 labels: vec![],
1863 }])?;
1864
1865 let stored_props = l0.vertex_properties.get(&vid).unwrap();
1867 let stored_counter = stored_props.get("counter").unwrap();
1868
1869 let stored_json: serde_json::Value = stored_counter.clone().into();
1871 let data = stored_json.get("d").unwrap();
1873 let counts = data.get("counts").unwrap();
1874 assert_eq!(counts.get("node1"), Some(&json!(5)));
1875 assert_eq!(counts.get("node2"), Some(&json!(3)));
1876
1877 Ok(())
1878 }
1879
1880 #[test]
1881 fn test_merge_preserves_vertex_timestamps() -> Result<()> {
1882 let mut l0_main = L0Buffer::new(0, None);
1883 let mut l0_tx = L0Buffer::new(0, None);
1884 let vid = Vid::new(1);
1885
1886 let ts_main_created = 1000;
1888 let ts_main_updated = 1100;
1889 l0_main.insert_vertex(vid, HashMap::new());
1890 l0_main.vertex_created_at.insert(vid, ts_main_created);
1891 l0_main.vertex_updated_at.insert(vid, ts_main_updated);
1892
1893 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_vertex(vid, HashMap::new());
1897 l0_tx.vertex_created_at.insert(vid, ts_tx_created);
1898 l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
1899
1900 l0_main.merge(&l0_tx)?;
1902
1903 assert_eq!(
1905 *l0_main.vertex_created_at.get(&vid).unwrap(),
1906 ts_main_created,
1907 "created_at should preserve oldest timestamp"
1908 );
1909
1910 assert_eq!(
1912 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1913 ts_tx_updated,
1914 "updated_at should use latest timestamp"
1915 );
1916
1917 Ok(())
1918 }
1919
1920 #[test]
1921 fn test_merge_preserves_edge_timestamps() -> Result<()> {
1922 let mut l0_main = L0Buffer::new(0, None);
1923 let mut l0_tx = L0Buffer::new(0, None);
1924 let vid_a = Vid::new(1);
1925 let vid_b = Vid::new(2);
1926 let eid = Eid::new(100);
1927
1928 let ts_main_created = 1000;
1930 let ts_main_updated = 1100;
1931 l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1932 l0_main.edge_created_at.insert(eid, ts_main_created);
1933 l0_main.edge_updated_at.insert(eid, ts_main_updated);
1934
1935 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1939 l0_tx.edge_created_at.insert(eid, ts_tx_created);
1940 l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1941
1942 l0_main.merge(&l0_tx)?;
1944
1945 assert_eq!(
1947 *l0_main.edge_created_at.get(&eid).unwrap(),
1948 ts_main_created,
1949 "edge created_at should preserve oldest timestamp"
1950 );
1951
1952 assert_eq!(
1954 *l0_main.edge_updated_at.get(&eid).unwrap(),
1955 ts_tx_updated,
1956 "edge updated_at should use latest timestamp"
1957 );
1958
1959 Ok(())
1960 }
1961
1962 #[test]
1963 fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1964 use uni_common::Value;
1965
1966 let mut l0_main = L0Buffer::new(0, None);
1967 let mut l0_tx = L0Buffer::new(0, None);
1968 let vid = Vid::new(1);
1969
1970 let ts_original = 1000;
1972 l0_main.insert_vertex(vid, HashMap::new());
1973 l0_main.vertex_created_at.insert(vid, ts_original);
1974 l0_main.vertex_updated_at.insert(vid, ts_original);
1975
1976 let ts_tx = 2000;
1978 let mut props = HashMap::new();
1979 props.insert("updated".to_string(), Value::String("yes".to_string()));
1980 l0_tx.insert_vertex(vid, props);
1981 l0_tx.vertex_created_at.insert(vid, ts_tx);
1982 l0_tx.vertex_updated_at.insert(vid, ts_tx);
1983
1984 l0_main.merge(&l0_tx)?;
1986
1987 assert_eq!(
1989 *l0_main.vertex_created_at.get(&vid).unwrap(),
1990 ts_original,
1991 "created_at must not be overwritten for existing vertex"
1992 );
1993
1994 assert_eq!(
1996 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1997 ts_tx,
1998 "updated_at should reflect transaction timestamp"
1999 );
2000
2001 assert!(
2003 l0_main
2004 .vertex_properties
2005 .get(&vid)
2006 .unwrap()
2007 .contains_key("updated")
2008 );
2009
2010 Ok(())
2011 }
2012
2013 #[test]
2015 fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
2016 use crate::runtime::wal::Mutation;
2017
2018 let mut l0 = L0Buffer::new(0, None);
2019 let vid = Vid::new(42);
2020
2021 let mutations = vec![Mutation::InsertVertex {
2023 vid,
2024 properties: {
2025 let mut props = HashMap::new();
2026 props.insert(
2027 "name".to_string(),
2028 uni_common::Value::String("Alice".to_string()),
2029 );
2030 props
2031 },
2032 labels: vec!["Person".to_string(), "User".to_string()],
2033 }];
2034
2035 l0.replay_mutations(mutations)?;
2037
2038 assert!(l0.vertex_properties.contains_key(&vid));
2040
2041 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2043 assert_eq!(labels.len(), 2);
2044 assert!(labels.contains(&"Person".to_string()));
2045 assert!(labels.contains(&"User".to_string()));
2046
2047 let person_vids = l0.vids_for_label("Person");
2049 assert_eq!(person_vids.len(), 1);
2050 assert_eq!(person_vids[0], vid);
2051
2052 let user_vids = l0.vids_for_label("User");
2053 assert_eq!(user_vids.len(), 1);
2054 assert_eq!(user_vids[0], vid);
2055
2056 Ok(())
2057 }
2058
2059 #[test]
2061 fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
2062 use crate::runtime::wal::Mutation;
2063
2064 let mut l0 = L0Buffer::new(0, None);
2065 let vid = Vid::new(99);
2066
2067 l0.insert_vertex_with_labels(
2069 vid,
2070 HashMap::new(),
2071 &["Person".to_string(), "Admin".to_string()],
2072 );
2073
2074 assert!(l0.vertex_properties.contains_key(&vid));
2076 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2077 assert_eq!(labels.len(), 2);
2078
2079 let mutations = vec![Mutation::DeleteVertex {
2081 vid,
2082 labels: vec!["Person".to_string(), "Admin".to_string()],
2083 }];
2084
2085 l0.replay_mutations(mutations)?;
2087
2088 assert!(l0.vertex_tombstones.contains(&vid));
2090
2091 let labels = l0.get_vertex_labels(vid);
2094 assert!(
2095 labels.is_some(),
2096 "Labels should be preserved even after deletion for tombstone flushing"
2097 );
2098
2099 Ok(())
2100 }
2101
2102 #[test]
2104 fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
2105 use crate::runtime::wal::Mutation;
2106
2107 let mut l0 = L0Buffer::new(0, None);
2108 let src = Vid::new(1);
2109 let dst = Vid::new(2);
2110 let eid = Eid::new(500);
2111 let edge_type = 100;
2112
2113 let mutations = vec![Mutation::InsertEdge {
2115 src_vid: src,
2116 dst_vid: dst,
2117 edge_type,
2118 eid,
2119 version: 1,
2120 properties: {
2121 let mut props = HashMap::new();
2122 props.insert("since".to_string(), uni_common::Value::Int(2020));
2123 props
2124 },
2125 edge_type_name: Some("KNOWS".to_string()),
2126 }];
2127
2128 l0.replay_mutations(mutations)?;
2130
2131 assert!(l0.edge_endpoints.contains_key(&eid));
2133
2134 let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
2136 assert_eq!(type_name, "KNOWS");
2137
2138 let knows_eids = l0.eids_for_type("KNOWS");
2140 assert_eq!(knows_eids.len(), 1);
2141 assert_eq!(knows_eids[0], eid);
2142
2143 Ok(())
2144 }
2145
2146 #[test]
2148 fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
2149 use crate::runtime::wal::Mutation;
2150
2151 let mut l0 = L0Buffer::new(0, None);
2152
2153 let mutations = vec![
2155 Mutation::InsertEdge {
2156 src_vid: Vid::new(1),
2157 dst_vid: Vid::new(2),
2158 edge_type: 100,
2159 eid: Eid::new(1000),
2160 version: 1,
2161 properties: HashMap::new(),
2162 edge_type_name: Some("KNOWS".to_string()),
2163 },
2164 Mutation::InsertEdge {
2165 src_vid: Vid::new(2),
2166 dst_vid: Vid::new(3),
2167 edge_type: 101,
2168 eid: Eid::new(1001),
2169 version: 2,
2170 properties: HashMap::new(),
2171 edge_type_name: Some("LIKES".to_string()),
2172 },
2173 Mutation::InsertEdge {
2174 src_vid: Vid::new(3),
2175 dst_vid: Vid::new(1),
2176 edge_type: 100,
2177 eid: Eid::new(1002),
2178 version: 3,
2179 properties: HashMap::new(),
2180 edge_type_name: Some("KNOWS".to_string()),
2181 },
2182 ];
2183
2184 l0.replay_mutations(mutations)?;
2185
2186 assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
2188 assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
2189 assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
2190
2191 let knows_edges = l0.eids_for_type("KNOWS");
2193 assert_eq!(knows_edges.len(), 2);
2194 assert!(knows_edges.contains(&Eid::new(1000)));
2195 assert!(knows_edges.contains(&Eid::new(1002)));
2196
2197 let likes_edges = l0.eids_for_type("LIKES");
2198 assert_eq!(likes_edges.len(), 1);
2199 assert_eq!(likes_edges[0], Eid::new(1001));
2200
2201 Ok(())
2202 }
2203
2204 #[test]
2206 fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
2207 use crate::runtime::wal::Mutation;
2208
2209 let mut l0 = L0Buffer::new(0, None);
2210 let alice = Vid::new(1);
2211 let bob = Vid::new(2);
2212 let eid = Eid::new(100);
2213
2214 let mutations = vec![
2216 Mutation::InsertVertex {
2218 vid: alice,
2219 properties: {
2220 let mut props = HashMap::new();
2221 props.insert(
2222 "name".to_string(),
2223 uni_common::Value::String("Alice".to_string()),
2224 );
2225 props
2226 },
2227 labels: vec!["Person".to_string()],
2228 },
2229 Mutation::InsertVertex {
2231 vid: bob,
2232 properties: {
2233 let mut props = HashMap::new();
2234 props.insert(
2235 "name".to_string(),
2236 uni_common::Value::String("Bob".to_string()),
2237 );
2238 props
2239 },
2240 labels: vec!["Person".to_string()],
2241 },
2242 Mutation::InsertEdge {
2244 src_vid: alice,
2245 dst_vid: bob,
2246 edge_type: 1,
2247 eid,
2248 version: 3,
2249 properties: HashMap::new(),
2250 edge_type_name: Some("KNOWS".to_string()),
2251 },
2252 ];
2253
2254 l0.replay_mutations(mutations)?;
2256
2257 assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
2259 assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
2260 assert_eq!(l0.vids_for_label("Person").len(), 2);
2261
2262 assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
2264 assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
2265
2266 let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
2268 assert_eq!(alice_neighbors.len(), 1);
2269 assert_eq!(alice_neighbors[0].0, bob);
2270
2271 Ok(())
2272 }
2273
2274 #[test]
2276 fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
2277 use crate::runtime::wal::Mutation;
2278
2279 let mut l0 = L0Buffer::new(0, None);
2280 let vid = Vid::new(1);
2281
2282 let mutations = vec![Mutation::InsertVertex {
2285 vid,
2286 properties: HashMap::new(),
2287 labels: vec![], }];
2289
2290 l0.replay_mutations(mutations)?;
2291
2292 assert!(l0.vertex_properties.contains_key(&vid));
2294
2295 let labels = l0.get_vertex_labels(vid);
2297 assert!(labels.is_some(), "Labels entry should exist even if empty");
2298 assert_eq!(labels.unwrap().len(), 0);
2299
2300 Ok(())
2301 }
2302
2303 #[test]
2304 fn test_now_nanos_returns_nanosecond_range() {
2305 let now = now_nanos();
2309
2310 assert!(
2312 now > 1_700_000_000_000_000_000,
2313 "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
2314 now
2315 );
2316
2317 assert!(
2319 now < 4_100_000_000_000_000_000,
2320 "now_nanos() returned {}, expected < 4.1e18",
2321 now
2322 );
2323 }
2324}