1use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` (instead of silently wrapping, as a plain `as`
/// cast would) if the nanosecond count does not fit in an `i64`.
fn now_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
22
23pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
26 let mut buf = label.as_bytes().to_vec();
27 buf.push(0); let mut sorted = key_values.to_vec();
29 sorted.sort_by(|a, b| a.0.cmp(&b.0));
30 for (k, v) in &sorted {
31 buf.extend(k.as_bytes());
32 buf.push(0);
33 buf.extend(serde_json::to_vec(v).unwrap_or_default());
35 buf.push(0);
36 }
37 buf
38}
39
/// Running tallies of the mutations applied to a buffer, one counter per kind.
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
    /// Counter for created nodes.
    pub nodes_created: usize,
    /// Counter for deleted nodes.
    pub nodes_deleted: usize,
    /// Counter for created relationships.
    pub relationships_created: usize,
    /// Counter for deleted relationships.
    pub relationships_deleted: usize,
    /// Counter for properties that were set.
    pub properties_set: usize,
    /// Counter for properties that were removed.
    pub properties_removed: usize,
    /// Counter for labels that were added.
    pub labels_added: usize,
    /// Counter for labels that were removed.
    pub labels_removed: usize,
}

impl MutationStats {
    /// Returns, counter by counter, how far `self` has advanced past `before`.
    ///
    /// Each subtraction saturates, so a counter that is smaller in `self`
    /// than in `before` yields `0` rather than underflowing.
    pub fn diff(&self, before: &Self) -> Self {
        let delta = |current: usize, earlier: usize| current.saturating_sub(earlier);

        Self {
            nodes_created: delta(self.nodes_created, before.nodes_created),
            nodes_deleted: delta(self.nodes_deleted, before.nodes_deleted),
            relationships_created: delta(
                self.relationships_created,
                before.relationships_created,
            ),
            relationships_deleted: delta(
                self.relationships_deleted,
                before.relationships_deleted,
            ),
            properties_set: delta(self.properties_set, before.properties_set),
            properties_removed: delta(self.properties_removed, before.properties_removed),
            labels_added: delta(self.labels_added, before.labels_added),
            labels_removed: delta(self.labels_removed, before.labels_removed),
        }
    }
}
78
/// Record kept for a deleted edge: its id together with the endpoints and
/// numeric edge type it had.
#[derive(Clone, Debug)]
pub struct TombstoneEntry {
    /// Id of the deleted edge.
    pub eid: Eid,
    /// Source vertex of the deleted edge.
    pub src_vid: Vid,
    /// Destination vertex of the deleted edge.
    pub dst_vid: Vid,
    /// Numeric edge type of the deleted edge.
    pub edge_type: u32,
}
86
/// In-memory buffer of graph mutations: live topology, per-entity properties,
/// versions and timestamps, plus tombstones for deleted vertices and edges.
pub struct L0Buffer {
    /// Adjacency structure holding the vertices and edges currently live here.
    pub graph: SimpleGraph,
    /// Deleted edges, keyed by edge id.
    pub tombstones: HashMap<Eid, TombstoneEntry>,
    /// Ids of deleted vertices.
    pub vertex_tombstones: HashSet<Vid>,
    /// Buffer version at which each edge was last inserted or deleted.
    pub edge_versions: HashMap<Eid, u64>,
    /// Buffer version at which each vertex was last inserted or deleted.
    pub vertex_versions: HashMap<Vid, u64>,
    /// Properties per edge; an edge inserted with no properties has no entry.
    pub edge_properties: HashMap<Eid, Properties>,
    /// Properties per vertex; the entry is dropped when the vertex is deleted.
    pub vertex_properties: HashMap<Vid, Properties>,
    /// `(source, destination, numeric edge type)` for each inserted edge.
    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
    /// Labels per vertex; vertex deletion leaves this entry in place.
    pub vertex_labels: HashMap<Vid, Vec<String>>,
    /// Edge type name per edge, when one was supplied.
    pub edge_types: HashMap<Eid, String>,
    /// Version counter, incremented by mutations.
    pub current_version: u64,
    /// Number of mutations applied to this buffer.
    pub mutation_count: usize,
    /// Per-kind mutation counters.
    pub mutation_stats: MutationStats,
    /// Optional write-ahead log that mutations are appended to; clones of the
    /// buffer carry `None`.
    pub wal: Option<Arc<WriteAheadLog>>,
    /// Starts at `0`; presumably the WAL LSN recorded at the last flush —
    /// it is maintained outside this part of the file, TODO confirm.
    pub wal_lsn_at_flush: u64,
    /// First-insert timestamp per vertex, in nanoseconds since the Unix epoch.
    pub vertex_created_at: HashMap<Vid, i64>,
    /// Latest-insert timestamp per vertex, in nanoseconds since the Unix epoch.
    pub vertex_updated_at: HashMap<Vid, i64>,
    /// First-insert timestamp per edge, in nanoseconds since the Unix epoch.
    pub edge_created_at: HashMap<Eid, i64>,
    /// Latest insert/delete timestamp per edge, in nanoseconds since the epoch.
    pub edge_updated_at: HashMap<Eid, i64>,
    /// Running byte estimate, bumped incrementally by mutations.
    pub estimated_size: usize,
    /// Serialized constraint key mapped to the vertex that owns it.
    pub constraint_index: HashMap<Vec<u8>, Vid>,
}
136
137impl std::fmt::Debug for L0Buffer {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("L0Buffer")
140 .field("vertex_count", &self.graph.vertex_count())
141 .field("edge_count", &self.graph.edge_count())
142 .field("tombstones", &self.tombstones.len())
143 .field("vertex_tombstones", &self.vertex_tombstones.len())
144 .field("current_version", &self.current_version)
145 .field("mutation_count", &self.mutation_count)
146 .finish()
147 }
148}
149
150impl Clone for L0Buffer {
151 fn clone(&self) -> Self {
156 Self {
157 graph: self.graph.clone(),
158 tombstones: self.tombstones.clone(),
159 vertex_tombstones: self.vertex_tombstones.clone(),
160 edge_versions: self.edge_versions.clone(),
161 vertex_versions: self.vertex_versions.clone(),
162 edge_properties: self.edge_properties.clone(),
163 vertex_properties: self.vertex_properties.clone(),
164 edge_endpoints: self.edge_endpoints.clone(),
165 vertex_labels: self.vertex_labels.clone(),
166 edge_types: self.edge_types.clone(),
167 current_version: self.current_version,
168 mutation_count: self.mutation_count,
169 mutation_stats: self.mutation_stats.clone(),
170 wal: None, wal_lsn_at_flush: self.wal_lsn_at_flush,
172 vertex_created_at: self.vertex_created_at.clone(),
173 vertex_updated_at: self.vertex_updated_at.clone(),
174 edge_created_at: self.edge_created_at.clone(),
175 edge_updated_at: self.edge_updated_at.clone(),
176 estimated_size: self.estimated_size,
177 constraint_index: self.constraint_index.clone(),
178 }
179 }
180}
181
182impl L0Buffer {
/// Appends each entry of `labels` to `existing` unless an equal label is
/// already present, preserving first-seen order. Duplicates inside `labels`
/// itself are therefore added only once.
fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
    for candidate in labels {
        let already_present = existing.iter().any(|held| held == candidate);
        if already_present {
            continue;
        }
        existing.push(candidate.to_owned());
    }
}
191
192 fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
195 for (k, v) in properties {
196 let json_v: serde_json::Value = v.clone().into();
198 if let Ok(mut new_crdt) = serde_json::from_value::<Crdt>(json_v)
199 && let Some(existing_v) = entry.get(&k)
200 && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
201 {
202 if new_crdt.try_merge(&existing_crdt).is_ok()
204 && let Ok(merged_json) = serde_json::to_value(new_crdt)
205 {
206 entry.insert(k, uni_common::Value::from(merged_json));
207 continue;
208 }
209 }
211 entry.insert(k, v);
213 }
214 }
215
216 fn estimate_properties_size(props: &Properties) -> usize {
218 props.keys().map(|k| k.len() + 32).sum()
219 }
220
221 pub fn size_bytes(&self) -> usize {
224 let mut total = 0;
225
226 total += self.graph.vertex_count() * 8;
228 total += self.graph.edge_count() * 24;
229
230 for props in self.vertex_properties.values() {
232 total += Self::estimate_properties_size(props);
233 }
234 for props in self.edge_properties.values() {
235 total += Self::estimate_properties_size(props);
236 }
237
238 total += self.tombstones.len() * 64;
240 total += self.vertex_tombstones.len() * 8;
241 total += self.edge_versions.len() * 16;
242 total += self.vertex_versions.len() * 16;
243 total += self.edge_endpoints.len() * 28; for labels in self.vertex_labels.values() {
247 total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
248 }
249
250 for type_name in self.edge_types.values() {
252 total += type_name.len() + 24;
253 }
254
255 total += self.vertex_created_at.len() * 16;
257 total += self.vertex_updated_at.len() * 16;
258 total += self.edge_created_at.len() * 16;
259 total += self.edge_updated_at.len() * 16;
260
261 total
262 }
263
264 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
265 Self {
266 graph: SimpleGraph::new(),
267 tombstones: HashMap::new(),
268 vertex_tombstones: HashSet::new(),
269 edge_versions: HashMap::new(),
270 vertex_versions: HashMap::new(),
271 edge_properties: HashMap::new(),
272 vertex_properties: HashMap::new(),
273 edge_endpoints: HashMap::new(),
274 vertex_labels: HashMap::new(),
275 edge_types: HashMap::new(),
276 current_version: start_version,
277 mutation_count: 0,
278 mutation_stats: MutationStats::default(),
279 wal,
280 wal_lsn_at_flush: 0,
281 vertex_created_at: HashMap::new(),
282 vertex_updated_at: HashMap::new(),
283 edge_created_at: HashMap::new(),
284 edge_updated_at: HashMap::new(),
285 estimated_size: 0,
286 constraint_index: HashMap::new(),
287 }
288 }
289
290 pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
291 self.insert_vertex_with_labels(vid, properties, &[]);
292 }
293
294 pub fn insert_vertex_with_labels(
296 &mut self,
297 vid: Vid,
298 properties: Properties,
299 labels: &[String],
300 ) {
301 self.current_version += 1;
302 let version = self.current_version;
303 let now = now_nanos();
304
305 if let Some(wal) = &self.wal {
306 let _ = wal.append(&Mutation::InsertVertex {
307 vid,
308 properties: properties.clone(),
309 labels: labels.to_vec(),
310 });
311 }
312
313 self.vertex_tombstones.remove(&vid);
314
315 let entry = self.vertex_properties.entry(vid).or_default();
316 Self::merge_crdt_properties(entry, properties.clone());
317 self.vertex_versions.insert(vid, version);
318
319 self.vertex_created_at.entry(vid).or_insert(now);
321 self.vertex_updated_at.insert(vid, now);
322
323 let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
326 let existing = self.vertex_labels.entry(vid).or_default();
327 Self::append_unique_labels(existing, labels);
328
329 self.graph.add_vertex(vid);
330 self.mutation_count += 1;
331 self.mutation_stats.nodes_created += 1;
332 self.mutation_stats.properties_set += properties.len();
333 self.mutation_stats.labels_added += labels.len();
334
335 let props_size = Self::estimate_properties_size(&properties);
336 self.estimated_size += 8 + props_size + 16 + labels_size + 32;
337 }
338
339 pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
341 let existing = self.vertex_labels.entry(vid).or_default();
342 Self::append_unique_labels(existing, labels);
343 }
344
345 pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
348 if let Some(labels) = self.vertex_labels.get_mut(&vid)
349 && let Some(pos) = labels.iter().position(|l| l == label)
350 {
351 labels.remove(pos);
352 self.current_version += 1;
353 self.mutation_count += 1;
354 self.mutation_stats.labels_removed += 1;
355 return true;
358 }
359 false
360 }
361
362 pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
364 self.edge_types.insert(eid, edge_type);
365 }
366
367 pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
368 self.current_version += 1;
369
370 if let Some(wal) = &mut self.wal {
371 let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
372 wal.append(&Mutation::DeleteVertex { vid, labels })?;
373 }
374
375 self.apply_vertex_deletion(vid);
376 Ok(())
377 }
378
379 fn apply_vertex_deletion(&mut self, vid: Vid) {
383 let version = self.current_version;
384
385 let mut edges_to_remove = HashSet::new();
387
388 for entry in self.graph.neighbors(vid, Direction::Outgoing) {
390 edges_to_remove.insert(entry.eid);
391 }
392
393 for entry in self.graph.neighbors(vid, Direction::Incoming) {
395 edges_to_remove.insert(entry.eid); }
397
398 let cascaded_edges_count = edges_to_remove.len();
399
400 for eid in edges_to_remove {
402 if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
404 self.tombstones.insert(
405 eid,
406 TombstoneEntry {
407 eid,
408 src_vid: *src,
409 dst_vid: *dst,
410 edge_type: *etype,
411 },
412 );
413 self.edge_versions.insert(eid, version);
414 self.edge_endpoints.remove(&eid);
415 self.edge_properties.remove(&eid);
416 self.graph.remove_edge(eid);
417 self.mutation_count += 1;
418 self.mutation_stats.relationships_deleted += 1;
419 }
420 }
421
422 self.vertex_tombstones.insert(vid);
423 self.vertex_properties.remove(&vid);
424 self.vertex_versions.insert(vid, version);
425 self.graph.remove_vertex(vid);
426 self.mutation_count += 1;
427 self.mutation_stats.nodes_deleted += 1;
428
429 self.constraint_index.retain(|_, v| *v != vid);
431
432 self.estimated_size += cascaded_edges_count * 72 + 8;
434 }
435
436 pub fn insert_edge(
437 &mut self,
438 src_vid: Vid,
439 dst_vid: Vid,
440 edge_type: u32,
441 eid: Eid,
442 properties: Properties,
443 edge_type_name: Option<String>,
444 ) -> Result<()> {
445 self.current_version += 1;
446 let now = now_nanos();
447
448 if let Some(wal) = &mut self.wal {
449 wal.append(&Mutation::InsertEdge {
450 src_vid,
451 dst_vid,
452 edge_type,
453 eid,
454 version: self.current_version,
455 properties: properties.clone(),
456 edge_type_name: edge_type_name.clone(),
457 })?;
458 }
459
460 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
461
462 let type_name_size = if let Some(ref name) = edge_type_name {
464 let size = name.len() + 24;
465 self.edge_types.insert(eid, name.clone());
466 size
467 } else {
468 0
469 };
470
471 self.edge_created_at.entry(eid).or_insert(now);
473 self.edge_updated_at.insert(eid, now);
474
475 self.estimated_size += type_name_size;
476
477 Ok(())
478 }
479
480 fn apply_edge_insertion(
489 &mut self,
490 src_vid: Vid,
491 dst_vid: Vid,
492 edge_type: u32,
493 eid: Eid,
494 properties: Properties,
495 ) -> Result<()> {
496 let version = self.current_version;
497
498 if self.vertex_tombstones.contains(&src_vid) {
501 anyhow::bail!(
502 "Cannot insert edge: source vertex {} has been deleted (issue #77)",
503 src_vid
504 );
505 }
506 if self.vertex_tombstones.contains(&dst_vid) {
507 anyhow::bail!(
508 "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
509 dst_vid
510 );
511 }
512
513 if !self.graph.contains_vertex(src_vid) {
518 self.graph.add_vertex(src_vid);
519 }
520 if !self.graph.contains_vertex(dst_vid) {
521 self.graph.add_vertex(dst_vid);
522 }
523
524 self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
525
526 let props_size = Self::estimate_properties_size(&properties);
528 let props_count = properties.len();
529 if !properties.is_empty() {
530 let entry = self.edge_properties.entry(eid).or_default();
531 Self::merge_crdt_properties(entry, properties);
532 }
533
534 self.edge_versions.insert(eid, version);
535 self.edge_endpoints
536 .insert(eid, (src_vid, dst_vid, edge_type));
537 self.tombstones.remove(&eid);
538 self.mutation_count += 1;
539 self.mutation_stats.relationships_created += 1;
540 self.mutation_stats.properties_set += props_count;
541
542 self.estimated_size += 24 + props_size + 16 + 28 + 32;
544
545 Ok(())
546 }
547
548 pub fn delete_edge(
549 &mut self,
550 eid: Eid,
551 src_vid: Vid,
552 dst_vid: Vid,
553 edge_type: u32,
554 ) -> Result<()> {
555 self.current_version += 1;
556 let now = now_nanos();
557
558 if let Some(wal) = &mut self.wal {
559 wal.append(&Mutation::DeleteEdge {
560 eid,
561 src_vid,
562 dst_vid,
563 edge_type,
564 version: self.current_version,
565 })?;
566 }
567
568 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
569
570 self.edge_updated_at.insert(eid, now);
572
573 Ok(())
574 }
575
576 fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
580 let version = self.current_version;
581
582 self.tombstones.insert(
583 eid,
584 TombstoneEntry {
585 eid,
586 src_vid,
587 dst_vid,
588 edge_type,
589 },
590 );
591 self.edge_versions.insert(eid, version);
592 self.graph.remove_edge(eid);
593 self.mutation_count += 1;
594 self.mutation_stats.relationships_deleted += 1;
595
596 self.estimated_size += 80;
598 }
599
600 pub fn get_neighbors(
603 &self,
604 vid: Vid,
605 edge_type: u32,
606 direction: Direction,
607 ) -> Vec<(Vid, Eid, u64)> {
608 let edges = self.graph.neighbors(vid, direction);
609
610 edges
611 .iter()
612 .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
613 .map(|e| {
614 let neighbor = match direction {
615 Direction::Outgoing => e.dst_vid,
616 Direction::Incoming => e.src_vid,
617 };
618 let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
619 (neighbor, e.eid, version)
620 })
621 .collect()
622 }
623
624 pub fn is_tombstoned(&self, eid: Eid) -> bool {
625 self.tombstones.contains_key(&eid)
626 }
627
628 pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
631 self.vertex_labels
632 .iter()
633 .filter(|(_, labels)| labels.iter().any(|l| l == label_name))
634 .map(|(vid, _)| *vid)
635 .collect()
636 }
637
638 pub fn all_vertex_vids(&self) -> Vec<Vid> {
642 self.vertex_properties.keys().copied().collect()
643 }
644
645 pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
648 self.vertex_labels
649 .iter()
650 .filter(|(_, labels)| label_names.iter().any(|ln| labels.iter().any(|l| l == *ln)))
651 .map(|(vid, _)| *vid)
652 .collect()
653 }
654
655 pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
657 self.vertex_labels
658 .iter()
659 .filter(|(_, labels)| label_names.iter().all(|ln| labels.iter().any(|l| l == *ln)))
660 .map(|(vid, _)| *vid)
661 .collect()
662 }
663
664 pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
666 self.vertex_labels.get(&vid).map(|v| v.as_slice())
667 }
668
669 pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
671 self.edge_types.get(&eid).map(|s| s.as_str())
672 }
673
674 pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
677 self.edge_types
678 .iter()
679 .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
680 .map(|(eid, _)| *eid)
681 .collect()
682 }
683
684 pub fn all_edge_eids(&self) -> Vec<Eid> {
688 self.edge_endpoints
689 .keys()
690 .filter(|eid| !self.tombstones.contains_key(eid))
691 .copied()
692 .collect()
693 }
694
695 pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
697 self.edge_endpoints
698 .get(&eid)
699 .map(|(src, dst, _)| (*src, *dst))
700 }
701
702 pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
704 self.edge_endpoints.get(&eid).copied()
705 }
706
707 pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
709 self.constraint_index.insert(key, vid);
710 }
711
712 pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
715 self.constraint_index
716 .get(key)
717 .is_some_and(|&v| v != exclude_vid)
718 }
719
720 #[instrument(skip(self, other), level = "trace")]
721 pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
722 trace!(
723 other_mutation_count = other.mutation_count,
724 "Merging L0 buffer"
725 );
726 for &vid in &other.vertex_tombstones {
728 self.delete_vertex(vid)?;
729 }
730
731 for (vid, props) in &other.vertex_properties {
732 let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
733 self.insert_vertex_with_labels(*vid, props.clone(), &labels);
734 }
735
736 for (vid, labels) in &other.vertex_labels {
738 if !self.vertex_labels.contains_key(vid) {
739 self.vertex_labels.insert(*vid, labels.clone());
740 }
741 }
742
743 for (eid, (src, dst, etype)) in &other.edge_endpoints {
745 if other.tombstones.contains_key(eid) {
746 self.delete_edge(*eid, *src, *dst, *etype)?;
747 } else {
748 let props = other.edge_properties.get(eid).cloned().unwrap_or_default();
749 let etype_name = other.edge_types.get(eid).cloned();
750 self.insert_edge(*src, *dst, *etype, *eid, props, etype_name)?;
751 }
752 }
753
754 for (eid, tombstone) in &other.tombstones {
758 if !other.edge_endpoints.contains_key(eid) {
759 self.delete_edge(
760 *eid,
761 tombstone.src_vid,
762 tombstone.dst_vid,
763 tombstone.edge_type,
764 )?;
765 }
766 }
767
768 for (vid, ts) in &other.vertex_created_at {
773 self.vertex_created_at.entry(*vid).or_insert(*ts); }
775 for (vid, ts) in &other.vertex_updated_at {
776 self.vertex_updated_at.insert(*vid, *ts); }
778
779 for (eid, ts) in &other.edge_created_at {
780 self.edge_created_at.entry(*eid).or_insert(*ts); }
782 for (eid, ts) in &other.edge_updated_at {
783 self.edge_updated_at.insert(*eid, *ts); }
785
786 self.estimated_size += other.estimated_size;
789
790 for (key, vid) in &other.constraint_index {
792 self.constraint_index.insert(key.clone(), *vid);
793 }
794
795 Ok(())
796 }
797
798 #[instrument(skip(self, mutations), level = "debug")]
802 pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
803 trace!(count = mutations.len(), "Replaying mutations");
804 for mutation in mutations {
805 match mutation {
806 Mutation::InsertVertex {
807 vid,
808 properties,
809 labels,
810 } => {
811 self.current_version += 1;
813 let version = self.current_version;
814
815 self.vertex_tombstones.remove(&vid);
816 let entry = self.vertex_properties.entry(vid).or_default();
817 Self::merge_crdt_properties(entry, properties);
818 self.vertex_versions.insert(vid, version);
819 self.graph.add_vertex(vid);
820 self.mutation_count += 1;
821
822 let existing = self.vertex_labels.entry(vid).or_default();
824 Self::append_unique_labels(existing, &labels);
825 }
826 Mutation::DeleteVertex { vid, labels } => {
827 self.current_version += 1;
828 if !labels.is_empty() {
830 let existing = self.vertex_labels.entry(vid).or_default();
831 Self::append_unique_labels(existing, &labels);
832 }
833 self.apply_vertex_deletion(vid);
834 }
835 Mutation::InsertEdge {
836 src_vid,
837 dst_vid,
838 edge_type,
839 eid,
840 version: _,
841 properties,
842 edge_type_name,
843 } => {
844 self.current_version += 1;
845 self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
846 if let Some(name) = edge_type_name {
848 self.edge_types.insert(eid, name);
849 }
850 }
851 Mutation::DeleteEdge {
852 eid,
853 src_vid,
854 dst_vid,
855 edge_type,
856 version: _,
857 } => {
858 self.current_version += 1;
859 self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
860 }
861 }
862 }
863 Ok(())
864 }
865}
866
867#[cfg(test)]
868mod tests {
869 use super::*;
870
/// Insert one edge, see it as a neighbour, delete it, and see it disappear.
#[test]
fn test_l0_buffer_ops() -> Result<()> {
    let (src, dst) = (Vid::new(1), Vid::new(2));
    let edge = Eid::new(101);
    let mut buffer = L0Buffer::new(0, None);

    buffer.insert_edge(src, dst, 1, edge, HashMap::new(), None)?;

    let live = buffer.get_neighbors(src, 1, Direction::Outgoing);
    assert_eq!(live.len(), 1);
    assert_eq!(live[0].0, dst);
    assert_eq!(live[0].1, edge);

    buffer.delete_edge(edge, src, dst, 1)?;
    assert!(buffer.is_tombstoned(edge));

    // A tombstoned edge must no longer be reported.
    let remaining = buffer.get_neighbors(src, 1, Direction::Outgoing);
    assert_eq!(remaining.len(), 0);

    Ok(())
}
894
/// Deleting one of two outgoing edges leaves only the other neighbour.
#[test]
fn test_l0_buffer_multiple_edges() -> Result<()> {
    let hub = Vid::new(1);
    let first_target = Vid::new(2);
    let second_target = Vid::new(3);
    let first_edge = Eid::new(101);
    let second_edge = Eid::new(102);

    let mut buffer = L0Buffer::new(0, None);
    buffer.insert_edge(hub, first_target, 1, first_edge, HashMap::new(), None)?;
    buffer.insert_edge(hub, second_target, 1, second_edge, HashMap::new(), None)?;

    assert_eq!(buffer.get_neighbors(hub, 1, Direction::Outgoing).len(), 2);

    buffer.delete_edge(first_edge, hub, first_target, 1)?;

    let remaining = buffer.get_neighbors(hub, 1, Direction::Outgoing);
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].0, second_target);

    Ok(())
}
920
/// Neighbour lookups only return edges of the requested numeric type.
#[test]
fn test_l0_buffer_edge_type_filter() -> Result<()> {
    let origin = Vid::new(1);
    let via_type_one = Vid::new(2);
    let via_type_two = Vid::new(3);

    let mut buffer = L0Buffer::new(0, None);
    buffer.insert_edge(origin, via_type_one, 1, Eid::new(101), HashMap::new(), None)?;
    buffer.insert_edge(origin, via_type_two, 2, Eid::new(201), HashMap::new(), None)?;

    let expectations = [(1, via_type_one), (2, via_type_two)];
    for (edge_type, expected_neighbor) in expectations {
        let found = buffer.get_neighbors(origin, edge_type, Direction::Outgoing);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].0, expected_neighbor);
    }

    Ok(())
}
945
/// Two edges pointing at the same vertex both show up as incoming.
#[test]
fn test_l0_buffer_incoming_edges() -> Result<()> {
    let target = Vid::new(2);
    let mut buffer = L0Buffer::new(0, None);

    // a -> b and c -> b
    buffer.insert_edge(Vid::new(1), target, 1, Eid::new(101), HashMap::new(), None)?;
    buffer.insert_edge(Vid::new(3), target, 1, Eid::new(102), HashMap::new(), None)?;

    let inbound = buffer.get_neighbors(target, 1, Direction::Incoming);
    assert_eq!(inbound.len(), 2);

    Ok(())
}
965
/// An edge without properties has no `edge_properties` entry, yet must
/// still survive a merge into another buffer.
#[test]
fn test_merge_empty_props_edge() -> Result<()> {
    let (src, dst) = (Vid::new(1), Vid::new(2));
    let edge = Eid::new(101);

    let mut transaction = L0Buffer::new(0, None);
    transaction.insert_edge(src, dst, 1, edge, HashMap::new(), None)?;

    // Endpoints are tracked, properties are not.
    assert!(transaction.edge_endpoints.contains_key(&edge));
    assert!(!transaction.edge_properties.contains_key(&edge));

    let mut main = L0Buffer::new(0, None);
    main.merge(&transaction)?;

    assert!(main.edge_endpoints.contains_key(&edge));
    let reachable = main.get_neighbors(src, 1, Direction::Outgoing);
    assert_eq!(reachable.len(), 1);
    assert_eq!(reachable[0].0, dst);

    Ok(())
}
994
/// Replaying two inserts of the same CRDT counter property must merge the
/// counters rather than let the second overwrite the first.
#[test]
fn test_replay_crdt_merge() -> Result<()> {
    use crate::runtime::wal::Mutation;
    use serde_json::json;
    use uni_common::Value;

    let vid = Vid::new(1);
    let mut buffer = L0Buffer::new(0, None);

    let first_counter: Value = json!({
        "t": "gc",
        "d": {"counts": {"node1": 5}}
    })
    .into();
    let second_counter: Value = json!({
        "t": "gc",
        "d": {"counts": {"node2": 3}}
    })
    .into();

    // Replay each counter as its own batch, in order.
    for counter in [first_counter, second_counter] {
        let properties = HashMap::from([("counter".to_string(), counter)]);
        buffer.replay_mutations(vec![Mutation::InsertVertex {
            vid,
            properties,
            labels: vec![],
        }])?;
    }

    let stored = buffer
        .vertex_properties
        .get(&vid)
        .unwrap()
        .get("counter")
        .unwrap();
    let stored_json: serde_json::Value = stored.clone().into();
    let counts = stored_json.get("d").unwrap().get("counts").unwrap();

    // Both contributions are present after the merge.
    assert_eq!(counts.get("node1"), Some(&json!(5)));
    assert_eq!(counts.get("node2"), Some(&json!(3)));

    Ok(())
}
1050
/// Merging keeps the main buffer's vertex `created_at` and adopts the
/// transaction's `updated_at`.
#[test]
fn test_merge_preserves_vertex_timestamps() -> Result<()> {
    let vid = Vid::new(1);

    // Main buffer: vertex with older, hand-set timestamps.
    let main_created = 1000;
    let main_updated = 1100;
    let mut main = L0Buffer::new(0, None);
    main.insert_vertex(vid, HashMap::new());
    main.vertex_created_at.insert(vid, main_created);
    main.vertex_updated_at.insert(vid, main_updated);

    // Transaction buffer: same vertex with newer timestamps.
    let tx_created = 2000;
    let tx_updated = 2100;
    let mut transaction = L0Buffer::new(0, None);
    transaction.insert_vertex(vid, HashMap::new());
    transaction.vertex_created_at.insert(vid, tx_created);
    transaction.vertex_updated_at.insert(vid, tx_updated);

    main.merge(&transaction)?;

    let created_after = *main.vertex_created_at.get(&vid).unwrap();
    assert_eq!(
        created_after, main_created,
        "created_at should preserve oldest timestamp"
    );

    let updated_after = *main.vertex_updated_at.get(&vid).unwrap();
    assert_eq!(
        updated_after, tx_updated,
        "updated_at should use latest timestamp"
    );

    Ok(())
}
1090
/// Merging keeps the main buffer's edge `created_at` and adopts the
/// transaction's `updated_at`.
#[test]
fn test_merge_preserves_edge_timestamps() -> Result<()> {
    let (src, dst) = (Vid::new(1), Vid::new(2));
    let eid = Eid::new(100);

    // Main buffer: edge with older, hand-set timestamps.
    let main_created = 1000;
    let main_updated = 1100;
    let mut main = L0Buffer::new(0, None);
    main.insert_edge(src, dst, 1, eid, HashMap::new(), None)?;
    main.edge_created_at.insert(eid, main_created);
    main.edge_updated_at.insert(eid, main_updated);

    // Transaction buffer: same edge with newer timestamps.
    let tx_created = 2000;
    let tx_updated = 2100;
    let mut transaction = L0Buffer::new(0, None);
    transaction.insert_edge(src, dst, 1, eid, HashMap::new(), None)?;
    transaction.edge_created_at.insert(eid, tx_created);
    transaction.edge_updated_at.insert(eid, tx_updated);

    main.merge(&transaction)?;

    let created_after = *main.edge_created_at.get(&eid).unwrap();
    assert_eq!(
        created_after, main_created,
        "edge created_at should preserve oldest timestamp"
    );

    let updated_after = *main.edge_updated_at.get(&eid).unwrap();
    assert_eq!(
        updated_after, tx_updated,
        "edge updated_at should use latest timestamp"
    );

    Ok(())
}
1132
/// A transaction that updates an existing vertex must not reset the
/// vertex's `created_at`, while `updated_at` and the new property do land.
#[test]
fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
    use uni_common::Value;

    let vid = Vid::new(1);

    // The vertex already exists in the main buffer.
    let original_stamp = 1000;
    let mut main = L0Buffer::new(0, None);
    main.insert_vertex(vid, HashMap::new());
    main.vertex_created_at.insert(vid, original_stamp);
    main.vertex_updated_at.insert(vid, original_stamp);

    // The transaction touches it later and adds a property.
    let tx_stamp = 2000;
    let mut transaction = L0Buffer::new(0, None);
    let update = HashMap::from([("updated".to_string(), Value::String("yes".to_string()))]);
    transaction.insert_vertex(vid, update);
    transaction.vertex_created_at.insert(vid, tx_stamp);
    transaction.vertex_updated_at.insert(vid, tx_stamp);

    main.merge(&transaction)?;

    let created_after = *main.vertex_created_at.get(&vid).unwrap();
    assert_eq!(
        created_after, original_stamp,
        "created_at must not be overwritten for existing vertex"
    );

    let updated_after = *main.vertex_updated_at.get(&vid).unwrap();
    assert_eq!(
        updated_after, tx_stamp,
        "updated_at should reflect transaction timestamp"
    );

    let merged_props = main.vertex_properties.get(&vid).unwrap();
    assert!(merged_props.contains_key("updated"));

    Ok(())
}
1183
/// Labels carried by a replayed `InsertVertex` record must be restored and
/// be visible to label lookups.
#[test]
fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let vid = Vid::new(42);
    let mut buffer = L0Buffer::new(0, None);

    let properties = HashMap::from([(
        "name".to_string(),
        uni_common::Value::String("Alice".to_string()),
    )]);
    let record = Mutation::InsertVertex {
        vid,
        properties,
        labels: vec!["Person".to_string(), "User".to_string()],
    };

    buffer.replay_mutations(vec![record])?;

    assert!(buffer.vertex_properties.contains_key(&vid));

    let labels = buffer.get_vertex_labels(vid).expect("Labels should exist");
    assert_eq!(labels.len(), 2);
    assert!(labels.contains(&"Person".to_string()));
    assert!(labels.contains(&"User".to_string()));

    // Each label resolves back to exactly this vertex.
    for label in ["Person", "User"] {
        let holders = buffer.vids_for_label(label);
        assert_eq!(holders.len(), 1);
        assert_eq!(holders[0], vid);
    }

    Ok(())
}
1229
/// After replaying a `DeleteVertex` record the vertex is tombstoned but its
/// labels are still retrievable.
#[test]
fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let vid = Vid::new(99);
    let vertex_labels = ["Person".to_string(), "Admin".to_string()];

    let mut buffer = L0Buffer::new(0, None);
    buffer.insert_vertex_with_labels(vid, HashMap::new(), &vertex_labels);

    // Sanity check: the vertex and both labels are in place.
    assert!(buffer.vertex_properties.contains_key(&vid));
    let before = buffer.get_vertex_labels(vid).expect("Labels should exist");
    assert_eq!(before.len(), 2);

    let deletion = Mutation::DeleteVertex {
        vid,
        labels: vec!["Person".to_string(), "Admin".to_string()],
    };
    buffer.replay_mutations(vec![deletion])?;

    assert!(buffer.vertex_tombstones.contains(&vid));

    let after = buffer.get_vertex_labels(vid);
    assert!(
        after.is_some(),
        "Labels should be preserved even after deletion for tombstone flushing"
    );

    Ok(())
}
1272
/// The edge type name carried by a replayed `InsertEdge` record must be
/// restored and be usable for type-name lookups.
#[test]
fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let eid = Eid::new(500);
    let mut buffer = L0Buffer::new(0, None);

    let properties = HashMap::from([("since".to_string(), uni_common::Value::Int(2020))]);
    let record = Mutation::InsertEdge {
        src_vid: Vid::new(1),
        dst_vid: Vid::new(2),
        edge_type: 100,
        eid,
        version: 1,
        properties,
        edge_type_name: Some("KNOWS".to_string()),
    };

    buffer.replay_mutations(vec![record])?;

    assert!(buffer.edge_endpoints.contains_key(&eid));

    let restored_name = buffer
        .get_edge_type(eid)
        .expect("Edge type name should exist");
    assert_eq!(restored_name, "KNOWS");

    let by_name = buffer.eids_for_type("KNOWS");
    assert_eq!(by_name.len(), 1);
    assert_eq!(by_name[0], eid);

    Ok(())
}
1316
/// Replays one batch of three `InsertEdge` mutations (two named "KNOWS", one
/// named "LIKES") and checks both the per-edge type lookup and the per-type
/// edge listing.
#[test]
fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let first_knows = Eid::new(1000);
    let likes = Eid::new(1001);
    let second_knows = Eid::new(1002);

    let batch = vec![
        // 1 -[KNOWS]-> 2
        Mutation::InsertEdge {
            src_vid: Vid::new(1),
            dst_vid: Vid::new(2),
            edge_type: 100,
            eid: first_knows,
            version: 1,
            properties: HashMap::new(),
            edge_type_name: Some(String::from("KNOWS")),
        },
        // 2 -[LIKES]-> 3
        Mutation::InsertEdge {
            src_vid: Vid::new(2),
            dst_vid: Vid::new(3),
            edge_type: 101,
            eid: likes,
            version: 2,
            properties: HashMap::new(),
            edge_type_name: Some(String::from("LIKES")),
        },
        // 3 -[KNOWS]-> 1
        Mutation::InsertEdge {
            src_vid: Vid::new(3),
            dst_vid: Vid::new(1),
            edge_type: 100,
            eid: second_knows,
            version: 3,
            properties: HashMap::new(),
            edge_type_name: Some(String::from("KNOWS")),
        },
    ];

    let mut l0 = L0Buffer::new(0, None);
    l0.replay_mutations(batch)?;

    // Every edge resolves to the type name it was inserted with.
    for (eid, expected_name) in [
        (first_knows, "KNOWS"),
        (likes, "LIKES"),
        (second_knows, "KNOWS"),
    ] {
        assert_eq!(l0.get_edge_type(eid), Some(expected_name));
    }

    // "KNOWS" lists exactly its two edges.
    let knows_edges = l0.eids_for_type("KNOWS");
    assert_eq!(knows_edges.len(), 2);
    for eid in [first_knows, second_knows] {
        assert!(knows_edges.contains(&eid));
    }

    // "LIKES" lists exactly its single edge.
    let likes_edges = l0.eids_for_type("LIKES");
    assert_eq!(likes_edges.len(), 1);
    assert_eq!(likes_edges[0], likes);

    Ok(())
}
1374
/// Replays a batch mixing two labelled `InsertVertex` mutations with one named
/// `InsertEdge`, then checks the label index, the edge-type index, and the
/// outgoing adjacency of the source vertex.
#[test]
fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let alice = Vid::new(1);
    let bob = Vid::new(2);
    let eid = Eid::new(100);

    // Builds an `InsertVertex` for a "Person" with a single `name` property.
    let person = |vid: Vid, name: &str| Mutation::InsertVertex {
        vid,
        properties: HashMap::from([(
            "name".to_string(),
            uni_common::Value::String(name.to_string()),
        )]),
        labels: vec!["Person".to_string()],
    };

    let batch = vec![
        person(alice, "Alice"),
        person(bob, "Bob"),
        Mutation::InsertEdge {
            src_vid: alice,
            dst_vid: bob,
            edge_type: 1,
            eid,
            version: 3,
            properties: HashMap::new(),
            edge_type_name: Some(String::from("KNOWS")),
        },
    ];

    let mut l0 = L0Buffer::new(0, None);
    l0.replay_mutations(batch)?;

    // Each vertex carries exactly one label, and both are indexed under "Person".
    for vid in [alice, bob] {
        assert_eq!(l0.get_vertex_labels(vid).unwrap().len(), 1);
    }
    assert_eq!(l0.vids_for_label("Person").len(), 2);

    // The edge's type name is recorded in both directions of the mapping.
    assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
    assert_eq!(l0.eids_for_type("KNOWS").len(), 1);

    // Alice has a single outgoing neighbour over edge type 1: Bob.
    let outgoing = l0.get_neighbors(alice, 1, Direction::Outgoing);
    assert_eq!(outgoing.len(), 1);
    assert_eq!(outgoing[0].0, bob);

    Ok(())
}
1444
/// Replaying an `InsertVertex` whose `labels` list is empty must still create
/// the vertex and leave behind a (zero-length) label entry for it.
#[test]
fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let vid = Vid::new(1);
    let mut l0 = L0Buffer::new(0, None);

    // A vertex insert with no labels at all.
    l0.replay_mutations(vec![Mutation::InsertVertex {
        vid,
        properties: HashMap::new(),
        labels: Vec::new(),
    }])?;

    // The vertex was created.
    assert!(l0.vertex_properties.contains_key(&vid));

    // A label entry exists for it and is empty.
    let stored = l0.get_vertex_labels(vid);
    assert!(stored.is_some(), "Labels entry should exist even if empty");
    let stored = stored.unwrap();
    assert_eq!(stored.len(), 0);

    Ok(())
}
1473
/// Sanity-checks the magnitude of `now_nanos()`: the value must fall strictly
/// between 1.7e18 and 4.1e18, the window this test accepts as a plausible
/// nanosecond timestamp.
#[test]
fn test_now_nanos_returns_nanosecond_range() {
    // Exclusive bounds of the accepted window.
    const LOWER_BOUND: i64 = 1_700_000_000_000_000_000;
    const UPPER_BOUND: i64 = 4_100_000_000_000_000_000;

    let timestamp = now_nanos();

    assert!(
        timestamp > LOWER_BOUND,
        "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
        timestamp
    );

    assert!(
        timestamp < UPPER_BOUND,
        "now_nanos() returned {}, expected < 4.1e18",
        timestamp
    );
}
1495}