use std::collections::{HashSet, VecDeque};
use crate::runtime::l0::{L0Buffer, OccReadSet, try_as_crdt};
use uni_common::core::id::{Eid, Vid};
#[derive(Debug, Default, Clone)]
pub struct WriteSet {
pub vertices: HashSet<Vid>,
pub edges: HashSet<Eid>,
}
impl WriteSet {
pub fn from_l0(l0: &L0Buffer) -> Self {
let mut vertices: HashSet<Vid> = HashSet::new();
for (vid, props) in &l0.vertex_properties {
if !is_crdt_carveout(l0, vid, props) {
vertices.insert(*vid);
}
}
vertices.extend(l0.vertex_tombstones.iter().copied());
vertices.extend(l0.vertex_label_overwrites.iter().copied());
let mut edges: HashSet<Eid> = l0.edge_properties.keys().copied().collect();
edges.extend(l0.edge_endpoints.keys().copied());
edges.extend(l0.tombstones.keys().copied());
Self { vertices, edges }
}
pub fn is_empty(&self) -> bool {
self.vertices.is_empty() && self.edges.is_empty()
}
pub fn intersects(&self, other: &WriteSet) -> bool {
let (small, large) = if self.vertices.len() <= other.vertices.len() {
(&self.vertices, &other.vertices)
} else {
(&other.vertices, &self.vertices)
};
if small.iter().any(|v| large.contains(v)) {
return true;
}
let (small, large) = if self.edges.len() <= other.edges.len() {
(&self.edges, &other.edges)
} else {
(&other.edges, &self.edges)
};
small.iter().any(|e| large.contains(e))
}
}
fn is_crdt_carveout(l0: &L0Buffer, vid: &Vid, props: &uni_common::Properties) -> bool {
let label_changed = l0
.vertex_labels
.get(vid)
.is_some_and(|labels| !labels.is_empty());
let all_crdt = !props.is_empty() && props.values().all(|v| try_as_crdt(v).is_some());
all_crdt && !label_changed
}
#[derive(Debug)]
pub struct CrdtVariantConflict {
pub vid: Vid,
pub property: String,
}
impl std::fmt::Display for CrdtVariantConflict {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"carved-out CRDT write to property {:?} would overwrite a different \
committed CRDT variant (a lost update); aborting",
self.property
)
}
}
pub fn crdt_carveout_overwrite(tx_l0: &L0Buffer, main: &L0Buffer) -> Option<CrdtVariantConflict> {
for (vid, props) in &tx_l0.vertex_properties {
if tx_l0.vertex_tombstones.contains(vid) || !is_crdt_carveout(tx_l0, vid, props) {
continue;
}
let Some(existing_props) = main.vertex_properties.get(vid) else {
continue;
};
for (key, value) in props {
let (Some(new_crdt), Some(existing_crdt)) = (
try_as_crdt(value),
existing_props.get(key).and_then(try_as_crdt),
) else {
continue;
};
if new_crdt.type_name() != existing_crdt.type_name() {
return Some(CrdtVariantConflict {
vid: *vid,
property: key.clone(),
});
}
}
}
None
}
fn read_set_intersects(read_set: &OccReadSet, w: &WriteSet) -> bool {
read_set.vertices.iter().any(|v| w.vertices.contains(v))
|| read_set.edges.iter().any(|e| w.edges.contains(e))
}
#[derive(Debug)]
pub enum Conflict {
WriteWrite { seq: u64 },
ReadWrite { seq: u64 },
HistoryTruncated { read_seq: u64, oldest: u64 },
}
impl std::fmt::Display for Conflict {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Conflict::WriteWrite { seq } => {
write!(f, "write-write conflict with commit sequence {seq}")
}
Conflict::ReadWrite { seq } => {
write!(f, "read-write antidependency with commit sequence {seq}")
}
Conflict::HistoryTruncated { read_seq, oldest } => write!(
f,
"commit history truncated below read sequence {read_seq} \
(oldest retained {oldest}); aborting conservatively"
),
}
}
}
#[derive(Debug)]
pub struct CommitRegistry {
entries: VecDeque<(u64, WriteSet)>,
capacity: usize,
}
impl CommitRegistry {
pub fn new(capacity: usize) -> Self {
assert!(capacity > 0, "CommitRegistry capacity must be non-zero");
Self {
entries: VecDeque::new(),
capacity,
}
}
pub fn record(&mut self, seq: u64, write_set: WriteSet) {
self.entries.push_back((seq, write_set));
while self.entries.len() > self.capacity {
self.entries.pop_front();
}
}
pub fn check(
&self,
read_seq: u64,
write_set: &WriteSet,
read_set: Option<&OccReadSet>,
) -> Option<Conflict> {
if let Some(&(oldest, _)) = self.entries.front()
&& oldest > read_seq.saturating_add(1)
{
return Some(Conflict::HistoryTruncated { read_seq, oldest });
}
for (seq, committed) in &self.entries {
if *seq <= read_seq {
continue;
}
if write_set.intersects(committed) {
return Some(Conflict::WriteWrite { seq: *seq });
}
if let Some(rs) = read_set
&& read_set_intersects(rs, committed)
{
return Some(Conflict::ReadWrite { seq: *seq });
}
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ws(vids: &[u64]) -> WriteSet {
WriteSet {
vertices: vids.iter().map(|&v| Vid::from(v)).collect(),
edges: HashSet::new(),
}
}
#[test]
fn disjoint_writes_do_not_conflict() {
let mut reg = CommitRegistry::new(16);
reg.record(1, ws(&[1, 2]));
assert!(reg.check(0, &ws(&[3, 4]), None).is_none());
}
#[test]
fn overlapping_write_after_read_seq_conflicts() {
let mut reg = CommitRegistry::new(16);
reg.record(1, ws(&[1, 2]));
assert!(matches!(
reg.check(0, &ws(&[2]), None),
Some(Conflict::WriteWrite { seq: 1 })
));
}
#[test]
fn commit_at_or_before_read_seq_is_ignored() {
let mut reg = CommitRegistry::new(16);
reg.record(1, ws(&[1]));
assert!(reg.check(1, &ws(&[1]), None).is_none());
}
#[test]
fn read_write_antidependency_detected() {
let mut reg = CommitRegistry::new(16);
reg.record(1, ws(&[5]));
let mut rs = OccReadSet::default();
rs.vertices.insert(Vid::from(5));
assert!(matches!(
reg.check(0, &ws(&[99]), Some(&rs)),
Some(Conflict::ReadWrite { seq: 1 })
));
}
#[test]
fn truncated_history_aborts_conservatively() {
let mut reg = CommitRegistry::new(2);
reg.record(1, ws(&[1]));
reg.record(2, ws(&[2]));
reg.record(3, ws(&[3])); assert!(matches!(
reg.check(0, &ws(&[42]), None),
Some(Conflict::HistoryTruncated {
read_seq: 0,
oldest: 2
})
));
}
fn vid(n: u64) -> Vid {
Vid::from(n)
}
fn crdt_props(actor: &str, n: u64) -> uni_common::Properties {
let mut gc = uni_crdt::GCounter::new();
gc.increment(actor, n);
let v: uni_common::Value = serde_json::to_value(uni_crdt::Crdt::GCounter(gc))
.unwrap()
.into();
uni_common::Properties::from([("counter".to_string(), v)])
}
fn int_props(n: i64) -> uni_common::Properties {
uni_common::Properties::from([("n".to_string(), uni_common::Value::Int(n))])
}
fn gset_props(item: &str) -> uni_common::Properties {
let mut gs = uni_crdt::GSet::new();
gs.add(item.to_string());
let v: uni_common::Value = serde_json::to_value(uni_crdt::Crdt::GSet(gs))
.unwrap()
.into();
uni_common::Properties::from([("counter".to_string(), v)])
}
#[test]
fn crdt_only_write_without_labels_is_carved_out() {
let mut buf = L0Buffer::new(0, None);
buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
assert!(!WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
}
#[test]
fn non_crdt_write_without_labels_is_conflictable() {
let mut buf = L0Buffer::new(0, None);
buf.insert_vertex_with_labels(vid(1), int_props(1), &[]);
assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
}
#[test]
fn crdt_write_with_labels_stays_conflictable() {
let mut buf = L0Buffer::new(0, None);
buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &["Counter".to_string()]);
assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
}
#[test]
fn mixed_crdt_and_lww_write_is_conflictable() {
let mut buf = L0Buffer::new(0, None);
let mut props = crdt_props("a", 5);
props.insert("n".to_string(), uni_common::Value::Int(1));
buf.insert_vertex_with_labels(vid(1), props, &[]);
assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
}
#[test]
fn plain_map_value_is_not_mistaken_for_crdt() {
let mut buf = L0Buffer::new(0, None);
let map = uni_common::Value::Map(std::collections::HashMap::from([(
"x".to_string(),
uni_common::Value::Int(1),
)]));
buf.insert_vertex_with_labels(
vid(1),
uni_common::Properties::from([("data".to_string(), map)]),
&[],
);
assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
}
#[test]
fn tombstoned_vertex_is_conflictable() {
let mut buf = L0Buffer::new(0, None);
buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
buf.delete_vertex(vid(1)).unwrap();
assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
}
#[test]
fn crdt_carveout_overwrite_detects_variant_mismatch() {
let mut main = L0Buffer::new(0, None);
main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
let mut tx = L0Buffer::new(0, None);
tx.insert_vertex_with_labels(vid(1), gset_props("x"), &[]);
let conflict = crdt_carveout_overwrite(&tx, &main).expect("variant mismatch");
assert_eq!(conflict.vid, vid(1));
assert_eq!(conflict.property, "counter");
}
#[test]
fn crdt_carveout_overwrite_allows_same_variant() {
let mut main = L0Buffer::new(0, None);
main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
let mut tx = L0Buffer::new(0, None);
tx.insert_vertex_with_labels(vid(1), crdt_props("b", 7), &[]);
assert!(crdt_carveout_overwrite(&tx, &main).is_none());
}
#[test]
fn crdt_carveout_overwrite_allows_new_vertex() {
let main = L0Buffer::new(0, None);
let mut tx = L0Buffer::new(0, None);
tx.insert_vertex_with_labels(vid(1), gset_props("x"), &[]);
assert!(crdt_carveout_overwrite(&tx, &main).is_none());
}
#[test]
fn crdt_carveout_overwrite_ignores_conflictable_writes() {
let mut main = L0Buffer::new(0, None);
main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
let mut tx = L0Buffer::new(0, None);
tx.insert_vertex_with_labels(vid(1), gset_props("x"), &["Counter".to_string()]);
assert!(crdt_carveout_overwrite(&tx, &main).is_none());
}
#[test]
fn long_lived_reader_within_retained_history_does_not_abort() {
let mut reg = CommitRegistry::new(16);
for seq in 1..=5 {
reg.record(seq, ws(&[seq + 100])); }
assert!(reg.check(0, &ws(&[1]), None).is_none());
}
#[test]
fn truncated_history_aborts_read_set_txn_conservatively() {
let mut reg = CommitRegistry::new(2);
reg.record(1, ws(&[1]));
reg.record(2, ws(&[2]));
reg.record(3, ws(&[3])); let mut rs = OccReadSet::default();
rs.vertices.insert(Vid::from(7));
assert!(matches!(
reg.check(0, &ws(&[42]), Some(&rs)),
Some(Conflict::HistoryTruncated { .. })
));
}
}