1use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15fn now_nanos() -> i64 {
17 SystemTime::now()
18 .duration_since(UNIX_EPOCH)
19 .map(|d| d.as_nanos() as i64)
20 .unwrap_or(0)
21}
22
23pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
26 let mut buf = label.as_bytes().to_vec();
27 buf.push(0); let mut sorted = key_values.to_vec();
29 sorted.sort_by(|a, b| a.0.cmp(&b.0));
30 for (k, v) in &sorted {
31 buf.extend(k.as_bytes());
32 buf.push(0);
33 buf.extend(serde_json::to_vec(v).unwrap_or_default());
35 buf.push(0);
36 }
37 buf
38}
39
40#[derive(Clone, Debug)]
41pub struct TombstoneEntry {
42 pub eid: Eid,
43 pub src_vid: Vid,
44 pub dst_vid: Vid,
45 pub edge_type: u32,
46}
47
48pub struct L0Buffer {
49 pub graph: SimpleGraph,
51 pub tombstones: HashMap<Eid, TombstoneEntry>,
53 pub vertex_tombstones: HashSet<Vid>,
55 pub edge_versions: HashMap<Eid, u64>,
57 pub vertex_versions: HashMap<Vid, u64>,
59 pub edge_properties: HashMap<Eid, Properties>,
61 pub vertex_properties: HashMap<Vid, Properties>,
63 pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
65 pub vertex_labels: HashMap<Vid, Vec<String>>,
68 pub edge_types: HashMap<Eid, String>,
70 pub current_version: u64,
72 pub mutation_count: usize,
74 pub wal: Option<Arc<WriteAheadLog>>,
76 pub wal_lsn_at_flush: u64,
79 pub vertex_created_at: HashMap<Vid, i64>,
81 pub vertex_updated_at: HashMap<Vid, i64>,
83 pub edge_created_at: HashMap<Eid, i64>,
85 pub edge_updated_at: HashMap<Eid, i64>,
87 pub estimated_size: usize,
90 pub constraint_index: HashMap<Vec<u8>, Vid>,
94}
95
96impl std::fmt::Debug for L0Buffer {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.debug_struct("L0Buffer")
99 .field("vertex_count", &self.graph.vertex_count())
100 .field("edge_count", &self.graph.edge_count())
101 .field("tombstones", &self.tombstones.len())
102 .field("vertex_tombstones", &self.vertex_tombstones.len())
103 .field("current_version", &self.current_version)
104 .field("mutation_count", &self.mutation_count)
105 .finish()
106 }
107}
108
109impl L0Buffer {
110 fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
112 for label in labels {
113 if !existing.contains(label) {
114 existing.push(label.clone());
115 }
116 }
117 }
118
119 fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
122 for (k, v) in properties {
123 let json_v: serde_json::Value = v.clone().into();
125 if let Ok(mut new_crdt) = serde_json::from_value::<Crdt>(json_v)
126 && let Some(existing_v) = entry.get(&k)
127 && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
128 {
129 if new_crdt.try_merge(&existing_crdt).is_ok()
131 && let Ok(merged_json) = serde_json::to_value(new_crdt)
132 {
133 entry.insert(k, uni_common::Value::from(merged_json));
134 continue;
135 }
136 }
138 entry.insert(k, v);
140 }
141 }
142
143 fn estimate_properties_size(props: &Properties) -> usize {
145 props.keys().map(|k| k.len() + 32).sum()
146 }
147
148 pub fn size_bytes(&self) -> usize {
151 let mut total = 0;
152
153 total += self.graph.vertex_count() * 8;
155 total += self.graph.edge_count() * 24;
156
157 for props in self.vertex_properties.values() {
159 total += Self::estimate_properties_size(props);
160 }
161 for props in self.edge_properties.values() {
162 total += Self::estimate_properties_size(props);
163 }
164
165 total += self.tombstones.len() * 64;
167 total += self.vertex_tombstones.len() * 8;
168 total += self.edge_versions.len() * 16;
169 total += self.vertex_versions.len() * 16;
170 total += self.edge_endpoints.len() * 28; for labels in self.vertex_labels.values() {
174 total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
175 }
176
177 for type_name in self.edge_types.values() {
179 total += type_name.len() + 24;
180 }
181
182 total += self.vertex_created_at.len() * 16;
184 total += self.vertex_updated_at.len() * 16;
185 total += self.edge_created_at.len() * 16;
186 total += self.edge_updated_at.len() * 16;
187
188 total
189 }
190
191 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
192 Self {
193 graph: SimpleGraph::new(),
194 tombstones: HashMap::new(),
195 vertex_tombstones: HashSet::new(),
196 edge_versions: HashMap::new(),
197 vertex_versions: HashMap::new(),
198 edge_properties: HashMap::new(),
199 vertex_properties: HashMap::new(),
200 edge_endpoints: HashMap::new(),
201 vertex_labels: HashMap::new(),
202 edge_types: HashMap::new(),
203 current_version: start_version,
204 mutation_count: 0,
205 wal,
206 wal_lsn_at_flush: 0,
207 vertex_created_at: HashMap::new(),
208 vertex_updated_at: HashMap::new(),
209 edge_created_at: HashMap::new(),
210 edge_updated_at: HashMap::new(),
211 estimated_size: 0,
212 constraint_index: HashMap::new(),
213 }
214 }
215
216 pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
217 self.insert_vertex_with_labels(vid, properties, &[]);
218 }
219
220 pub fn insert_vertex_with_labels(
222 &mut self,
223 vid: Vid,
224 properties: Properties,
225 labels: &[String],
226 ) {
227 self.current_version += 1;
228 let version = self.current_version;
229 let now = now_nanos();
230
231 if let Some(wal) = &self.wal {
232 let _ = wal.append(&Mutation::InsertVertex {
233 vid,
234 properties: properties.clone(),
235 labels: labels.to_vec(),
236 });
237 }
238
239 self.vertex_tombstones.remove(&vid);
240
241 let entry = self.vertex_properties.entry(vid).or_default();
242 Self::merge_crdt_properties(entry, properties.clone());
243 self.vertex_versions.insert(vid, version);
244
245 self.vertex_created_at.entry(vid).or_insert(now);
247 self.vertex_updated_at.insert(vid, now);
248
249 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
252 let existing = self.vertex_labels.entry(vid).or_default();
253 Self::append_unique_labels(existing, labels);
254
255 self.graph.add_vertex(vid);
256 self.mutation_count += 1;
257
258 let props_size = Self::estimate_properties_size(&properties);
259 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
260 }
261
262 pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
264 let existing = self.vertex_labels.entry(vid).or_default();
265 Self::append_unique_labels(existing, labels);
266 }
267
268 pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
271 if let Some(labels) = self.vertex_labels.get_mut(&vid)
272 && let Some(pos) = labels.iter().position(|l| l == label)
273 {
274 labels.remove(pos);
275 self.current_version += 1;
276 self.mutation_count += 1;
277 return true;
280 }
281 false
282 }
283
284 pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
286 self.edge_types.insert(eid, edge_type);
287 }
288
289 pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
290 self.current_version += 1;
291
292 if let Some(wal) = &mut self.wal {
293 let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
294 wal.append(&Mutation::DeleteVertex { vid, labels })?;
295 }
296
297 self.apply_vertex_deletion(vid);
298 Ok(())
299 }
300
301 fn apply_vertex_deletion(&mut self, vid: Vid) {
305 let version = self.current_version;
306
307 let mut edges_to_remove = HashSet::new();
309
310 for entry in self.graph.neighbors(vid, Direction::Outgoing) {
312 edges_to_remove.insert(entry.eid);
313 }
314
315 for entry in self.graph.neighbors(vid, Direction::Incoming) {
317 edges_to_remove.insert(entry.eid); }
319
320 let cascaded_edges_count = edges_to_remove.len();
321
322 for eid in edges_to_remove {
324 if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
326 self.tombstones.insert(
327 eid,
328 TombstoneEntry {
329 eid,
330 src_vid: *src,
331 dst_vid: *dst,
332 edge_type: *etype,
333 },
334 );
335 self.edge_versions.insert(eid, version);
336 self.edge_endpoints.remove(&eid);
337 self.edge_properties.remove(&eid);
338 self.graph.remove_edge(eid);
339 self.mutation_count += 1;
340 }
341 }
342
343 self.vertex_tombstones.insert(vid);
344 self.vertex_properties.remove(&vid);
345 self.vertex_versions.insert(vid, version);
346 self.graph.remove_vertex(vid);
347 self.mutation_count += 1;
348
349 self.constraint_index.retain(|_, v| *v != vid);
351
352 self.estimated_size += cascaded_edges_count * 72 + 8;
354 }
355
356 pub fn insert_edge(
357 &mut self,
358 src_vid: Vid,
359 dst_vid: Vid,
360 edge_type: u32,
361 eid: Eid,
362 properties: Properties,
363 edge_type_name: Option<String>,
364 ) -> Result<()> {
365 self.current_version += 1;
366 let now = now_nanos();
367
368 if let Some(wal) = &mut self.wal {
369 wal.append(&Mutation::InsertEdge {
370 src_vid,
371 dst_vid,
372 edge_type,
373 eid,
374 version: self.current_version,
375 properties: properties.clone(),
376 edge_type_name: edge_type_name.clone(),
377 })?;
378 }
379
380 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
381
382 let type_name_size = if let Some(ref name) = edge_type_name {
384 let size = name.len() + 24;
385 self.edge_types.insert(eid, name.clone());
386 size
387 } else {
388 0
389 };
390
391 self.edge_created_at.entry(eid).or_insert(now);
393 self.edge_updated_at.insert(eid, now);
394
395 self.estimated_size += type_name_size;
396
397 Ok(())
398 }
399
400 fn apply_edge_insertion(
409 &mut self,
410 src_vid: Vid,
411 dst_vid: Vid,
412 edge_type: u32,
413 eid: Eid,
414 properties: Properties,
415 ) -> Result<()> {
416 let version = self.current_version;
417
418 if self.vertex_tombstones.contains(&src_vid) {
421 anyhow::bail!(
422 "Cannot insert edge: source vertex {} has been deleted (issue #77)",
423 src_vid
424 );
425 }
426 if self.vertex_tombstones.contains(&dst_vid) {
427 anyhow::bail!(
428 "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
429 dst_vid
430 );
431 }
432
433 if !self.graph.contains_vertex(src_vid) {
438 self.graph.add_vertex(src_vid);
439 }
440 if !self.graph.contains_vertex(dst_vid) {
441 self.graph.add_vertex(dst_vid);
442 }
443
444 self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
445
446 let props_size = Self::estimate_properties_size(&properties);
448 if !properties.is_empty() {
449 let entry = self.edge_properties.entry(eid).or_default();
450 Self::merge_crdt_properties(entry, properties);
451 }
452
453 self.edge_versions.insert(eid, version);
454 self.edge_endpoints
455 .insert(eid, (src_vid, dst_vid, edge_type));
456 self.tombstones.remove(&eid);
457 self.mutation_count += 1;
458
459 self.estimated_size += 24 + props_size + 16 + 28 + 32;
461
462 Ok(())
463 }
464
465 pub fn delete_edge(
466 &mut self,
467 eid: Eid,
468 src_vid: Vid,
469 dst_vid: Vid,
470 edge_type: u32,
471 ) -> Result<()> {
472 self.current_version += 1;
473 let now = now_nanos();
474
475 if let Some(wal) = &mut self.wal {
476 wal.append(&Mutation::DeleteEdge {
477 eid,
478 src_vid,
479 dst_vid,
480 edge_type,
481 version: self.current_version,
482 })?;
483 }
484
485 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
486
487 self.edge_updated_at.insert(eid, now);
489
490 Ok(())
491 }
492
493 fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
497 let version = self.current_version;
498
499 self.tombstones.insert(
500 eid,
501 TombstoneEntry {
502 eid,
503 src_vid,
504 dst_vid,
505 edge_type,
506 },
507 );
508 self.edge_versions.insert(eid, version);
509 self.graph.remove_edge(eid);
510 self.mutation_count += 1;
511
512 self.estimated_size += 80;
514 }
515
516 pub fn get_neighbors(
519 &self,
520 vid: Vid,
521 edge_type: u32,
522 direction: Direction,
523 ) -> Vec<(Vid, Eid, u64)> {
524 let edges = self.graph.neighbors(vid, direction);
525
526 edges
527 .iter()
528 .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
529 .map(|e| {
530 let neighbor = match direction {
531 Direction::Outgoing => e.dst_vid,
532 Direction::Incoming => e.src_vid,
533 };
534 let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
535 (neighbor, e.eid, version)
536 })
537 .collect()
538 }
539
540 pub fn is_tombstoned(&self, eid: Eid) -> bool {
541 self.tombstones.contains_key(&eid)
542 }
543
544 pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
547 self.vertex_labels
548 .iter()
549 .filter(|(_, labels)| labels.iter().any(|l| l == label_name))
550 .map(|(vid, _)| *vid)
551 .collect()
552 }
553
554 pub fn all_vertex_vids(&self) -> Vec<Vid> {
558 self.vertex_properties.keys().copied().collect()
559 }
560
561 pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
564 self.vertex_labels
565 .iter()
566 .filter(|(_, labels)| label_names.iter().any(|ln| labels.iter().any(|l| l == *ln)))
567 .map(|(vid, _)| *vid)
568 .collect()
569 }
570
571 pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
573 self.vertex_labels
574 .iter()
575 .filter(|(_, labels)| label_names.iter().all(|ln| labels.iter().any(|l| l == *ln)))
576 .map(|(vid, _)| *vid)
577 .collect()
578 }
579
580 pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
582 self.vertex_labels.get(&vid).map(|v| v.as_slice())
583 }
584
585 pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
587 self.edge_types.get(&eid).map(|s| s.as_str())
588 }
589
590 pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
593 self.edge_types
594 .iter()
595 .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
596 .map(|(eid, _)| *eid)
597 .collect()
598 }
599
600 pub fn all_edge_eids(&self) -> Vec<Eid> {
604 self.edge_endpoints
605 .keys()
606 .filter(|eid| !self.tombstones.contains_key(eid))
607 .copied()
608 .collect()
609 }
610
611 pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
613 self.edge_endpoints
614 .get(&eid)
615 .map(|(src, dst, _)| (*src, *dst))
616 }
617
618 pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
620 self.edge_endpoints.get(&eid).copied()
621 }
622
623 pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
625 self.constraint_index.insert(key, vid);
626 }
627
628 pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
631 self.constraint_index
632 .get(key)
633 .is_some_and(|&v| v != exclude_vid)
634 }
635
636 #[instrument(skip(self, other), level = "trace")]
637 pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
638 trace!(
639 other_mutation_count = other.mutation_count,
640 "Merging L0 buffer"
641 );
642 for &vid in &other.vertex_tombstones {
644 self.delete_vertex(vid)?;
645 }
646
647 for (vid, props) in &other.vertex_properties {
648 let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
649 self.insert_vertex_with_labels(*vid, props.clone(), &labels);
650 }
651
652 for (vid, labels) in &other.vertex_labels {
654 if !self.vertex_labels.contains_key(vid) {
655 self.vertex_labels.insert(*vid, labels.clone());
656 }
657 }
658
659 for (eid, (src, dst, etype)) in &other.edge_endpoints {
661 if other.tombstones.contains_key(eid) {
662 self.delete_edge(*eid, *src, *dst, *etype)?;
663 } else {
664 let props = other.edge_properties.get(eid).cloned().unwrap_or_default();
665 let etype_name = other.edge_types.get(eid).cloned();
666 self.insert_edge(*src, *dst, *etype, *eid, props, etype_name)?;
667 }
668 }
669
670 for (vid, ts) in &other.vertex_created_at {
675 self.vertex_created_at.entry(*vid).or_insert(*ts); }
677 for (vid, ts) in &other.vertex_updated_at {
678 self.vertex_updated_at.insert(*vid, *ts); }
680
681 for (eid, ts) in &other.edge_created_at {
682 self.edge_created_at.entry(*eid).or_insert(*ts); }
684 for (eid, ts) in &other.edge_updated_at {
685 self.edge_updated_at.insert(*eid, *ts); }
687
688 self.estimated_size += other.estimated_size;
691
692 for (key, vid) in &other.constraint_index {
694 self.constraint_index.insert(key.clone(), *vid);
695 }
696
697 Ok(())
698 }
699
700 #[instrument(skip(self, mutations), level = "debug")]
704 pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
705 trace!(count = mutations.len(), "Replaying mutations");
706 for mutation in mutations {
707 match mutation {
708 Mutation::InsertVertex {
709 vid,
710 properties,
711 labels,
712 } => {
713 self.current_version += 1;
715 let version = self.current_version;
716
717 self.vertex_tombstones.remove(&vid);
718 let entry = self.vertex_properties.entry(vid).or_default();
719 Self::merge_crdt_properties(entry, properties);
720 self.vertex_versions.insert(vid, version);
721 self.graph.add_vertex(vid);
722 self.mutation_count += 1;
723
724 let existing = self.vertex_labels.entry(vid).or_default();
726 Self::append_unique_labels(existing, &labels);
727 }
728 Mutation::DeleteVertex { vid, labels } => {
729 self.current_version += 1;
730 if !labels.is_empty() {
732 let existing = self.vertex_labels.entry(vid).or_default();
733 Self::append_unique_labels(existing, &labels);
734 }
735 self.apply_vertex_deletion(vid);
736 }
737 Mutation::InsertEdge {
738 src_vid,
739 dst_vid,
740 edge_type,
741 eid,
742 version: _,
743 properties,
744 edge_type_name,
745 } => {
746 self.current_version += 1;
747 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
748 if let Some(name) = edge_type_name {
750 self.edge_types.insert(eid, name);
751 }
752 }
753 Mutation::DeleteEdge {
754 eid,
755 src_vid,
756 dst_vid,
757 edge_type,
758 version: _,
759 } => {
760 self.current_version += 1;
761 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
762 }
763 }
764 }
765 Ok(())
766 }
767}
768
769#[cfg(test)]
770mod tests {
771 use super::*;
772
773 #[test]
774 fn test_l0_buffer_ops() -> Result<()> {
775 let mut l0 = L0Buffer::new(0, None);
776 let vid_a = Vid::new(1);
777 let vid_b = Vid::new(2);
778 let eid_ab = Eid::new(101);
779
780 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
781
782 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
783 assert_eq!(neighbors.len(), 1);
784 assert_eq!(neighbors[0].0, vid_b);
785 assert_eq!(neighbors[0].1, eid_ab);
786
787 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
788 assert!(l0.is_tombstoned(eid_ab));
789
790 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
792 assert_eq!(neighbors_after.len(), 0);
793
794 Ok(())
795 }
796
797 #[test]
798 fn test_l0_buffer_multiple_edges() -> Result<()> {
799 let mut l0 = L0Buffer::new(0, None);
800 let vid_a = Vid::new(1);
801 let vid_b = Vid::new(2);
802 let vid_c = Vid::new(3);
803 let eid_ab = Eid::new(101);
804 let eid_ac = Eid::new(102);
805
806 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
807 l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
808
809 let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
810 assert_eq!(neighbors.len(), 2);
811
812 l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
814
815 let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
817 assert_eq!(neighbors_after.len(), 1);
818 assert_eq!(neighbors_after[0].0, vid_c);
819
820 Ok(())
821 }
822
823 #[test]
824 fn test_l0_buffer_edge_type_filter() -> Result<()> {
825 let mut l0 = L0Buffer::new(0, None);
826 let vid_a = Vid::new(1);
827 let vid_b = Vid::new(2);
828 let vid_c = Vid::new(3);
829 let eid_ab = Eid::new(101);
830 let eid_ac = Eid::new(201); l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
833 l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
834
835 let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
837 assert_eq!(type1_neighbors.len(), 1);
838 assert_eq!(type1_neighbors[0].0, vid_b);
839
840 let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
842 assert_eq!(type2_neighbors.len(), 1);
843 assert_eq!(type2_neighbors[0].0, vid_c);
844
845 Ok(())
846 }
847
848 #[test]
849 fn test_l0_buffer_incoming_edges() -> Result<()> {
850 let mut l0 = L0Buffer::new(0, None);
851 let vid_a = Vid::new(1);
852 let vid_b = Vid::new(2);
853 let vid_c = Vid::new(3);
854 let eid_ab = Eid::new(101);
855 let eid_cb = Eid::new(102);
856
857 l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
859 l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
860
861 let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
863 assert_eq!(incoming.len(), 2);
864
865 Ok(())
866 }
867
868 #[test]
870 fn test_merge_empty_props_edge() -> Result<()> {
871 let mut main_l0 = L0Buffer::new(0, None);
872 let mut tx_l0 = L0Buffer::new(0, None);
873
874 let vid_a = Vid::new(1);
875 let vid_b = Vid::new(2);
876 let eid_ab = Eid::new(101);
877
878 tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
880
881 assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
883 assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); main_l0.merge(&tx_l0)?;
887
888 assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
890 let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
891 assert_eq!(neighbors.len(), 1);
892 assert_eq!(neighbors[0].0, vid_b);
893
894 Ok(())
895 }
896
897 #[test]
899 fn test_replay_crdt_merge() -> Result<()> {
900 use crate::runtime::wal::Mutation;
901 use serde_json::json;
902 use uni_common::Value;
903
904 let mut l0 = L0Buffer::new(0, None);
905 let vid = Vid::new(1);
906
907 let counter1: Value = json!({
910 "t": "gc",
911 "d": {"counts": {"node1": 5}}
912 })
913 .into();
914 let counter2: Value = json!({
915 "t": "gc",
916 "d": {"counts": {"node2": 3}}
917 })
918 .into();
919
920 let mut props1 = HashMap::new();
922 props1.insert("counter".to_string(), counter1.clone());
923 l0.replay_mutations(vec![Mutation::InsertVertex {
924 vid,
925 properties: props1,
926 labels: vec![],
927 }])?;
928
929 let mut props2 = HashMap::new();
931 props2.insert("counter".to_string(), counter2.clone());
932 l0.replay_mutations(vec![Mutation::InsertVertex {
933 vid,
934 properties: props2,
935 labels: vec![],
936 }])?;
937
938 let stored_props = l0.vertex_properties.get(&vid).unwrap();
940 let stored_counter = stored_props.get("counter").unwrap();
941
942 let stored_json: serde_json::Value = stored_counter.clone().into();
944 let data = stored_json.get("d").unwrap();
946 let counts = data.get("counts").unwrap();
947 assert_eq!(counts.get("node1"), Some(&json!(5)));
948 assert_eq!(counts.get("node2"), Some(&json!(3)));
949
950 Ok(())
951 }
952
953 #[test]
954 fn test_merge_preserves_vertex_timestamps() -> Result<()> {
955 let mut l0_main = L0Buffer::new(0, None);
956 let mut l0_tx = L0Buffer::new(0, None);
957 let vid = Vid::new(1);
958
959 let ts_main_created = 1000;
961 let ts_main_updated = 1100;
962 l0_main.insert_vertex(vid, HashMap::new());
963 l0_main.vertex_created_at.insert(vid, ts_main_created);
964 l0_main.vertex_updated_at.insert(vid, ts_main_updated);
965
966 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_vertex(vid, HashMap::new());
970 l0_tx.vertex_created_at.insert(vid, ts_tx_created);
971 l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
972
973 l0_main.merge(&l0_tx)?;
975
976 assert_eq!(
978 *l0_main.vertex_created_at.get(&vid).unwrap(),
979 ts_main_created,
980 "created_at should preserve oldest timestamp"
981 );
982
983 assert_eq!(
985 *l0_main.vertex_updated_at.get(&vid).unwrap(),
986 ts_tx_updated,
987 "updated_at should use latest timestamp"
988 );
989
990 Ok(())
991 }
992
993 #[test]
994 fn test_merge_preserves_edge_timestamps() -> Result<()> {
995 let mut l0_main = L0Buffer::new(0, None);
996 let mut l0_tx = L0Buffer::new(0, None);
997 let vid_a = Vid::new(1);
998 let vid_b = Vid::new(2);
999 let eid = Eid::new(100);
1000
1001 let ts_main_created = 1000;
1003 let ts_main_updated = 1100;
1004 l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1005 l0_main.edge_created_at.insert(eid, ts_main_created);
1006 l0_main.edge_updated_at.insert(eid, ts_main_updated);
1007
1008 let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1012 l0_tx.edge_created_at.insert(eid, ts_tx_created);
1013 l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1014
1015 l0_main.merge(&l0_tx)?;
1017
1018 assert_eq!(
1020 *l0_main.edge_created_at.get(&eid).unwrap(),
1021 ts_main_created,
1022 "edge created_at should preserve oldest timestamp"
1023 );
1024
1025 assert_eq!(
1027 *l0_main.edge_updated_at.get(&eid).unwrap(),
1028 ts_tx_updated,
1029 "edge updated_at should use latest timestamp"
1030 );
1031
1032 Ok(())
1033 }
1034
1035 #[test]
1036 fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1037 use uni_common::Value;
1038
1039 let mut l0_main = L0Buffer::new(0, None);
1040 let mut l0_tx = L0Buffer::new(0, None);
1041 let vid = Vid::new(1);
1042
1043 let ts_original = 1000;
1045 l0_main.insert_vertex(vid, HashMap::new());
1046 l0_main.vertex_created_at.insert(vid, ts_original);
1047 l0_main.vertex_updated_at.insert(vid, ts_original);
1048
1049 let ts_tx = 2000;
1051 let mut props = HashMap::new();
1052 props.insert("updated".to_string(), Value::String("yes".to_string()));
1053 l0_tx.insert_vertex(vid, props);
1054 l0_tx.vertex_created_at.insert(vid, ts_tx);
1055 l0_tx.vertex_updated_at.insert(vid, ts_tx);
1056
1057 l0_main.merge(&l0_tx)?;
1059
1060 assert_eq!(
1062 *l0_main.vertex_created_at.get(&vid).unwrap(),
1063 ts_original,
1064 "created_at must not be overwritten for existing vertex"
1065 );
1066
1067 assert_eq!(
1069 *l0_main.vertex_updated_at.get(&vid).unwrap(),
1070 ts_tx,
1071 "updated_at should reflect transaction timestamp"
1072 );
1073
1074 assert!(
1076 l0_main
1077 .vertex_properties
1078 .get(&vid)
1079 .unwrap()
1080 .contains_key("updated")
1081 );
1082
1083 Ok(())
1084 }
1085
1086 #[test]
1088 fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
1089 use crate::runtime::wal::Mutation;
1090
1091 let mut l0 = L0Buffer::new(0, None);
1092 let vid = Vid::new(42);
1093
1094 let mutations = vec![Mutation::InsertVertex {
1096 vid,
1097 properties: {
1098 let mut props = HashMap::new();
1099 props.insert(
1100 "name".to_string(),
1101 uni_common::Value::String("Alice".to_string()),
1102 );
1103 props
1104 },
1105 labels: vec!["Person".to_string(), "User".to_string()],
1106 }];
1107
1108 l0.replay_mutations(mutations)?;
1110
1111 assert!(l0.vertex_properties.contains_key(&vid));
1113
1114 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
1116 assert_eq!(labels.len(), 2);
1117 assert!(labels.contains(&"Person".to_string()));
1118 assert!(labels.contains(&"User".to_string()));
1119
1120 let person_vids = l0.vids_for_label("Person");
1122 assert_eq!(person_vids.len(), 1);
1123 assert_eq!(person_vids[0], vid);
1124
1125 let user_vids = l0.vids_for_label("User");
1126 assert_eq!(user_vids.len(), 1);
1127 assert_eq!(user_vids[0], vid);
1128
1129 Ok(())
1130 }
1131
1132 #[test]
1134 fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
1135 use crate::runtime::wal::Mutation;
1136
1137 let mut l0 = L0Buffer::new(0, None);
1138 let vid = Vid::new(99);
1139
1140 l0.insert_vertex_with_labels(
1142 vid,
1143 HashMap::new(),
1144 &["Person".to_string(), "Admin".to_string()],
1145 );
1146
1147 assert!(l0.vertex_properties.contains_key(&vid));
1149 let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
1150 assert_eq!(labels.len(), 2);
1151
1152 let mutations = vec![Mutation::DeleteVertex {
1154 vid,
1155 labels: vec!["Person".to_string(), "Admin".to_string()],
1156 }];
1157
1158 l0.replay_mutations(mutations)?;
1160
1161 assert!(l0.vertex_tombstones.contains(&vid));
1163
1164 let labels = l0.get_vertex_labels(vid);
1167 assert!(
1168 labels.is_some(),
1169 "Labels should be preserved even after deletion for tombstone flushing"
1170 );
1171
1172 Ok(())
1173 }
1174
1175 #[test]
1177 fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
1178 use crate::runtime::wal::Mutation;
1179
1180 let mut l0 = L0Buffer::new(0, None);
1181 let src = Vid::new(1);
1182 let dst = Vid::new(2);
1183 let eid = Eid::new(500);
1184 let edge_type = 100;
1185
1186 let mutations = vec![Mutation::InsertEdge {
1188 src_vid: src,
1189 dst_vid: dst,
1190 edge_type,
1191 eid,
1192 version: 1,
1193 properties: {
1194 let mut props = HashMap::new();
1195 props.insert("since".to_string(), uni_common::Value::Int(2020));
1196 props
1197 },
1198 edge_type_name: Some("KNOWS".to_string()),
1199 }];
1200
1201 l0.replay_mutations(mutations)?;
1203
1204 assert!(l0.edge_endpoints.contains_key(&eid));
1206
1207 let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
1209 assert_eq!(type_name, "KNOWS");
1210
1211 let knows_eids = l0.eids_for_type("KNOWS");
1213 assert_eq!(knows_eids.len(), 1);
1214 assert_eq!(knows_eids[0], eid);
1215
1216 Ok(())
1217 }
1218
1219 #[test]
1221 fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
1222 use crate::runtime::wal::Mutation;
1223
1224 let mut l0 = L0Buffer::new(0, None);
1225
1226 let mutations = vec![
1228 Mutation::InsertEdge {
1229 src_vid: Vid::new(1),
1230 dst_vid: Vid::new(2),
1231 edge_type: 100,
1232 eid: Eid::new(1000),
1233 version: 1,
1234 properties: HashMap::new(),
1235 edge_type_name: Some("KNOWS".to_string()),
1236 },
1237 Mutation::InsertEdge {
1238 src_vid: Vid::new(2),
1239 dst_vid: Vid::new(3),
1240 edge_type: 101,
1241 eid: Eid::new(1001),
1242 version: 2,
1243 properties: HashMap::new(),
1244 edge_type_name: Some("LIKES".to_string()),
1245 },
1246 Mutation::InsertEdge {
1247 src_vid: Vid::new(3),
1248 dst_vid: Vid::new(1),
1249 edge_type: 100,
1250 eid: Eid::new(1002),
1251 version: 3,
1252 properties: HashMap::new(),
1253 edge_type_name: Some("KNOWS".to_string()),
1254 },
1255 ];
1256
1257 l0.replay_mutations(mutations)?;
1258
1259 assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
1261 assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
1262 assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
1263
1264 let knows_edges = l0.eids_for_type("KNOWS");
1266 assert_eq!(knows_edges.len(), 2);
1267 assert!(knows_edges.contains(&Eid::new(1000)));
1268 assert!(knows_edges.contains(&Eid::new(1002)));
1269
1270 let likes_edges = l0.eids_for_type("LIKES");
1271 assert_eq!(likes_edges.len(), 1);
1272 assert_eq!(likes_edges[0], Eid::new(1001));
1273
1274 Ok(())
1275 }
1276
1277 #[test]
1279 fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
1280 use crate::runtime::wal::Mutation;
1281
1282 let mut l0 = L0Buffer::new(0, None);
1283 let alice = Vid::new(1);
1284 let bob = Vid::new(2);
1285 let eid = Eid::new(100);
1286
1287 let mutations = vec![
1289 Mutation::InsertVertex {
1291 vid: alice,
1292 properties: {
1293 let mut props = HashMap::new();
1294 props.insert(
1295 "name".to_string(),
1296 uni_common::Value::String("Alice".to_string()),
1297 );
1298 props
1299 },
1300 labels: vec!["Person".to_string()],
1301 },
1302 Mutation::InsertVertex {
1304 vid: bob,
1305 properties: {
1306 let mut props = HashMap::new();
1307 props.insert(
1308 "name".to_string(),
1309 uni_common::Value::String("Bob".to_string()),
1310 );
1311 props
1312 },
1313 labels: vec!["Person".to_string()],
1314 },
1315 Mutation::InsertEdge {
1317 src_vid: alice,
1318 dst_vid: bob,
1319 edge_type: 1,
1320 eid,
1321 version: 3,
1322 properties: HashMap::new(),
1323 edge_type_name: Some("KNOWS".to_string()),
1324 },
1325 ];
1326
1327 l0.replay_mutations(mutations)?;
1329
1330 assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
1332 assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
1333 assert_eq!(l0.vids_for_label("Person").len(), 2);
1334
1335 assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
1337 assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
1338
1339 let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
1341 assert_eq!(alice_neighbors.len(), 1);
1342 assert_eq!(alice_neighbors[0].0, bob);
1343
1344 Ok(())
1345 }
1346
1347 #[test]
1349 fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
1350 use crate::runtime::wal::Mutation;
1351
1352 let mut l0 = L0Buffer::new(0, None);
1353 let vid = Vid::new(1);
1354
1355 let mutations = vec![Mutation::InsertVertex {
1358 vid,
1359 properties: HashMap::new(),
1360 labels: vec![], }];
1362
1363 l0.replay_mutations(mutations)?;
1364
1365 assert!(l0.vertex_properties.contains_key(&vid));
1367
1368 let labels = l0.get_vertex_labels(vid);
1370 assert!(labels.is_some(), "Labels entry should exist even if empty");
1371 assert_eq!(labels.unwrap().len(), 0);
1372
1373 Ok(())
1374 }
1375
1376 #[test]
1377 fn test_now_nanos_returns_nanosecond_range() {
1378 let now = now_nanos();
1382
1383 assert!(
1385 now > 1_700_000_000_000_000_000,
1386 "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
1387 now
1388 );
1389
1390 assert!(
1392 now < 4_100_000_000_000_000_000,
1393 "now_nanos() returned {}, expected < 4.1e18",
1394 now
1395 );
1396 }
1397}