use crate::runtime::wal::{Mutation, WriteAheadLog};
use anyhow::Result;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{instrument, trace};
use uni_common::core::id::{Eid, Vid};
use uni_common::graph::simple_graph::{Direction, SimpleGraph};
use uni_common::{Properties, Value};
use uni_crdt::Crdt;
fn now_nanos() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as i64)
.unwrap_or(0)
}
pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
let mut buf = label.as_bytes().to_vec();
buf.push(0); let mut sorted = key_values.to_vec();
sorted.sort_by(|a, b| a.0.cmp(&b.0));
for (k, v) in &sorted {
buf.extend(k.as_bytes());
buf.push(0);
buf.extend(serde_json::to_vec(v).unwrap_or_default());
buf.push(0);
}
buf
}
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
pub nodes_created: usize,
pub nodes_deleted: usize,
pub relationships_created: usize,
pub relationships_deleted: usize,
pub properties_set: usize,
pub properties_removed: usize,
pub labels_added: usize,
pub labels_removed: usize,
}
impl MutationStats {
pub fn diff(&self, before: &Self) -> Self {
Self {
nodes_created: self.nodes_created.saturating_sub(before.nodes_created),
nodes_deleted: self.nodes_deleted.saturating_sub(before.nodes_deleted),
relationships_created: self
.relationships_created
.saturating_sub(before.relationships_created),
relationships_deleted: self
.relationships_deleted
.saturating_sub(before.relationships_deleted),
properties_set: self.properties_set.saturating_sub(before.properties_set),
properties_removed: self
.properties_removed
.saturating_sub(before.properties_removed),
labels_added: self.labels_added.saturating_sub(before.labels_added),
labels_removed: self.labels_removed.saturating_sub(before.labels_removed),
}
}
}
#[derive(Clone, Debug)]
pub struct TombstoneEntry {
pub eid: Eid,
pub src_vid: Vid,
pub dst_vid: Vid,
pub edge_type: u32,
}
pub struct L0Buffer {
pub graph: SimpleGraph,
pub tombstones: HashMap<Eid, TombstoneEntry>,
pub vertex_tombstones: HashSet<Vid>,
pub edge_versions: HashMap<Eid, u64>,
pub vertex_versions: HashMap<Vid, u64>,
pub edge_properties: HashMap<Eid, Properties>,
pub vertex_properties: HashMap<Vid, Properties>,
pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
pub vertex_labels: HashMap<Vid, Vec<String>>,
pub edge_types: HashMap<Eid, String>,
pub current_version: u64,
pub mutation_count: usize,
pub mutation_stats: MutationStats,
pub wal: Option<Arc<WriteAheadLog>>,
pub wal_lsn_at_flush: u64,
pub vertex_created_at: HashMap<Vid, i64>,
pub vertex_updated_at: HashMap<Vid, i64>,
pub edge_created_at: HashMap<Eid, i64>,
pub edge_updated_at: HashMap<Eid, i64>,
pub estimated_size: usize,
pub constraint_index: HashMap<Vec<u8>, Vid>,
}
impl std::fmt::Debug for L0Buffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("L0Buffer")
.field("vertex_count", &self.graph.vertex_count())
.field("edge_count", &self.graph.edge_count())
.field("tombstones", &self.tombstones.len())
.field("vertex_tombstones", &self.vertex_tombstones.len())
.field("current_version", &self.current_version)
.field("mutation_count", &self.mutation_count)
.finish()
}
}
impl Clone for L0Buffer {
fn clone(&self) -> Self {
Self {
graph: self.graph.clone(),
tombstones: self.tombstones.clone(),
vertex_tombstones: self.vertex_tombstones.clone(),
edge_versions: self.edge_versions.clone(),
vertex_versions: self.vertex_versions.clone(),
edge_properties: self.edge_properties.clone(),
vertex_properties: self.vertex_properties.clone(),
edge_endpoints: self.edge_endpoints.clone(),
vertex_labels: self.vertex_labels.clone(),
edge_types: self.edge_types.clone(),
current_version: self.current_version,
mutation_count: self.mutation_count,
mutation_stats: self.mutation_stats.clone(),
wal: None, wal_lsn_at_flush: self.wal_lsn_at_flush,
vertex_created_at: self.vertex_created_at.clone(),
vertex_updated_at: self.vertex_updated_at.clone(),
edge_created_at: self.edge_created_at.clone(),
edge_updated_at: self.edge_updated_at.clone(),
estimated_size: self.estimated_size,
constraint_index: self.constraint_index.clone(),
}
}
}
impl L0Buffer {
fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
for label in labels {
if !existing.contains(label) {
existing.push(label.clone());
}
}
}
fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
for (k, v) in properties {
let json_v: serde_json::Value = v.clone().into();
if let Ok(mut new_crdt) = serde_json::from_value::<Crdt>(json_v)
&& let Some(existing_v) = entry.get(&k)
&& let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
{
if new_crdt.try_merge(&existing_crdt).is_ok()
&& let Ok(merged_json) = serde_json::to_value(new_crdt)
{
entry.insert(k, uni_common::Value::from(merged_json));
continue;
}
}
entry.insert(k, v);
}
}
fn estimate_properties_size(props: &Properties) -> usize {
props.keys().map(|k| k.len() + 32).sum()
}
pub fn size_bytes(&self) -> usize {
let mut total = 0;
total += self.graph.vertex_count() * 8;
total += self.graph.edge_count() * 24;
for props in self.vertex_properties.values() {
total += Self::estimate_properties_size(props);
}
for props in self.edge_properties.values() {
total += Self::estimate_properties_size(props);
}
total += self.tombstones.len() * 64;
total += self.vertex_tombstones.len() * 8;
total += self.edge_versions.len() * 16;
total += self.vertex_versions.len() * 16;
total += self.edge_endpoints.len() * 28;
for labels in self.vertex_labels.values() {
total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
}
for type_name in self.edge_types.values() {
total += type_name.len() + 24;
}
total += self.vertex_created_at.len() * 16;
total += self.vertex_updated_at.len() * 16;
total += self.edge_created_at.len() * 16;
total += self.edge_updated_at.len() * 16;
total
}
pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
Self {
graph: SimpleGraph::new(),
tombstones: HashMap::new(),
vertex_tombstones: HashSet::new(),
edge_versions: HashMap::new(),
vertex_versions: HashMap::new(),
edge_properties: HashMap::new(),
vertex_properties: HashMap::new(),
edge_endpoints: HashMap::new(),
vertex_labels: HashMap::new(),
edge_types: HashMap::new(),
current_version: start_version,
mutation_count: 0,
mutation_stats: MutationStats::default(),
wal,
wal_lsn_at_flush: 0,
vertex_created_at: HashMap::new(),
vertex_updated_at: HashMap::new(),
edge_created_at: HashMap::new(),
edge_updated_at: HashMap::new(),
estimated_size: 0,
constraint_index: HashMap::new(),
}
}
pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
self.insert_vertex_with_labels(vid, properties, &[]);
}
pub fn insert_vertex_with_labels(
&mut self,
vid: Vid,
properties: Properties,
labels: &[String],
) {
self.current_version += 1;
let version = self.current_version;
let now = now_nanos();
if let Some(wal) = &self.wal {
let _ = wal.append(&Mutation::InsertVertex {
vid,
properties: properties.clone(),
labels: labels.to_vec(),
});
}
self.vertex_tombstones.remove(&vid);
let entry = self.vertex_properties.entry(vid).or_default();
Self::merge_crdt_properties(entry, properties.clone());
self.vertex_versions.insert(vid, version);
self.vertex_created_at.entry(vid).or_insert(now);
self.vertex_updated_at.insert(vid, now);
let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
let existing = self.vertex_labels.entry(vid).or_default();
Self::append_unique_labels(existing, labels);
self.graph.add_vertex(vid);
self.mutation_count += 1;
self.mutation_stats.nodes_created += 1;
self.mutation_stats.properties_set += properties.len();
self.mutation_stats.labels_added += labels.len();
let props_size = Self::estimate_properties_size(&properties);
self.estimated_size += 8 + props_size + 16 + labels_size + 32;
}
pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
let existing = self.vertex_labels.entry(vid).or_default();
Self::append_unique_labels(existing, labels);
}
pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
if let Some(labels) = self.vertex_labels.get_mut(&vid)
&& let Some(pos) = labels.iter().position(|l| l == label)
{
labels.remove(pos);
self.current_version += 1;
self.mutation_count += 1;
self.mutation_stats.labels_removed += 1;
return true;
}
false
}
pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
self.edge_types.insert(eid, edge_type);
}
pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
self.current_version += 1;
if let Some(wal) = &mut self.wal {
let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
wal.append(&Mutation::DeleteVertex { vid, labels })?;
}
self.apply_vertex_deletion(vid);
Ok(())
}
fn apply_vertex_deletion(&mut self, vid: Vid) {
let version = self.current_version;
let mut edges_to_remove = HashSet::new();
for entry in self.graph.neighbors(vid, Direction::Outgoing) {
edges_to_remove.insert(entry.eid);
}
for entry in self.graph.neighbors(vid, Direction::Incoming) {
edges_to_remove.insert(entry.eid); }
let cascaded_edges_count = edges_to_remove.len();
for eid in edges_to_remove {
if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
self.tombstones.insert(
eid,
TombstoneEntry {
eid,
src_vid: *src,
dst_vid: *dst,
edge_type: *etype,
},
);
self.edge_versions.insert(eid, version);
self.edge_endpoints.remove(&eid);
self.edge_properties.remove(&eid);
self.graph.remove_edge(eid);
self.mutation_count += 1;
self.mutation_stats.relationships_deleted += 1;
}
}
self.vertex_tombstones.insert(vid);
self.vertex_properties.remove(&vid);
self.vertex_versions.insert(vid, version);
self.graph.remove_vertex(vid);
self.mutation_count += 1;
self.mutation_stats.nodes_deleted += 1;
self.constraint_index.retain(|_, v| *v != vid);
self.estimated_size += cascaded_edges_count * 72 + 8;
}
pub fn insert_edge(
&mut self,
src_vid: Vid,
dst_vid: Vid,
edge_type: u32,
eid: Eid,
properties: Properties,
edge_type_name: Option<String>,
) -> Result<()> {
self.current_version += 1;
let now = now_nanos();
if let Some(wal) = &mut self.wal {
wal.append(&Mutation::InsertEdge {
src_vid,
dst_vid,
edge_type,
eid,
version: self.current_version,
properties: properties.clone(),
edge_type_name: edge_type_name.clone(),
})?;
}
self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
let type_name_size = if let Some(ref name) = edge_type_name {
let size = name.len() + 24;
self.edge_types.insert(eid, name.clone());
size
} else {
0
};
self.edge_created_at.entry(eid).or_insert(now);
self.edge_updated_at.insert(eid, now);
self.estimated_size += type_name_size;
Ok(())
}
fn apply_edge_insertion(
&mut self,
src_vid: Vid,
dst_vid: Vid,
edge_type: u32,
eid: Eid,
properties: Properties,
) -> Result<()> {
let version = self.current_version;
if self.vertex_tombstones.contains(&src_vid) {
anyhow::bail!(
"Cannot insert edge: source vertex {} has been deleted (issue #77)",
src_vid
);
}
if self.vertex_tombstones.contains(&dst_vid) {
anyhow::bail!(
"Cannot insert edge: destination vertex {} has been deleted (issue #77)",
dst_vid
);
}
if !self.graph.contains_vertex(src_vid) {
self.graph.add_vertex(src_vid);
}
if !self.graph.contains_vertex(dst_vid) {
self.graph.add_vertex(dst_vid);
}
self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
let props_size = Self::estimate_properties_size(&properties);
let props_count = properties.len();
if !properties.is_empty() {
let entry = self.edge_properties.entry(eid).or_default();
Self::merge_crdt_properties(entry, properties);
}
self.edge_versions.insert(eid, version);
self.edge_endpoints
.insert(eid, (src_vid, dst_vid, edge_type));
self.tombstones.remove(&eid);
self.mutation_count += 1;
self.mutation_stats.relationships_created += 1;
self.mutation_stats.properties_set += props_count;
self.estimated_size += 24 + props_size + 16 + 28 + 32;
Ok(())
}
pub fn delete_edge(
&mut self,
eid: Eid,
src_vid: Vid,
dst_vid: Vid,
edge_type: u32,
) -> Result<()> {
self.current_version += 1;
let now = now_nanos();
if let Some(wal) = &mut self.wal {
wal.append(&Mutation::DeleteEdge {
eid,
src_vid,
dst_vid,
edge_type,
version: self.current_version,
})?;
}
self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
self.edge_updated_at.insert(eid, now);
Ok(())
}
fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
let version = self.current_version;
self.tombstones.insert(
eid,
TombstoneEntry {
eid,
src_vid,
dst_vid,
edge_type,
},
);
self.edge_versions.insert(eid, version);
self.graph.remove_edge(eid);
self.mutation_count += 1;
self.mutation_stats.relationships_deleted += 1;
self.estimated_size += 80;
}
pub fn get_neighbors(
&self,
vid: Vid,
edge_type: u32,
direction: Direction,
) -> Vec<(Vid, Eid, u64)> {
let edges = self.graph.neighbors(vid, direction);
edges
.iter()
.filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
.map(|e| {
let neighbor = match direction {
Direction::Outgoing => e.dst_vid,
Direction::Incoming => e.src_vid,
};
let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
(neighbor, e.eid, version)
})
.collect()
}
pub fn is_tombstoned(&self, eid: Eid) -> bool {
self.tombstones.contains_key(&eid)
}
pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
self.vertex_labels
.iter()
.filter(|(_, labels)| labels.iter().any(|l| l == label_name))
.map(|(vid, _)| *vid)
.collect()
}
pub fn all_vertex_vids(&self) -> Vec<Vid> {
self.vertex_properties.keys().copied().collect()
}
pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
self.vertex_labels
.iter()
.filter(|(_, labels)| label_names.iter().any(|ln| labels.iter().any(|l| l == *ln)))
.map(|(vid, _)| *vid)
.collect()
}
pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
self.vertex_labels
.iter()
.filter(|(_, labels)| label_names.iter().all(|ln| labels.iter().any(|l| l == *ln)))
.map(|(vid, _)| *vid)
.collect()
}
pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
self.vertex_labels.get(&vid).map(|v| v.as_slice())
}
pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
self.edge_types.get(&eid).map(|s| s.as_str())
}
pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
self.edge_types
.iter()
.filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
.map(|(eid, _)| *eid)
.collect()
}
pub fn all_edge_eids(&self) -> Vec<Eid> {
self.edge_endpoints
.keys()
.filter(|eid| !self.tombstones.contains_key(eid))
.copied()
.collect()
}
pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
self.edge_endpoints
.get(&eid)
.map(|(src, dst, _)| (*src, *dst))
}
pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
self.edge_endpoints.get(&eid).copied()
}
pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
self.constraint_index.insert(key, vid);
}
pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
self.constraint_index
.get(key)
.is_some_and(|&v| v != exclude_vid)
}
#[instrument(skip(self, other), level = "trace")]
pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
trace!(
other_mutation_count = other.mutation_count,
"Merging L0 buffer"
);
for &vid in &other.vertex_tombstones {
self.delete_vertex(vid)?;
}
for (vid, props) in &other.vertex_properties {
let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
self.insert_vertex_with_labels(*vid, props.clone(), &labels);
}
for (vid, labels) in &other.vertex_labels {
if !self.vertex_labels.contains_key(vid) {
self.vertex_labels.insert(*vid, labels.clone());
}
}
for (eid, (src, dst, etype)) in &other.edge_endpoints {
if other.tombstones.contains_key(eid) {
self.delete_edge(*eid, *src, *dst, *etype)?;
} else {
let props = other.edge_properties.get(eid).cloned().unwrap_or_default();
let etype_name = other.edge_types.get(eid).cloned();
self.insert_edge(*src, *dst, *etype, *eid, props, etype_name)?;
}
}
for (eid, tombstone) in &other.tombstones {
if !other.edge_endpoints.contains_key(eid) {
self.delete_edge(
*eid,
tombstone.src_vid,
tombstone.dst_vid,
tombstone.edge_type,
)?;
}
}
for (vid, ts) in &other.vertex_created_at {
self.vertex_created_at.entry(*vid).or_insert(*ts); }
for (vid, ts) in &other.vertex_updated_at {
self.vertex_updated_at.insert(*vid, *ts); }
for (eid, ts) in &other.edge_created_at {
self.edge_created_at.entry(*eid).or_insert(*ts); }
for (eid, ts) in &other.edge_updated_at {
self.edge_updated_at.insert(*eid, *ts); }
self.estimated_size += other.estimated_size;
for (key, vid) in &other.constraint_index {
self.constraint_index.insert(key.clone(), *vid);
}
Ok(())
}
#[instrument(skip(self, mutations), level = "debug")]
pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
trace!(count = mutations.len(), "Replaying mutations");
for mutation in mutations {
match mutation {
Mutation::InsertVertex {
vid,
properties,
labels,
} => {
self.current_version += 1;
let version = self.current_version;
self.vertex_tombstones.remove(&vid);
let entry = self.vertex_properties.entry(vid).or_default();
Self::merge_crdt_properties(entry, properties);
self.vertex_versions.insert(vid, version);
self.graph.add_vertex(vid);
self.mutation_count += 1;
let existing = self.vertex_labels.entry(vid).or_default();
Self::append_unique_labels(existing, &labels);
}
Mutation::DeleteVertex { vid, labels } => {
self.current_version += 1;
if !labels.is_empty() {
let existing = self.vertex_labels.entry(vid).or_default();
Self::append_unique_labels(existing, &labels);
}
self.apply_vertex_deletion(vid);
}
Mutation::InsertEdge {
src_vid,
dst_vid,
edge_type,
eid,
version: _,
properties,
edge_type_name,
} => {
self.current_version += 1;
self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
if let Some(name) = edge_type_name {
self.edge_types.insert(eid, name);
}
}
Mutation::DeleteEdge {
eid,
src_vid,
dst_vid,
edge_type,
version: _,
} => {
self.current_version += 1;
self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_l0_buffer_ops() -> Result<()> {
let mut l0 = L0Buffer::new(0, None);
let vid_a = Vid::new(1);
let vid_b = Vid::new(2);
let eid_ab = Eid::new(101);
l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].0, vid_b);
assert_eq!(neighbors[0].1, eid_ab);
l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
assert!(l0.is_tombstoned(eid_ab));
let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
assert_eq!(neighbors_after.len(), 0);
Ok(())
}
#[test]
fn test_l0_buffer_multiple_edges() -> Result<()> {
let mut l0 = L0Buffer::new(0, None);
let vid_a = Vid::new(1);
let vid_b = Vid::new(2);
let vid_c = Vid::new(3);
let eid_ab = Eid::new(101);
let eid_ac = Eid::new(102);
l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
assert_eq!(neighbors.len(), 2);
l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
assert_eq!(neighbors_after.len(), 1);
assert_eq!(neighbors_after[0].0, vid_c);
Ok(())
}
#[test]
fn test_l0_buffer_edge_type_filter() -> Result<()> {
let mut l0 = L0Buffer::new(0, None);
let vid_a = Vid::new(1);
let vid_b = Vid::new(2);
let vid_c = Vid::new(3);
let eid_ab = Eid::new(101);
let eid_ac = Eid::new(201);
l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
assert_eq!(type1_neighbors.len(), 1);
assert_eq!(type1_neighbors[0].0, vid_b);
let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
assert_eq!(type2_neighbors.len(), 1);
assert_eq!(type2_neighbors[0].0, vid_c);
Ok(())
}
#[test]
fn test_l0_buffer_incoming_edges() -> Result<()> {
let mut l0 = L0Buffer::new(0, None);
let vid_a = Vid::new(1);
let vid_b = Vid::new(2);
let vid_c = Vid::new(3);
let eid_ab = Eid::new(101);
let eid_cb = Eid::new(102);
l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
assert_eq!(incoming.len(), 2);
Ok(())
}
#[test]
fn test_merge_empty_props_edge() -> Result<()> {
let mut main_l0 = L0Buffer::new(0, None);
let mut tx_l0 = L0Buffer::new(0, None);
let vid_a = Vid::new(1);
let vid_b = Vid::new(2);
let eid_ab = Eid::new(101);
tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
assert!(!tx_l0.edge_properties.contains_key(&eid_ab));
main_l0.merge(&tx_l0)?;
assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].0, vid_b);
Ok(())
}
#[test]
fn test_replay_crdt_merge() -> Result<()> {
use crate::runtime::wal::Mutation;
use serde_json::json;
use uni_common::Value;
let mut l0 = L0Buffer::new(0, None);
let vid = Vid::new(1);
let counter1: Value = json!({
"t": "gc",
"d": {"counts": {"node1": 5}}
})
.into();
let counter2: Value = json!({
"t": "gc",
"d": {"counts": {"node2": 3}}
})
.into();
let mut props1 = HashMap::new();
props1.insert("counter".to_string(), counter1.clone());
l0.replay_mutations(vec![Mutation::InsertVertex {
vid,
properties: props1,
labels: vec![],
}])?;
let mut props2 = HashMap::new();
props2.insert("counter".to_string(), counter2.clone());
l0.replay_mutations(vec![Mutation::InsertVertex {
vid,
properties: props2,
labels: vec![],
}])?;
let stored_props = l0.vertex_properties.get(&vid).unwrap();
let stored_counter = stored_props.get("counter").unwrap();
let stored_json: serde_json::Value = stored_counter.clone().into();
let data = stored_json.get("d").unwrap();
let counts = data.get("counts").unwrap();
assert_eq!(counts.get("node1"), Some(&json!(5)));
assert_eq!(counts.get("node2"), Some(&json!(3)));
Ok(())
}
#[test]
fn test_merge_preserves_vertex_timestamps() -> Result<()> {
let mut l0_main = L0Buffer::new(0, None);
let mut l0_tx = L0Buffer::new(0, None);
let vid = Vid::new(1);
let ts_main_created = 1000;
let ts_main_updated = 1100;
l0_main.insert_vertex(vid, HashMap::new());
l0_main.vertex_created_at.insert(vid, ts_main_created);
l0_main.vertex_updated_at.insert(vid, ts_main_updated);
let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_vertex(vid, HashMap::new());
l0_tx.vertex_created_at.insert(vid, ts_tx_created);
l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
l0_main.merge(&l0_tx)?;
assert_eq!(
*l0_main.vertex_created_at.get(&vid).unwrap(),
ts_main_created,
"created_at should preserve oldest timestamp"
);
assert_eq!(
*l0_main.vertex_updated_at.get(&vid).unwrap(),
ts_tx_updated,
"updated_at should use latest timestamp"
);
Ok(())
}
#[test]
fn test_merge_preserves_edge_timestamps() -> Result<()> {
let mut l0_main = L0Buffer::new(0, None);
let mut l0_tx = L0Buffer::new(0, None);
let vid_a = Vid::new(1);
let vid_b = Vid::new(2);
let eid = Eid::new(100);
let ts_main_created = 1000;
let ts_main_updated = 1100;
l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
l0_main.edge_created_at.insert(eid, ts_main_created);
l0_main.edge_updated_at.insert(eid, ts_main_updated);
let ts_tx_created = 2000; let ts_tx_updated = 2100; l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
l0_tx.edge_created_at.insert(eid, ts_tx_created);
l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
l0_main.merge(&l0_tx)?;
assert_eq!(
*l0_main.edge_created_at.get(&eid).unwrap(),
ts_main_created,
"edge created_at should preserve oldest timestamp"
);
assert_eq!(
*l0_main.edge_updated_at.get(&eid).unwrap(),
ts_tx_updated,
"edge updated_at should use latest timestamp"
);
Ok(())
}
#[test]
fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
use uni_common::Value;
let mut l0_main = L0Buffer::new(0, None);
let mut l0_tx = L0Buffer::new(0, None);
let vid = Vid::new(1);
let ts_original = 1000;
l0_main.insert_vertex(vid, HashMap::new());
l0_main.vertex_created_at.insert(vid, ts_original);
l0_main.vertex_updated_at.insert(vid, ts_original);
let ts_tx = 2000;
let mut props = HashMap::new();
props.insert("updated".to_string(), Value::String("yes".to_string()));
l0_tx.insert_vertex(vid, props);
l0_tx.vertex_created_at.insert(vid, ts_tx);
l0_tx.vertex_updated_at.insert(vid, ts_tx);
l0_main.merge(&l0_tx)?;
assert_eq!(
*l0_main.vertex_created_at.get(&vid).unwrap(),
ts_original,
"created_at must not be overwritten for existing vertex"
);
assert_eq!(
*l0_main.vertex_updated_at.get(&vid).unwrap(),
ts_tx,
"updated_at should reflect transaction timestamp"
);
assert!(
l0_main
.vertex_properties
.get(&vid)
.unwrap()
.contains_key("updated")
);
Ok(())
}
#[test]
fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
use crate::runtime::wal::Mutation;
let mut l0 = L0Buffer::new(0, None);
let vid = Vid::new(42);
let mutations = vec![Mutation::InsertVertex {
vid,
properties: {
let mut props = HashMap::new();
props.insert(
"name".to_string(),
uni_common::Value::String("Alice".to_string()),
);
props
},
labels: vec!["Person".to_string(), "User".to_string()],
}];
l0.replay_mutations(mutations)?;
assert!(l0.vertex_properties.contains_key(&vid));
let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
assert_eq!(labels.len(), 2);
assert!(labels.contains(&"Person".to_string()));
assert!(labels.contains(&"User".to_string()));
let person_vids = l0.vids_for_label("Person");
assert_eq!(person_vids.len(), 1);
assert_eq!(person_vids[0], vid);
let user_vids = l0.vids_for_label("User");
assert_eq!(user_vids.len(), 1);
assert_eq!(user_vids[0], vid);
Ok(())
}
#[test]
fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
use crate::runtime::wal::Mutation;
let mut l0 = L0Buffer::new(0, None);
let vid = Vid::new(99);
l0.insert_vertex_with_labels(
vid,
HashMap::new(),
&["Person".to_string(), "Admin".to_string()],
);
assert!(l0.vertex_properties.contains_key(&vid));
let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
assert_eq!(labels.len(), 2);
let mutations = vec![Mutation::DeleteVertex {
vid,
labels: vec!["Person".to_string(), "Admin".to_string()],
}];
l0.replay_mutations(mutations)?;
assert!(l0.vertex_tombstones.contains(&vid));
let labels = l0.get_vertex_labels(vid);
assert!(
labels.is_some(),
"Labels should be preserved even after deletion for tombstone flushing"
);
Ok(())
}
#[test]
fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
use crate::runtime::wal::Mutation;
let mut l0 = L0Buffer::new(0, None);
let src = Vid::new(1);
let dst = Vid::new(2);
let eid = Eid::new(500);
let edge_type = 100;
let mutations = vec![Mutation::InsertEdge {
src_vid: src,
dst_vid: dst,
edge_type,
eid,
version: 1,
properties: {
let mut props = HashMap::new();
props.insert("since".to_string(), uni_common::Value::Int(2020));
props
},
edge_type_name: Some("KNOWS".to_string()),
}];
l0.replay_mutations(mutations)?;
assert!(l0.edge_endpoints.contains_key(&eid));
let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
assert_eq!(type_name, "KNOWS");
let knows_eids = l0.eids_for_type("KNOWS");
assert_eq!(knows_eids.len(), 1);
assert_eq!(knows_eids[0], eid);
Ok(())
}
#[test]
fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
use crate::runtime::wal::Mutation;
let mut l0 = L0Buffer::new(0, None);
let mutations = vec![
Mutation::InsertEdge {
src_vid: Vid::new(1),
dst_vid: Vid::new(2),
edge_type: 100,
eid: Eid::new(1000),
version: 1,
properties: HashMap::new(),
edge_type_name: Some("KNOWS".to_string()),
},
Mutation::InsertEdge {
src_vid: Vid::new(2),
dst_vid: Vid::new(3),
edge_type: 101,
eid: Eid::new(1001),
version: 2,
properties: HashMap::new(),
edge_type_name: Some("LIKES".to_string()),
},
Mutation::InsertEdge {
src_vid: Vid::new(3),
dst_vid: Vid::new(1),
edge_type: 100,
eid: Eid::new(1002),
version: 3,
properties: HashMap::new(),
edge_type_name: Some("KNOWS".to_string()),
},
];
l0.replay_mutations(mutations)?;
assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
let knows_edges = l0.eids_for_type("KNOWS");
assert_eq!(knows_edges.len(), 2);
assert!(knows_edges.contains(&Eid::new(1000)));
assert!(knows_edges.contains(&Eid::new(1002)));
let likes_edges = l0.eids_for_type("LIKES");
assert_eq!(likes_edges.len(), 1);
assert_eq!(likes_edges[0], Eid::new(1001));
Ok(())
}
#[test]
fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
use crate::runtime::wal::Mutation;
let mut l0 = L0Buffer::new(0, None);
let alice = Vid::new(1);
let bob = Vid::new(2);
let eid = Eid::new(100);
let mutations = vec![
Mutation::InsertVertex {
vid: alice,
properties: {
let mut props = HashMap::new();
props.insert(
"name".to_string(),
uni_common::Value::String("Alice".to_string()),
);
props
},
labels: vec!["Person".to_string()],
},
Mutation::InsertVertex {
vid: bob,
properties: {
let mut props = HashMap::new();
props.insert(
"name".to_string(),
uni_common::Value::String("Bob".to_string()),
);
props
},
labels: vec!["Person".to_string()],
},
Mutation::InsertEdge {
src_vid: alice,
dst_vid: bob,
edge_type: 1,
eid,
version: 3,
properties: HashMap::new(),
edge_type_name: Some("KNOWS".to_string()),
},
];
l0.replay_mutations(mutations)?;
assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
assert_eq!(l0.vids_for_label("Person").len(), 2);
assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
assert_eq!(alice_neighbors.len(), 1);
assert_eq!(alice_neighbors[0].0, bob);
Ok(())
}
#[test]
fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
use crate::runtime::wal::Mutation;
let mut l0 = L0Buffer::new(0, None);
let vid = Vid::new(1);
let mutations = vec![Mutation::InsertVertex {
vid,
properties: HashMap::new(),
labels: vec![], }];
l0.replay_mutations(mutations)?;
assert!(l0.vertex_properties.contains_key(&vid));
let labels = l0.get_vertex_labels(vid);
assert!(labels.is_some(), "Labels entry should exist even if empty");
assert_eq!(labels.unwrap().len(), 0);
Ok(())
}
#[test]
fn test_now_nanos_returns_nanosecond_range() {
let now = now_nanos();
assert!(
now > 1_700_000_000_000_000_000,
"now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
now
);
assert!(
now < 4_100_000_000_000_000_000,
"now_nanos() returned {}, expected < 4.1e18",
now
);
}
}