1use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15#[derive(Debug, Default)]
23pub struct OccReadSet {
24 pub vertices: HashSet<Vid>,
26 pub edges: HashSet<Eid>,
28}
29
30impl OccReadSet {
31 pub fn is_empty(&self) -> bool {
34 self.vertices.is_empty() && self.edges.is_empty()
35 }
36}
37
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` (instead of silently wrapping) if the nanosecond
/// count does not fit in an `i64`.
fn now_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // `as_nanos` is a `u128`; convert with a checked cast so an
            // out-of-range value clamps rather than truncating.
            i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX)
        })
}
45
46pub(crate) fn try_as_crdt(v: &Value) -> Option<Crdt> {
58 if !matches!(v, Value::Map(_)) {
59 return None;
60 }
61 serde_json::from_value::<Crdt>(v.clone().into()).ok()
62}
63
64pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
67 let mut buf = label.as_bytes().to_vec();
68 buf.push(0); let mut sorted = key_values.to_vec();
70 sorted.sort_by(|a, b| a.0.cmp(&b.0));
71 for (k, v) in &sorted {
72 buf.extend(k.as_bytes());
73 buf.push(0);
74 buf.extend(serde_json::to_vec(v).unwrap_or_default());
76 buf.push(0);
77 }
78 buf
79}
80
/// Counters describing the mutations applied to a buffer.
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
    /// Number of vertex insertions counted.
    pub nodes_created: usize,
    /// Number of vertex deletions counted.
    pub nodes_deleted: usize,
    /// Number of edge insertions counted.
    pub relationships_created: usize,
    /// Number of edge deletions counted.
    pub relationships_deleted: usize,
    /// Number of property values written.
    pub properties_set: usize,
    /// Number of property values removed.
    pub properties_removed: usize,
    /// Number of labels attached to vertices.
    pub labels_added: usize,
    /// Number of labels detached from vertices.
    pub labels_removed: usize,
}

impl MutationStats {
    /// Returns, counter by counter, how much `self` exceeds `before`.
    ///
    /// Each difference is clamped at zero, so a counter that is smaller in
    /// `self` than in `before` yields `0` rather than underflowing.
    pub fn diff(&self, before: &Self) -> Self {
        let delta = |after: usize, earlier: usize| after.saturating_sub(earlier);
        Self {
            nodes_created: delta(self.nodes_created, before.nodes_created),
            nodes_deleted: delta(self.nodes_deleted, before.nodes_deleted),
            relationships_created: delta(self.relationships_created, before.relationships_created),
            relationships_deleted: delta(self.relationships_deleted, before.relationships_deleted),
            properties_set: delta(self.properties_set, before.properties_set),
            properties_removed: delta(self.properties_removed, before.properties_removed),
            labels_added: delta(self.labels_added, before.labels_added),
            labels_removed: delta(self.labels_removed, before.labels_removed),
        }
    }
}
119
/// Record of a deleted edge, kept so the deletion (including the edge's
/// endpoints and type) remains known after the edge leaves the graph.
#[derive(Clone, Debug)]
pub struct TombstoneEntry {
    /// Id of the deleted edge.
    pub eid: Eid,
    /// Source vertex of the deleted edge.
    pub src_vid: Vid,
    /// Destination vertex of the deleted edge.
    pub dst_vid: Vid,
    /// Numeric edge-type id of the deleted edge.
    pub edge_type: u32,
}
127
/// In-memory buffer of graph mutations: topology, properties, labels,
/// tombstones, versions and timestamps, optionally backed by a write-ahead log.
pub struct L0Buffer {
    /// Adjacency structure holding the vertices and live edges of this buffer.
    pub graph: SimpleGraph,
    /// Edges deleted in this buffer, keyed by edge id.
    pub tombstones: HashMap<Eid, TombstoneEntry>,
    /// Vertices deleted in this buffer.
    pub vertex_tombstones: HashSet<Vid>,
    /// Buffer version at which each edge was last inserted or deleted.
    pub edge_versions: HashMap<Eid, u64>,
    /// Buffer version at which each vertex was last inserted or deleted.
    pub vertex_versions: HashMap<Vid, u64>,
    /// Properties per edge; an entry exists only if some insert carried
    /// at least one property.
    pub edge_properties: HashMap<Eid, Properties>,
    /// Properties per vertex.
    pub vertex_properties: HashMap<Vid, Properties>,
    /// `(source, destination, edge-type id)` for each edge.
    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
    /// Labels attached to each vertex, in insertion order without duplicates.
    pub vertex_labels: HashMap<Vid, Vec<String>>,
    /// Inverse of `vertex_labels`: the vertices carrying each label.
    pub label_to_vids: HashMap<String, HashSet<Vid>>,
    /// Vertices whose label list was replaced wholesale via
    /// `set_vertex_labels`; `merge` re-applies the replacement for these.
    pub vertex_label_overwrites: HashSet<Vid>,
    /// Edge-type name per edge, when one was supplied.
    pub edge_types: HashMap<Eid, String>,
    /// Version counter, incremented by mutating operations.
    pub current_version: u64,
    /// Number of mutations applied to this buffer.
    pub mutation_count: usize,
    /// Per-category mutation counters.
    pub mutation_stats: MutationStats,
    /// Write-ahead log that mutations are appended to, if any.
    pub wal: Option<Arc<WriteAheadLog>>,
    /// NOTE(review): presumably the WAL LSN captured at flush time; it is only
    /// initialised and copied in the code visible here — confirm with callers.
    pub wal_lsn_at_flush: u64,
    /// First-insert time per vertex, in nanoseconds since the Unix epoch.
    pub vertex_created_at: HashMap<Vid, i64>,
    /// Most recent insert time per vertex, in nanoseconds since the Unix epoch.
    pub vertex_updated_at: HashMap<Vid, i64>,
    /// First-insert time per edge, in nanoseconds since the Unix epoch.
    pub edge_created_at: HashMap<Eid, i64>,
    /// Most recent insert/delete time per edge, in nanoseconds since the Unix epoch.
    pub edge_updated_at: HashMap<Eid, i64>,
    /// Incrementally maintained estimate of the buffer's size in bytes.
    pub estimated_size: usize,
    /// Serialized constraint key mapped to the vertex that owns it.
    pub constraint_index: HashMap<Vec<u8>, Vid>,
    /// Property keys touched by partial vertex inserts; cleared by a full
    /// insert or a deletion of the vertex.
    pub vertex_partial_keys: HashMap<Vid, HashSet<String>>,
    /// Property keys touched by partial edge inserts; cleared by a full
    /// insert or a deletion of the edge.
    pub edge_partial_keys: HashMap<Eid, HashSet<String>>,
    /// NOTE(review): not read or written by the methods visible here beyond
    /// construction and cloning — meaning of the `String` to be confirmed.
    pub pending_embeddings: HashMap<Vid, String>,
    /// NOTE(review): only initialised and copied in the visible code; presumably
    /// a sequence number associated with `occ_read_set` — confirm.
    pub occ_read_seq: u64,
    /// Optional shared read set; never carried over by `Clone`.
    pub occ_read_set: Option<Arc<parking_lot::Mutex<OccReadSet>>>,
}
218
219impl std::fmt::Debug for L0Buffer {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("L0Buffer")
222 .field("vertex_count", &self.graph.vertex_count())
223 .field("edge_count", &self.graph.edge_count())
224 .field("tombstones", &self.tombstones.len())
225 .field("vertex_tombstones", &self.vertex_tombstones.len())
226 .field("current_version", &self.current_version)
227 .field("mutation_count", &self.mutation_count)
228 .finish()
229 }
230}
231
232impl Clone for L0Buffer {
233 fn clone(&self) -> Self {
238 Self {
239 graph: self.graph.clone(),
240 tombstones: self.tombstones.clone(),
241 vertex_tombstones: self.vertex_tombstones.clone(),
242 edge_versions: self.edge_versions.clone(),
243 vertex_versions: self.vertex_versions.clone(),
244 edge_properties: self.edge_properties.clone(),
245 vertex_properties: self.vertex_properties.clone(),
246 edge_endpoints: self.edge_endpoints.clone(),
247 vertex_labels: self.vertex_labels.clone(),
248 label_to_vids: self.label_to_vids.clone(),
249 vertex_label_overwrites: self.vertex_label_overwrites.clone(),
250 edge_types: self.edge_types.clone(),
251 current_version: self.current_version,
252 mutation_count: self.mutation_count,
253 mutation_stats: self.mutation_stats.clone(),
254 wal: None, wal_lsn_at_flush: self.wal_lsn_at_flush,
256 vertex_created_at: self.vertex_created_at.clone(),
257 vertex_updated_at: self.vertex_updated_at.clone(),
258 edge_created_at: self.edge_created_at.clone(),
259 edge_updated_at: self.edge_updated_at.clone(),
260 estimated_size: self.estimated_size,
261 constraint_index: self.constraint_index.clone(),
262 vertex_partial_keys: self.vertex_partial_keys.clone(),
263 edge_partial_keys: self.edge_partial_keys.clone(),
264 pending_embeddings: self.pending_embeddings.clone(),
265 occ_read_seq: self.occ_read_seq,
266 occ_read_set: None,
268 }
269 }
270}
271
272impl L0Buffer {
273 fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
275 for label in labels {
276 if !existing.contains(label) {
277 existing.push(label.clone());
278 }
279 }
280 }
281
282 fn index_labels_for_vid(&mut self, vid: Vid, labels: &[String]) {
284 for label in labels {
285 self.label_to_vids
286 .entry(label.clone())
287 .or_default()
288 .insert(vid);
289 }
290 }
291
292 fn remove_vid_from_label_index(&mut self, vid: Vid) {
294 if let Some(labels) = self.vertex_labels.get(&vid) {
295 for label in labels {
296 if let Some(set) = self.label_to_vids.get_mut(label) {
297 set.remove(&vid);
298 }
299 }
300 }
301 }
302
303 pub fn set_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
314 self.remove_vid_from_label_index(vid);
315 self.vertex_labels.insert(vid, labels.to_vec());
316 self.index_labels_for_vid(vid, labels);
317 self.vertex_label_overwrites.insert(vid);
318 self.current_version += 1;
319 self.mutation_count += 1;
320 }
321
322 fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
331 if entry.is_empty() {
333 *entry = properties;
334 return;
335 }
336
337 for (k, v) in properties {
338 if let Some(mut new_crdt) = try_as_crdt(&v)
342 && let Some(existing_v) = entry.get(&k)
343 && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
344 {
345 if new_crdt.try_merge(&existing_crdt).is_ok()
347 && let Ok(merged_json) = serde_json::to_value(new_crdt)
348 {
349 entry.insert(k, uni_common::Value::from(merged_json));
350 continue;
351 }
352 tracing::warn!(
360 property = %k,
361 existing_variant = existing_crdt.type_name(),
362 "overwriting CRDT property with a different CRDT variant \
363 (last-writer-wins); merged CRDT state is discarded"
364 );
365 } else if try_as_crdt(&v).is_none()
366 && entry.get(&k).is_some_and(|e| try_as_crdt(e).is_some())
367 {
368 tracing::warn!(
375 property = %k,
376 "overwriting CRDT property with non-CRDT value (last-writer-wins); \
377 merged CRDT state is discarded"
378 );
379 }
380 entry.insert(k, v);
382 }
383 }
384
385 fn estimate_properties_size(props: &Properties) -> usize {
387 props.keys().map(|k| k.len() + 32).sum()
388 }
389
390 pub fn size_bytes(&self) -> usize {
393 let mut total = 0;
394
395 total += self.graph.vertex_count() * 8;
397 total += self.graph.edge_count() * 24;
398
399 for props in self.vertex_properties.values() {
401 total += Self::estimate_properties_size(props);
402 }
403 for props in self.edge_properties.values() {
404 total += Self::estimate_properties_size(props);
405 }
406
407 total += self.tombstones.len() * 64;
409 total += self.vertex_tombstones.len() * 8;
410 total += self.edge_versions.len() * 16;
411 total += self.vertex_versions.len() * 16;
412 total += self.edge_endpoints.len() * 28; for labels in self.vertex_labels.values() {
416 total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
417 }
418
419 for (label, vids) in &self.label_to_vids {
421 total += label.len() + 24 + vids.len() * 8 + 48; }
423
424 for type_name in self.edge_types.values() {
426 total += type_name.len() + 24;
427 }
428
429 total += self.vertex_created_at.len() * 16;
431 total += self.vertex_updated_at.len() * 16;
432 total += self.edge_created_at.len() * 16;
433 total += self.edge_updated_at.len() * 16;
434
435 total
436 }
437
438 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
439 Self {
440 graph: SimpleGraph::new(),
441 tombstones: HashMap::new(),
442 vertex_tombstones: HashSet::new(),
443 edge_versions: HashMap::new(),
444 vertex_versions: HashMap::new(),
445 edge_properties: HashMap::new(),
446 vertex_properties: HashMap::new(),
447 edge_endpoints: HashMap::new(),
448 vertex_labels: HashMap::new(),
449 label_to_vids: HashMap::new(),
450 vertex_label_overwrites: HashSet::new(),
451 edge_types: HashMap::new(),
452 current_version: start_version,
453 mutation_count: 0,
454 mutation_stats: MutationStats::default(),
455 wal,
456 wal_lsn_at_flush: 0,
457 vertex_created_at: HashMap::new(),
458 vertex_updated_at: HashMap::new(),
459 edge_created_at: HashMap::new(),
460 edge_updated_at: HashMap::new(),
461 estimated_size: 0,
462 constraint_index: HashMap::new(),
463 vertex_partial_keys: HashMap::new(),
464 edge_partial_keys: HashMap::new(),
465 pending_embeddings: HashMap::new(),
466 occ_read_seq: 0,
467 occ_read_set: None,
468 }
469 }
470
471 pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
472 self.insert_vertex_with_labels(vid, properties, &[]);
473 }
474
475 pub fn insert_vertex_with_labels(
477 &mut self,
478 vid: Vid,
479 properties: Properties,
480 labels: &[String],
481 ) {
482 self.insert_vertex_with_labels_impl(vid, properties, labels, false);
483 }
484
485 fn insert_vertex_with_labels_impl(
488 &mut self,
489 vid: Vid,
490 properties: Properties,
491 labels: &[String],
492 skip_wal: bool,
493 ) {
494 self.current_version += 1;
495 let version = self.current_version;
496 let now = now_nanos();
497
498 if !skip_wal && let Some(wal) = &self.wal {
499 let _ = wal.append(&Mutation::InsertVertex {
500 vid,
501 properties: properties.clone(),
502 labels: labels.to_vec(),
503 });
504 }
505
506 self.vertex_tombstones.remove(&vid);
507
508 self.vertex_partial_keys.remove(&vid);
511
512 let entry = self.vertex_properties.entry(vid).or_default();
513 Self::merge_crdt_properties(entry, properties.clone());
514 self.vertex_versions.insert(vid, version);
515
516 self.vertex_created_at.entry(vid).or_insert(now);
518 self.vertex_updated_at.insert(vid, now);
519
520 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
523 let existing = self.vertex_labels.entry(vid).or_default();
524 Self::append_unique_labels(existing, labels);
525 self.index_labels_for_vid(vid, labels);
526
527 self.graph.add_vertex(vid);
528 self.mutation_count += 1;
529 self.mutation_stats.nodes_created += 1;
530 self.mutation_stats.properties_set += properties.len();
531 self.mutation_stats.labels_added += labels.len();
532
533 let props_size = Self::estimate_properties_size(&properties);
534 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
535 }
536
537 pub fn insert_vertex_partial_full(
558 &mut self,
559 vid: Vid,
560 props: Properties,
561 touched_keys: HashSet<String>,
562 labels: &[String],
563 ) {
564 self.insert_vertex_with_labels_partial_impl(vid, props, labels, false);
568 self.vertex_partial_keys
569 .entry(vid)
570 .or_default()
571 .extend(touched_keys);
572 }
573
574 pub fn insert_vertex_partial(&mut self, vid: Vid, touched: Properties, labels: &[String]) {
578 let touched_keys: Vec<String> = touched.keys().cloned().collect();
582
583 let already_full = self.vertex_properties.contains_key(&vid)
594 && !self.vertex_partial_keys.contains_key(&vid);
595
596 self.insert_vertex_with_labels_partial_impl(vid, touched, labels, false);
601
602 if !already_full {
603 self.vertex_partial_keys
604 .entry(vid)
605 .or_default()
606 .extend(touched_keys);
607 }
608 }
609
610 fn insert_vertex_with_labels_partial_impl(
614 &mut self,
615 vid: Vid,
616 properties: Properties,
617 labels: &[String],
618 skip_wal: bool,
619 ) {
620 self.current_version += 1;
621 let version = self.current_version;
622 let now = now_nanos();
623
624 if !skip_wal && let Some(wal) = &self.wal {
625 let _ = wal.append(&Mutation::InsertVertex {
631 vid,
632 properties: properties.clone(),
633 labels: labels.to_vec(),
634 });
635 }
636
637 self.vertex_tombstones.remove(&vid);
638 let entry = self.vertex_properties.entry(vid).or_default();
642 Self::merge_crdt_properties(entry, properties.clone());
643 self.vertex_versions.insert(vid, version);
644
645 self.vertex_created_at.entry(vid).or_insert(now);
646 self.vertex_updated_at.insert(vid, now);
647
648 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
649 let existing = self.vertex_labels.entry(vid).or_default();
650 Self::append_unique_labels(existing, labels);
651 self.index_labels_for_vid(vid, labels);
652
653 self.graph.add_vertex(vid);
654 self.mutation_count += 1;
655 self.mutation_stats.properties_set += properties.len();
658 self.mutation_stats.labels_added += labels.len();
659
660 let props_size = Self::estimate_properties_size(&properties);
661 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
662 }
663
664 pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
666 let existing = self.vertex_labels.entry(vid).or_default();
667 Self::append_unique_labels(existing, labels);
668 self.index_labels_for_vid(vid, labels);
669 }
670
671 pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
674 if let Some(labels) = self.vertex_labels.get_mut(&vid)
675 && let Some(pos) = labels.iter().position(|l| l == label)
676 {
677 labels.remove(pos);
678 if let Some(set) = self.label_to_vids.get_mut(label) {
679 set.remove(&vid);
680 }
681 self.current_version += 1;
682 self.mutation_count += 1;
683 self.mutation_stats.labels_removed += 1;
684 return true;
687 }
688 false
689 }
690
691 pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
693 self.edge_types.insert(eid, edge_type);
694 }
695
696 pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
697 self.current_version += 1;
698
699 if let Some(wal) = &mut self.wal {
700 let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
701 wal.append(&Mutation::DeleteVertex { vid, labels })?;
702 }
703
704 self.apply_vertex_deletion(vid);
705 Ok(())
706 }
707
708 fn apply_vertex_deletion(&mut self, vid: Vid) {
712 let version = self.current_version;
713
714 let mut edges_to_remove = HashSet::new();
716
717 for entry in self.graph.neighbors(vid, Direction::Outgoing) {
719 edges_to_remove.insert(entry.eid);
720 }
721
722 for entry in self.graph.neighbors(vid, Direction::Incoming) {
724 edges_to_remove.insert(entry.eid); }
726
727 let cascaded_edges_count = edges_to_remove.len();
728
729 for eid in edges_to_remove {
731 if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
733 self.tombstones.insert(
734 eid,
735 TombstoneEntry {
736 eid,
737 src_vid: *src,
738 dst_vid: *dst,
739 edge_type: *etype,
740 },
741 );
742 self.edge_versions.insert(eid, version);
743 self.edge_endpoints.remove(&eid);
744 self.edge_properties.remove(&eid);
745 self.graph.remove_edge(eid);
746 self.mutation_count += 1;
747 self.mutation_stats.relationships_deleted += 1;
748 }
749 }
750
751 self.remove_vid_from_label_index(vid);
752 self.vertex_tombstones.insert(vid);
753 self.vertex_properties.remove(&vid);
754 self.vertex_partial_keys.remove(&vid);
756 self.vertex_versions.insert(vid, version);
757 self.graph.remove_vertex(vid);
758 self.mutation_count += 1;
759 self.mutation_stats.nodes_deleted += 1;
760
761 self.constraint_index.retain(|_, v| *v != vid);
763
764 self.estimated_size += cascaded_edges_count * 72 + 8;
766 }
767
768 pub fn insert_edge(
769 &mut self,
770 src_vid: Vid,
771 dst_vid: Vid,
772 edge_type: u32,
773 eid: Eid,
774 properties: Properties,
775 edge_type_name: Option<String>,
776 ) -> Result<()> {
777 self.current_version += 1;
778 let now = now_nanos();
779
780 if let Some(wal) = &mut self.wal {
781 wal.append(&Mutation::InsertEdge {
782 src_vid,
783 dst_vid,
784 edge_type,
785 eid,
786 version: self.current_version,
787 properties: properties.clone(),
788 edge_type_name: edge_type_name.clone(),
789 })?;
790 }
791
792 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
793
794 let type_name_size = if let Some(ref name) = edge_type_name {
796 let size = name.len() + 24;
797 self.edge_types.insert(eid, name.clone());
798 size
799 } else {
800 0
801 };
802
803 self.edge_created_at.entry(eid).or_insert(now);
805 self.edge_updated_at.insert(eid, now);
806
807 self.edge_partial_keys.remove(&eid);
810
811 self.estimated_size += type_name_size;
812
813 Ok(())
814 }
815
816 #[allow(clippy::too_many_arguments)]
821 pub fn insert_edge_partial_full(
822 &mut self,
823 src_vid: Vid,
824 dst_vid: Vid,
825 edge_type: u32,
826 eid: Eid,
827 properties: Properties,
828 edge_type_name: Option<String>,
829 touched_keys: HashSet<String>,
830 ) -> Result<()> {
831 self.current_version += 1;
832 let now = now_nanos();
833
834 if let Some(wal) = &mut self.wal {
835 wal.append(&Mutation::InsertEdge {
836 src_vid,
837 dst_vid,
838 edge_type,
839 eid,
840 version: self.current_version,
841 properties: properties.clone(),
842 edge_type_name: edge_type_name.clone(),
843 })?;
844 }
845
846 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
847
848 self.edge_partial_keys
852 .entry(eid)
853 .or_default()
854 .extend(touched_keys);
855
856 let type_name_size = if let Some(ref name) = edge_type_name {
857 let size = name.len() + 24;
858 self.edge_types.insert(eid, name.clone());
859 size
860 } else {
861 0
862 };
863
864 self.edge_created_at.entry(eid).or_insert(now);
865 self.edge_updated_at.insert(eid, now);
866
867 self.estimated_size += type_name_size;
868
869 Ok(())
870 }
871
872 fn apply_edge_insertion(
881 &mut self,
882 src_vid: Vid,
883 dst_vid: Vid,
884 edge_type: u32,
885 eid: Eid,
886 properties: Properties,
887 ) -> Result<()> {
888 let version = self.current_version;
889
890 if self.vertex_tombstones.contains(&src_vid) {
893 anyhow::bail!(
894 "Cannot insert edge: source vertex {} has been deleted (issue #77)",
895 src_vid
896 );
897 }
898 if self.vertex_tombstones.contains(&dst_vid) {
899 anyhow::bail!(
900 "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
901 dst_vid
902 );
903 }
904
905 if !self.graph.contains_vertex(src_vid) {
910 self.graph.add_vertex(src_vid);
911 }
912 if !self.graph.contains_vertex(dst_vid) {
913 self.graph.add_vertex(dst_vid);
914 }
915
916 self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
917
918 let props_size = Self::estimate_properties_size(&properties);
920 let props_count = properties.len();
921 if !properties.is_empty() {
922 let entry = self.edge_properties.entry(eid).or_default();
923 Self::merge_crdt_properties(entry, properties);
924 }
925
926 self.edge_versions.insert(eid, version);
927 self.edge_endpoints
928 .insert(eid, (src_vid, dst_vid, edge_type));
929 self.tombstones.remove(&eid);
930 self.mutation_count += 1;
931 self.mutation_stats.relationships_created += 1;
932 self.mutation_stats.properties_set += props_count;
933
934 self.estimated_size += 24 + props_size + 16 + 28 + 32;
936
937 Ok(())
938 }
939
940 pub fn delete_edge(
941 &mut self,
942 eid: Eid,
943 src_vid: Vid,
944 dst_vid: Vid,
945 edge_type: u32,
946 ) -> Result<()> {
947 self.current_version += 1;
948 let now = now_nanos();
949
950 if let Some(wal) = &mut self.wal {
951 wal.append(&Mutation::DeleteEdge {
952 eid,
953 src_vid,
954 dst_vid,
955 edge_type,
956 version: self.current_version,
957 })?;
958 }
959
960 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
961
962 self.edge_updated_at.insert(eid, now);
964
965 Ok(())
966 }
967
968 fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
972 let version = self.current_version;
973
974 self.tombstones.insert(
975 eid,
976 TombstoneEntry {
977 eid,
978 src_vid,
979 dst_vid,
980 edge_type,
981 },
982 );
983 self.edge_versions.insert(eid, version);
984 self.edge_partial_keys.remove(&eid);
987 self.graph.remove_edge(eid);
988 self.mutation_count += 1;
989 self.mutation_stats.relationships_deleted += 1;
990
991 self.estimated_size += 80;
993 }
994
995 pub fn get_neighbors(
998 &self,
999 vid: Vid,
1000 edge_type: u32,
1001 direction: Direction,
1002 ) -> Vec<(Vid, Eid, u64)> {
1003 let edges = self.graph.neighbors(vid, direction);
1004
1005 edges
1006 .iter()
1007 .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
1008 .map(|e| {
1009 let neighbor = match direction {
1010 Direction::Outgoing => e.dst_vid,
1011 Direction::Incoming => e.src_vid,
1012 };
1013 let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
1014 (neighbor, e.eid, version)
1015 })
1016 .collect()
1017 }
1018
1019 pub fn is_tombstoned(&self, eid: Eid) -> bool {
1020 self.tombstones.contains_key(&eid)
1021 }
1022
1023 pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
1026 self.label_to_vids
1027 .get(label_name)
1028 .map(|set| set.iter().copied().collect())
1029 .unwrap_or_default()
1030 }
1031
1032 pub fn all_vertex_vids(&self) -> Vec<Vid> {
1036 self.vertex_properties.keys().copied().collect()
1037 }
1038
1039 pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1042 let mut result = HashSet::new();
1043 for label_name in label_names {
1044 if let Some(set) = self.label_to_vids.get(*label_name) {
1045 result.extend(set.iter().copied());
1046 }
1047 }
1048 result.into_iter().collect()
1049 }
1050
1051 pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1054 if label_names.is_empty() {
1055 return Vec::new();
1056 }
1057 let sets: Vec<&HashSet<Vid>> = match label_names
1060 .iter()
1061 .map(|ln| self.label_to_vids.get(*ln))
1062 .collect::<Option<Vec<_>>>()
1063 {
1064 Some(s) => s,
1065 None => return Vec::new(),
1066 };
1067 let smallest = sets.iter().min_by_key(|s| s.len()).unwrap();
1069 smallest
1070 .iter()
1071 .copied()
1072 .filter(|vid| sets.iter().all(|s| s.contains(vid)))
1073 .collect()
1074 }
1075
1076 pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
1078 self.vertex_labels.get(&vid).map(|v| v.as_slice())
1079 }
1080
1081 pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
1083 self.edge_types.get(&eid).map(|s| s.as_str())
1084 }
1085
1086 pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
1089 self.edge_types
1090 .iter()
1091 .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
1092 .map(|(eid, _)| *eid)
1093 .collect()
1094 }
1095
1096 pub fn all_edge_eids(&self) -> Vec<Eid> {
1100 self.edge_endpoints
1101 .keys()
1102 .filter(|eid| !self.tombstones.contains_key(eid))
1103 .copied()
1104 .collect()
1105 }
1106
1107 pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
1109 self.edge_endpoints
1110 .get(&eid)
1111 .map(|(src, dst, _)| (*src, *dst))
1112 }
1113
1114 pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
1116 self.edge_endpoints.get(&eid).copied()
1117 }
1118
1119 pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
1121 self.constraint_index.insert(key, vid);
1122 }
1123
1124 pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1127 self.constraint_index
1128 .get(key)
1129 .is_some_and(|&v| v != exclude_vid)
1130 }
1131
1132 #[instrument(skip(self, other), level = "trace")]
1133 pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
1134 trace!(
1135 other_mutation_count = other.mutation_count,
1136 "Merging L0 buffer"
1137 );
1138 for &vid in &other.vertex_tombstones {
1140 self.delete_vertex(vid)?;
1141 }
1142
1143 for (vid, props) in &other.vertex_properties {
1144 let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1145 self.insert_vertex_with_labels_impl(*vid, props.clone(), &labels, true);
1148 }
1149
1150 for (vid, labels) in &other.vertex_labels {
1152 if !self.vertex_labels.contains_key(vid) {
1153 self.vertex_labels.insert(*vid, labels.clone());
1154 for label in labels {
1155 self.label_to_vids
1156 .entry(label.clone())
1157 .or_default()
1158 .insert(*vid);
1159 }
1160 }
1161 }
1162
1163 for vid in &other.vertex_label_overwrites {
1170 if other.vertex_tombstones.contains(vid) {
1171 continue;
1172 }
1173 let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1174 self.remove_vid_from_label_index(*vid);
1175 self.vertex_labels.insert(*vid, labels.clone());
1176 self.index_labels_for_vid(*vid, &labels);
1177 }
1178
1179 for (eid, (src, dst, etype)) in &other.edge_endpoints {
1181 if other.tombstones.contains_key(eid) {
1182 self.delete_edge(*eid, *src, *dst, *etype)?;
1183 } else {
1184 let props = other.edge_properties.get(eid).cloned().unwrap_or_default();
1185 let etype_name = other.edge_types.get(eid).cloned();
1186 self.insert_edge(*src, *dst, *etype, *eid, props, etype_name)?;
1187 }
1188 }
1189
1190 for (eid, tombstone) in &other.tombstones {
1194 if !other.edge_endpoints.contains_key(eid) {
1195 self.delete_edge(
1196 *eid,
1197 tombstone.src_vid,
1198 tombstone.dst_vid,
1199 tombstone.edge_type,
1200 )?;
1201 }
1202 }
1203
1204 for (vid, ts) in &other.vertex_created_at {
1209 self.vertex_created_at.entry(*vid).or_insert(*ts); }
1211 for (vid, ts) in &other.vertex_updated_at {
1212 self.vertex_updated_at.insert(*vid, *ts); }
1214
1215 for (eid, ts) in &other.edge_created_at {
1216 self.edge_created_at.entry(*eid).or_insert(*ts); }
1218 for (eid, ts) in &other.edge_updated_at {
1219 self.edge_updated_at.insert(*eid, *ts); }
1221
1222 self.estimated_size += other.estimated_size;
1225
1226 for (key, vid) in &other.constraint_index {
1228 self.constraint_index.insert(key.clone(), *vid);
1229 }
1230
1231 Ok(())
1232 }
1233
1234 #[instrument(skip(self, mutations), level = "debug")]
1238 pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
1239 trace!(count = mutations.len(), "Replaying mutations");
1240 for mutation in mutations {
1241 match mutation {
1242 Mutation::InsertVertex {
1243 vid,
1244 properties,
1245 labels,
1246 } => {
1247 self.current_version += 1;
1249 let version = self.current_version;
1250
1251 self.vertex_tombstones.remove(&vid);
1252 let entry = self.vertex_properties.entry(vid).or_default();
1253 Self::merge_crdt_properties(entry, properties);
1254 self.vertex_versions.insert(vid, version);
1255 self.graph.add_vertex(vid);
1256 self.mutation_count += 1;
1257
1258 let existing = self.vertex_labels.entry(vid).or_default();
1260 Self::append_unique_labels(existing, &labels);
1261 for label in &labels {
1262 self.label_to_vids
1263 .entry(label.clone())
1264 .or_default()
1265 .insert(vid);
1266 }
1267 }
1268 Mutation::DeleteVertex { vid, labels } => {
1269 self.current_version += 1;
1270 if !labels.is_empty() {
1272 let existing = self.vertex_labels.entry(vid).or_default();
1273 Self::append_unique_labels(existing, &labels);
1274 for label in &labels {
1275 self.label_to_vids
1276 .entry(label.clone())
1277 .or_default()
1278 .insert(vid);
1279 }
1280 }
1281 self.apply_vertex_deletion(vid);
1282 }
1283 Mutation::SetVertexLabels { vid, labels } => {
1284 self.current_version += 1;
1289 self.remove_vid_from_label_index(vid);
1290 self.vertex_labels.insert(vid, labels.clone());
1291 self.index_labels_for_vid(vid, &labels);
1292 self.mutation_count += 1;
1293 }
1294 Mutation::InsertEdge {
1295 src_vid,
1296 dst_vid,
1297 edge_type,
1298 eid,
1299 version: _,
1300 properties,
1301 edge_type_name,
1302 } => {
1303 self.current_version += 1;
1304 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
1305 if let Some(name) = edge_type_name {
1307 self.edge_types.insert(eid, name);
1308 }
1309 }
1310 Mutation::DeleteEdge {
1311 eid,
1312 src_vid,
1313 dst_vid,
1314 edge_type,
1315 version: _,
1316 } => {
1317 self.current_version += 1;
1318 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1319 }
1320 }
1321 }
1322 Ok(())
1323 }
1324}
1325
1326#[cfg(test)]
1327mod tests {
1328 use super::*;
1329
1330 #[test]
1331 fn test_l0_buffer_ops() -> Result<()> {
1332 let mut l0 = L0Buffer::new(0, None);
1333 let vid_a = Vid::new(1);
1334 let vid_b = Vid::new(2);
1335 let eid_ab = Eid::new(101);
1336
1337 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1338
1339 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1340 assert_eq!(neighbors.len(), 1);
1341 assert_eq!(neighbors[0].0, vid_b);
1342 assert_eq!(neighbors[0].1, eid_ab);
1343
1344 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1345 assert!(l0.is_tombstoned(eid_ab));
1346
1347 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1349 assert_eq!(neighbors_after.len(), 0);
1350
1351 Ok(())
1352 }
1353
1354 #[test]
1355 fn test_l0_buffer_multiple_edges() -> Result<()> {
1356 let mut l0 = L0Buffer::new(0, None);
1357 let vid_a = Vid::new(1);
1358 let vid_b = Vid::new(2);
1359 let vid_c = Vid::new(3);
1360 let eid_ab = Eid::new(101);
1361 let eid_ac = Eid::new(102);
1362
1363 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1364 l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
1365
1366 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1367 assert_eq!(neighbors.len(), 2);
1368
1369 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1371
1372 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1374 assert_eq!(neighbors_after.len(), 1);
1375 assert_eq!(neighbors_after[0].0, vid_c);
1376
1377 Ok(())
1378 }
1379
1380 #[test]
1381 fn test_l0_buffer_edge_type_filter() -> Result<()> {
1382 let mut l0 = L0Buffer::new(0, None);
1383 let vid_a = Vid::new(1);
1384 let vid_b = Vid::new(2);
1385 let vid_c = Vid::new(3);
1386 let eid_ab = Eid::new(101);
1387 let eid_ac = Eid::new(201); l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1390 l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
1391
1392 let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1394 assert_eq!(type1_neighbors.len(), 1);
1395 assert_eq!(type1_neighbors[0].0, vid_b);
1396
1397 let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
1399 assert_eq!(type2_neighbors.len(), 1);
1400 assert_eq!(type2_neighbors[0].0, vid_c);
1401
1402 Ok(())
1403 }
1404
1405 #[test]
1406 fn test_l0_buffer_incoming_edges() -> Result<()> {
1407 let mut l0 = L0Buffer::new(0, None);
1408 let vid_a = Vid::new(1);
1409 let vid_b = Vid::new(2);
1410 let vid_c = Vid::new(3);
1411 let eid_ab = Eid::new(101);
1412 let eid_cb = Eid::new(102);
1413
1414 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1416 l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
1417
1418 let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
1420 assert_eq!(incoming.len(), 2);
1421
1422 Ok(())
1423 }
1424
1425 #[test]
1427 fn test_merge_empty_props_edge() -> Result<()> {
1428 let mut main_l0 = L0Buffer::new(0, None);
1429 let mut tx_l0 = L0Buffer::new(0, None);
1430
1431 let vid_a = Vid::new(1);
1432 let vid_b = Vid::new(2);
1433 let eid_ab = Eid::new(101);
1434
1435 tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1437
1438 assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
1440 assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); main_l0.merge(&tx_l0)?;
1444
1445 assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
1447 let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1448 assert_eq!(neighbors.len(), 1);
1449 assert_eq!(neighbors[0].0, vid_b);
1450
1451 Ok(())
1452 }
1453
1454 #[test]
1456 fn test_replay_crdt_merge() -> Result<()> {
1457 use crate::runtime::wal::Mutation;
1458 use serde_json::json;
1459 use uni_common::Value;
1460
1461 let mut l0 = L0Buffer::new(0, None);
1462 let vid = Vid::new(1);
1463
1464 let counter1: Value = json!({
1467 "t": "gc",
1468 "d": {"counts": {"node1": 5}}
1469 })
1470 .into();
1471 let counter2: Value = json!({
1472 "t": "gc",
1473 "d": {"counts": {"node2": 3}}
1474 })
1475 .into();
1476
1477 let mut props1 = HashMap::new();
1479 props1.insert("counter".to_string(), counter1.clone());
1480 l0.replay_mutations(vec![Mutation::InsertVertex {
1481 vid,
1482 properties: props1,
1483 labels: vec![],
1484 }])?;
1485
1486 let mut props2 = HashMap::new();
1488 props2.insert("counter".to_string(), counter2.clone());
1489 l0.replay_mutations(vec![Mutation::InsertVertex {
1490 vid,
1491 properties: props2,
1492 labels: vec![],
1493 }])?;
1494
1495 let stored_props = l0.vertex_properties.get(&vid).unwrap();
1497 let stored_counter = stored_props.get("counter").unwrap();
1498
1499 let stored_json: serde_json::Value = stored_counter.clone().into();
1501 let data = stored_json.get("d").unwrap();
1503 let counts = data.get("counts").unwrap();
1504 assert_eq!(counts.get("node1"), Some(&json!(5)));
1505 assert_eq!(counts.get("node2"), Some(&json!(3)));
1506
1507 Ok(())
1508 }
1509
1510 #[test]
1511 fn test_merge_preserves_vertex_timestamps() -> Result<()> {
1512 let mut l0_main = L0Buffer::new(0, None);
1513 let mut l0_tx = L0Buffer::new(0, None);
1514 let vid = Vid::new(1);
1515
1516 let ts_main_created = 1000;
1518 let ts_main_updated = 1100;
1519 l0_main.insert_vertex(vid, HashMap::new());
1520 l0_main.vertex_created_at.insert(vid, ts_main_created);
1521 l0_main.vertex_updated_at.insert(vid, ts_main_updated);
1522
1523 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_vertex(vid, HashMap::new());
1527 l0_tx.vertex_created_at.insert(vid, ts_tx_created);
1528 l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
1529
1530 l0_main.merge(&l0_tx)?;
1532
1533 assert_eq!(
1535 *l0_main.vertex_created_at.get(&vid).unwrap(),
1536 ts_main_created,
1537 "created_at should preserve oldest timestamp"
1538 );
1539
1540 assert_eq!(
1542 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1543 ts_tx_updated,
1544 "updated_at should use latest timestamp"
1545 );
1546
1547 Ok(())
1548 }
1549
1550 #[test]
1551 fn test_merge_preserves_edge_timestamps() -> Result<()> {
1552 let mut l0_main = L0Buffer::new(0, None);
1553 let mut l0_tx = L0Buffer::new(0, None);
1554 let vid_a = Vid::new(1);
1555 let vid_b = Vid::new(2);
1556 let eid = Eid::new(100);
1557
1558 let ts_main_created = 1000;
1560 let ts_main_updated = 1100;
1561 l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1562 l0_main.edge_created_at.insert(eid, ts_main_created);
1563 l0_main.edge_updated_at.insert(eid, ts_main_updated);
1564
1565 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1569 l0_tx.edge_created_at.insert(eid, ts_tx_created);
1570 l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1571
1572 l0_main.merge(&l0_tx)?;
1574
1575 assert_eq!(
1577 *l0_main.edge_created_at.get(&eid).unwrap(),
1578 ts_main_created,
1579 "edge created_at should preserve oldest timestamp"
1580 );
1581
1582 assert_eq!(
1584 *l0_main.edge_updated_at.get(&eid).unwrap(),
1585 ts_tx_updated,
1586 "edge updated_at should use latest timestamp"
1587 );
1588
1589 Ok(())
1590 }
1591
1592 #[test]
1593 fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1594 use uni_common::Value;
1595
1596 let mut l0_main = L0Buffer::new(0, None);
1597 let mut l0_tx = L0Buffer::new(0, None);
1598 let vid = Vid::new(1);
1599
1600 let ts_original = 1000;
1602 l0_main.insert_vertex(vid, HashMap::new());
1603 l0_main.vertex_created_at.insert(vid, ts_original);
1604 l0_main.vertex_updated_at.insert(vid, ts_original);
1605
1606 let ts_tx = 2000;
1608 let mut props = HashMap::new();
1609 props.insert("updated".to_string(), Value::String("yes".to_string()));
1610 l0_tx.insert_vertex(vid, props);
1611 l0_tx.vertex_created_at.insert(vid, ts_tx);
1612 l0_tx.vertex_updated_at.insert(vid, ts_tx);
1613
1614 l0_main.merge(&l0_tx)?;
1616
1617 assert_eq!(
1619 *l0_main.vertex_created_at.get(&vid).unwrap(),
1620 ts_original,
1621 "created_at must not be overwritten for existing vertex"
1622 );
1623
1624 assert_eq!(
1626 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1627 ts_tx,
1628 "updated_at should reflect transaction timestamp"
1629 );
1630
1631 assert!(
1633 l0_main
1634 .vertex_properties
1635 .get(&vid)
1636 .unwrap()
1637 .contains_key("updated")
1638 );
1639
1640 Ok(())
1641 }
1642
1643 #[test]
1645 fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
1646 use crate::runtime::wal::Mutation;
1647
1648 let mut l0 = L0Buffer::new(0, None);
1649 let vid = Vid::new(42);
1650
1651 let mutations = vec![Mutation::InsertVertex {
1653 vid,
1654 properties: {
1655 let mut props = HashMap::new();
1656 props.insert(
1657 "name".to_string(),
1658 uni_common::Value::String("Alice".to_string()),
1659 );
1660 props
1661 },
1662 labels: vec!["Person".to_string(), "User".to_string()],
1663 }];
1664
1665 l0.replay_mutations(mutations)?;
1667
1668 assert!(l0.vertex_properties.contains_key(&vid));
1670
1671 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
1673 assert_eq!(labels.len(), 2);
1674 assert!(labels.contains(&"Person".to_string()));
1675 assert!(labels.contains(&"User".to_string()));
1676
1677 let person_vids = l0.vids_for_label("Person");
1679 assert_eq!(person_vids.len(), 1);
1680 assert_eq!(person_vids[0], vid);
1681
1682 let user_vids = l0.vids_for_label("User");
1683 assert_eq!(user_vids.len(), 1);
1684 assert_eq!(user_vids[0], vid);
1685
1686 Ok(())
1687 }
1688
1689 #[test]
1691 fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
1692 use crate::runtime::wal::Mutation;
1693
1694 let mut l0 = L0Buffer::new(0, None);
1695 let vid = Vid::new(99);
1696
1697 l0.insert_vertex_with_labels(
1699 vid,
1700 HashMap::new(),
1701 &["Person".to_string(), "Admin".to_string()],
1702 );
1703
1704 assert!(l0.vertex_properties.contains_key(&vid));
1706 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
1707 assert_eq!(labels.len(), 2);
1708
1709 let mutations = vec![Mutation::DeleteVertex {
1711 vid,
1712 labels: vec!["Person".to_string(), "Admin".to_string()],
1713 }];
1714
1715 l0.replay_mutations(mutations)?;
1717
1718 assert!(l0.vertex_tombstones.contains(&vid));
1720
1721 let labels = l0.get_vertex_labels(vid);
1724 assert!(
1725 labels.is_some(),
1726 "Labels should be preserved even after deletion for tombstone flushing"
1727 );
1728
1729 Ok(())
1730 }
1731
1732 #[test]
1734 fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
1735 use crate::runtime::wal::Mutation;
1736
1737 let mut l0 = L0Buffer::new(0, None);
1738 let src = Vid::new(1);
1739 let dst = Vid::new(2);
1740 let eid = Eid::new(500);
1741 let edge_type = 100;
1742
1743 let mutations = vec![Mutation::InsertEdge {
1745 src_vid: src,
1746 dst_vid: dst,
1747 edge_type,
1748 eid,
1749 version: 1,
1750 properties: {
1751 let mut props = HashMap::new();
1752 props.insert("since".to_string(), uni_common::Value::Int(2020));
1753 props
1754 },
1755 edge_type_name: Some("KNOWS".to_string()),
1756 }];
1757
1758 l0.replay_mutations(mutations)?;
1760
1761 assert!(l0.edge_endpoints.contains_key(&eid));
1763
1764 let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
1766 assert_eq!(type_name, "KNOWS");
1767
1768 let knows_eids = l0.eids_for_type("KNOWS");
1770 assert_eq!(knows_eids.len(), 1);
1771 assert_eq!(knows_eids[0], eid);
1772
1773 Ok(())
1774 }
1775
1776 #[test]
1778 fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
1779 use crate::runtime::wal::Mutation;
1780
1781 let mut l0 = L0Buffer::new(0, None);
1782
1783 let mutations = vec![
1785 Mutation::InsertEdge {
1786 src_vid: Vid::new(1),
1787 dst_vid: Vid::new(2),
1788 edge_type: 100,
1789 eid: Eid::new(1000),
1790 version: 1,
1791 properties: HashMap::new(),
1792 edge_type_name: Some("KNOWS".to_string()),
1793 },
1794 Mutation::InsertEdge {
1795 src_vid: Vid::new(2),
1796 dst_vid: Vid::new(3),
1797 edge_type: 101,
1798 eid: Eid::new(1001),
1799 version: 2,
1800 properties: HashMap::new(),
1801 edge_type_name: Some("LIKES".to_string()),
1802 },
1803 Mutation::InsertEdge {
1804 src_vid: Vid::new(3),
1805 dst_vid: Vid::new(1),
1806 edge_type: 100,
1807 eid: Eid::new(1002),
1808 version: 3,
1809 properties: HashMap::new(),
1810 edge_type_name: Some("KNOWS".to_string()),
1811 },
1812 ];
1813
1814 l0.replay_mutations(mutations)?;
1815
1816 assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
1818 assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
1819 assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
1820
1821 let knows_edges = l0.eids_for_type("KNOWS");
1823 assert_eq!(knows_edges.len(), 2);
1824 assert!(knows_edges.contains(&Eid::new(1000)));
1825 assert!(knows_edges.contains(&Eid::new(1002)));
1826
1827 let likes_edges = l0.eids_for_type("LIKES");
1828 assert_eq!(likes_edges.len(), 1);
1829 assert_eq!(likes_edges[0], Eid::new(1001));
1830
1831 Ok(())
1832 }
1833
1834 #[test]
1836 fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
1837 use crate::runtime::wal::Mutation;
1838
1839 let mut l0 = L0Buffer::new(0, None);
1840 let alice = Vid::new(1);
1841 let bob = Vid::new(2);
1842 let eid = Eid::new(100);
1843
1844 let mutations = vec![
1846 Mutation::InsertVertex {
1848 vid: alice,
1849 properties: {
1850 let mut props = HashMap::new();
1851 props.insert(
1852 "name".to_string(),
1853 uni_common::Value::String("Alice".to_string()),
1854 );
1855 props
1856 },
1857 labels: vec!["Person".to_string()],
1858 },
1859 Mutation::InsertVertex {
1861 vid: bob,
1862 properties: {
1863 let mut props = HashMap::new();
1864 props.insert(
1865 "name".to_string(),
1866 uni_common::Value::String("Bob".to_string()),
1867 );
1868 props
1869 },
1870 labels: vec!["Person".to_string()],
1871 },
1872 Mutation::InsertEdge {
1874 src_vid: alice,
1875 dst_vid: bob,
1876 edge_type: 1,
1877 eid,
1878 version: 3,
1879 properties: HashMap::new(),
1880 edge_type_name: Some("KNOWS".to_string()),
1881 },
1882 ];
1883
1884 l0.replay_mutations(mutations)?;
1886
1887 assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
1889 assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
1890 assert_eq!(l0.vids_for_label("Person").len(), 2);
1891
1892 assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
1894 assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
1895
1896 let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
1898 assert_eq!(alice_neighbors.len(), 1);
1899 assert_eq!(alice_neighbors[0].0, bob);
1900
1901 Ok(())
1902 }
1903
1904 #[test]
1906 fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
1907 use crate::runtime::wal::Mutation;
1908
1909 let mut l0 = L0Buffer::new(0, None);
1910 let vid = Vid::new(1);
1911
1912 let mutations = vec![Mutation::InsertVertex {
1915 vid,
1916 properties: HashMap::new(),
1917 labels: vec![], }];
1919
1920 l0.replay_mutations(mutations)?;
1921
1922 assert!(l0.vertex_properties.contains_key(&vid));
1924
1925 let labels = l0.get_vertex_labels(vid);
1927 assert!(labels.is_some(), "Labels entry should exist even if empty");
1928 assert_eq!(labels.unwrap().len(), 0);
1929
1930 Ok(())
1931 }
1932
1933 #[test]
1934 fn test_now_nanos_returns_nanosecond_range() {
1935 let now = now_nanos();
1939
1940 assert!(
1942 now > 1_700_000_000_000_000_000,
1943 "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
1944 now
1945 );
1946
1947 assert!(
1949 now < 4_100_000_000_000_000_000,
1950 "now_nanos() returned {}, expected < 4.1e18",
1951 now
1952 );
1953 }
1954}