use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use super::merge::Merge;
use super::{OrSet, Timestamp};
/// A single operation that can be applied to an [`OrSet`] via `OrSet::apply`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrSetOp<T> {
/// Record the value under the given tag (`apply` forwards this to `OrSet::add`).
Add(T, Timestamp),
/// Tombstone exactly the listed tags for the value; tags of the same value
/// that are not listed are left untouched by `apply`.
Remove(T, Vec<Timestamp>),
}
impl<T: Hash + Eq + Clone> OrSet<T> {
#[must_use]
pub fn new() -> Self {
Self {
elements: HashSet::new(),
tombstone: HashSet::new(),
}
}
pub fn add(&mut self, value: T, tag: Timestamp) {
self.elements.insert((value, tag));
}
pub fn remove(&mut self, value: &T) -> Vec<Timestamp>
where
T: Eq + Hash,
{
let active_tags: Vec<Timestamp> = self
.elements
.iter()
.filter(|(v, _)| v == value)
.map(|(_, ts)| ts.clone())
.collect();
for tag in &active_tags {
self.tombstone.insert((value.clone(), tag.clone()));
}
active_tags
}
pub fn remove_specific(&mut self, value: &T, tags: &[Timestamp])
where
T: Eq + Hash,
{
for tag in tags {
self.tombstone.insert((value.clone(), tag.clone()));
}
}
pub fn contains(&self, value: &T) -> bool {
self.elements
.iter()
.any(|(v, ts)| v == value && !self.tombstone.contains(&(value.clone(), ts.clone())))
}
#[must_use]
pub fn values(&self) -> HashSet<&T> {
let mut result = HashSet::new();
for (value, tag) in &self.elements {
if !self.tombstone.contains(&(value.clone(), tag.clone())) {
result.insert(value);
}
}
result
}
#[must_use]
pub fn len(&self) -> usize {
self.values().len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn active_tags(&self, value: &T) -> Vec<&Timestamp> {
self.elements
.iter()
.filter(|(v, ts)| v == value && !self.tombstone.contains(&(value.clone(), ts.clone())))
.map(|(_, ts)| ts)
.collect()
}
pub fn apply(&mut self, op: OrSetOp<T>) {
match op {
OrSetOp::Add(value, tag) => {
self.add(value, tag);
}
OrSetOp::Remove(value, observed_tags) => {
for tag in observed_tags {
self.tombstone.insert((value.clone(), tag));
}
}
}
}
}
impl<T: Hash + Eq + Clone> Default for OrSet<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Eq + Hash + Clone> Merge for OrSet<T> {
    /// Merges `other` into `self` by taking the union of the recorded
    /// `(value, tag)` pairs and the union of the tombstones.
    fn merge(&mut self, other: Self) {
        let Self {
            elements: incoming_elements,
            tombstone: incoming_tombstones,
        } = other;
        self.elements.extend(incoming_elements);
        self.tombstone.extend(incoming_tombstones);
    }
}
use crate::dag::graph::EventDag;
use crate::dag::replay::{DivergentReplay, ReplayError, replay_divergent};
use crate::event::Event;
use crate::event::data::{AssignAction, EventData};
use crate::event::types::EventType;
/// Selects which set-valued field is reconstructed from an event stream.
#[derive(Debug, Clone)]
pub enum OrSetField {
/// Labels, taken from `Update` events whose field is `"labels"`.
Labels,
/// Assignees, taken from `Assign` event data (assign / unassign actions).
Assignees,
/// Link targets whose link type is `"blocks"` or `"blocked_by"`.
BlockedBy,
/// Link targets whose link type is `"related_to"` or `"related"`.
RelatedTo,
}
/// Translates `events` into the OR-Set operations they imply for `field`.
///
/// The events are processed in slice order against a scratch copy of
/// `base_state` (or an empty set when `None`), so that every removal can
/// capture the tags that are active at that point. When `dag` is provided,
/// removals only capture tags whose originating event is an ancestor of the
/// removing event; tags with no recorded origin (for example those coming
/// from `base_state`) are always treated as visible.
///
/// Events that do not affect `field` produce no operation.
pub fn ops_from_events(
    events: &[Event],
    field: &OrSetField,
    base_state: Option<&OrSet<String>>,
    dag: Option<&EventDag>,
) -> Vec<OrSetOp<String>> {
    // Scratch state used only to track which tags are active while scanning.
    let mut scratch = match base_state {
        Some(state) => state.clone(),
        None => OrSet::new(),
    };
    // Tag -> hash of the event that introduced it, for DAG visibility checks.
    let mut origin_by_tag: HashMap<Timestamp, String> = HashMap::new();
    let mut emitted = Vec::new();

    for event in events {
        let tag = event_to_timestamp(event);
        let Some(op) = dispatch_event(event, field, &tag, dag, &origin_by_tag, &mut scratch)
        else {
            continue;
        };
        if let OrSetOp::Add(_, added_tag) = &op {
            origin_by_tag.insert(added_tag.clone(), event.event_hash.clone());
        }
        emitted.push(op);
    }

    emitted
}
/// Routes `event` to the handler responsible for `field`.
///
/// Assignees and labels have dedicated handlers; the two link-based fields
/// share `handle_link` and differ only in the link types they accept.
fn dispatch_event(
    event: &Event,
    field: &OrSetField,
    tag: &Timestamp,
    dag: Option<&EventDag>,
    tag_map: &HashMap<Timestamp, String>,
    current_set: &mut OrSet<String>,
) -> Option<OrSetOp<String>> {
    let accepted_link_types: &[&str] = match field {
        OrSetField::Assignees => {
            return handle_assignee(event, tag, dag, tag_map, current_set);
        }
        OrSetField::Labels => {
            return handle_label(event, tag, dag, tag_map, current_set);
        }
        OrSetField::BlockedBy => &["blocks", "blocked_by"],
        OrSetField::RelatedTo => &["related_to", "related"],
    };
    handle_link(event, tag, dag, tag_map, current_set, accepted_link_types)
}
/// Filters `candidates` down to the tags that `event` is allowed to remove.
///
/// Without a DAG every candidate is kept. With a DAG, a tag is kept when it
/// has no recorded origin in `tag_map`, or when `is_ancestor` reports that its
/// originating event precedes `event`.
fn visible_tags(
    candidates: Vec<&Timestamp>,
    event: &Event,
    dag: Option<&EventDag>,
    tag_map: &HashMap<Timestamp, String>,
) -> Vec<Timestamp> {
    let Some(graph) = dag else {
        return candidates.into_iter().cloned().collect();
    };
    candidates
        .into_iter()
        .filter(|&candidate| match tag_map.get(candidate) {
            Some(origin_hash) => graph.is_ancestor(origin_hash, &event.event_hash),
            None => true,
        })
        .cloned()
        .collect()
}
/// Adds `value` under `tag` to the scratch set and returns the matching `Add` op.
fn record_add(value: String, tag: &Timestamp, current_set: &mut OrSet<String>) -> OrSetOp<String> {
    current_set.add(value.clone(), tag.clone());
    OrSetOp::Add(value, tag.clone())
}
/// Builds a `Remove` op for `value` covering the active tags visible to `event`.
///
/// The same tags are tombstoned in the scratch set so that later events in the
/// scan see the removal.
fn record_remove(
    value: String,
    event: &Event,
    dag: Option<&EventDag>,
    tag_map: &HashMap<Timestamp, String>,
    current_set: &mut OrSet<String>,
) -> OrSetOp<String> {
    let observed_tags = {
        let live_tags = current_set.active_tags(&value);
        visible_tags(live_tags, event, dag, tag_map)
    };
    current_set.remove_specific(&value, &observed_tags);
    OrSetOp::Remove(value, observed_tags)
}
/// Converts an event carrying `Assign` data into an op on the assignee set.
///
/// `Assign` becomes an add of the agent, `Unassign` a remove of the agent.
/// Events with any other payload yield `None`.
fn handle_assignee(
    event: &Event,
    tag: &Timestamp,
    dag: Option<&EventDag>,
    tag_map: &HashMap<Timestamp, String>,
    current_set: &mut OrSet<String>,
) -> Option<OrSetOp<String>> {
    let EventData::Assign(assignment) = &event.data else {
        return None;
    };
    let agent = assignment.agent.clone();
    let op = match assignment.action {
        AssignAction::Assign => record_add(agent, tag, current_set),
        AssignAction::Unassign => record_remove(agent, event, dag, tag_map, current_set),
    };
    Some(op)
}
/// Converts a label `Update` event into an op on the label set.
///
/// Only `Update` events whose field is `"labels"` and whose value is an object
/// with string-valued `"action"` and `"label"` entries are considered.
/// `"add"` yields an add op and `"remove"` yields a remove op; any other
/// action, or a malformed payload, yields `None`.
///
/// A non-string `"label"` is rejected rather than being coerced to an empty
/// string, so malformed payloads never produce an op for the `""` label.
fn handle_label(
    event: &Event,
    tag: &Timestamp,
    dag: Option<&EventDag>,
    tag_map: &HashMap<Timestamp, String>,
    current_set: &mut OrSet<String>,
) -> Option<OrSetOp<String>> {
    if event.event_type != EventType::Update {
        return None;
    }
    let EventData::Update(data) = &event.data else {
        return None;
    };
    if data.field != "labels" {
        return None;
    }
    let payload = data.value.as_object()?;
    let action = payload.get("action")?.as_str()?;
    let label = payload.get("label")?.as_str()?.to_string();
    match action {
        "add" => Some(record_add(label, tag, current_set)),
        "remove" => Some(record_remove(label, event, dag, tag_map, current_set)),
        _ => None,
    }
}
fn handle_link(
event: &Event,
tag: &Timestamp,
dag: Option<&EventDag>,
tag_map: &HashMap<Timestamp, String>,
current_set: &mut OrSet<String>,
link_types: &[&str],
) -> Option<OrSetOp<String>> {
match event.event_type {
EventType::Link => {
if let EventData::Link(data) = &event.data
&& link_types.contains(&data.link_type.as_str())
{
return Some(record_add(data.target.clone(), tag, current_set));
}
}
EventType::Unlink => {
if let EventData::Unlink(data) = &event.data {
let matches = data
.link_type
.as_ref()
.is_none_or(|lt| link_types.contains(<.as_str()));
if matches {
return Some(record_remove(
data.target.clone(),
event,
dag,
tag_map,
current_set,
));
}
}
}
_ => {}
}
None
}
/// Builds the OR-Set for `field` from scratch by replaying `events` in order.
///
/// No base state and no DAG are used, so every removal captures all tags that
/// are active at that point in the slice.
#[must_use]
pub fn materialize_from_events(events: &[Event], field: &OrSetField) -> OrSet<String> {
    let mut materialized = OrSet::new();
    ops_from_events(events, field, None, None)
        .into_iter()
        .for_each(|op| materialized.apply(op));
    materialized
}
/// Computes the merged OR-Set for `field` across two divergent DAG tips.
///
/// The divergent replay between `tip_a` and `tip_b` is obtained from
/// `replay_divergent` and applied on top of `base_state` with DAG-aware
/// removal visibility.
///
/// # Errors
///
/// Propagates the error returned by `replay_divergent`.
pub fn materialize_from_replay(
    dag: &EventDag,
    tip_a: &str,
    tip_b: &str,
    base_state: &OrSet<String>,
    field: &OrSetField,
) -> Result<OrSet<String>, ReplayError> {
    let divergent = replay_divergent(dag, tip_a, tip_b)?;
    let merged_state = apply_replay(base_state, &divergent, field, Some(dag));
    Ok(merged_state)
}
/// Applies the merged events of `replay` on top of `base_state` for `field`.
///
/// `base_state` is left untouched; the returned set is a copy with every
/// derived operation applied in order. `dag`, when given, restricts which tags
/// each removal may tombstone.
#[must_use]
pub fn apply_replay(
    base_state: &OrSet<String>,
    replay: &DivergentReplay,
    field: &OrSetField,
    dag: Option<&EventDag>,
) -> OrSet<String> {
    ops_from_events(&replay.merged, field, Some(base_state), dag)
        .into_iter()
        .fold(base_state.clone(), |mut state, op| {
            state.apply(op);
            state
        })
}
/// Derives the OR-Set tag for `event`.
///
/// The tag combines the event's wall-clock time, a hash of the acting agent, a
/// hash of the event hash, and the raw microsecond timestamp reinterpreted as
/// unsigned.
///
/// The microsecond timestamp is split with Euclidean division so that negative
/// (pre-epoch) values keep their sub-second part instead of having it silently
/// dropped: e.g. `-1` µs becomes second `-1` plus `999_999_000` ns.
///
/// # Panics
///
/// Panics if chrono's `timestamp_opt` does not yield a single instant for the
/// computed seconds value (presumably only for out-of-range timestamps).
fn event_to_timestamp(event: &Event) -> Timestamp {
    use chrono::TimeZone;
    const MICROS_PER_SEC: i64 = 1_000_000;
    const NANOS_PER_MICRO: i64 = 1_000;

    let epoch_secs = event.wall_ts_us.div_euclid(MICROS_PER_SEC);
    // `rem_euclid` is always in `0..MICROS_PER_SEC`, so the nanosecond value
    // fits in `u32`; the fallback is purely defensive.
    let subsec_micros = event.wall_ts_us.rem_euclid(MICROS_PER_SEC);
    let subsec_nanos = u32::try_from(subsec_micros * NANOS_PER_MICRO).unwrap_or(0);
    let wall = chrono::Utc.timestamp_opt(epoch_secs, subsec_nanos).unwrap();

    Timestamp {
        wall,
        actor: hash_str_to_u64(&event.agent),
        event_hash: hash_str_to_u64(&event.event_hash),
        itc: event.wall_ts_us.cast_unsigned(),
    }
}
/// Hashes `s` to a `u64` using the standard library's `DefaultHasher`.
///
/// NOTE(review): the standard library does not guarantee `DefaultHasher`'s
/// algorithm across Rust releases, so these values are best not treated as
/// stable across toolchains.
fn hash_str_to_u64(s: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    s.hash(&mut state);
    state.finish()
}
// Unit tests for the OR-Set data structure and for deriving OR-Set operations
// from events.
#[cfg(test)]
mod tests {
use super::*;
use crate::event::data::{AssignAction, AssignData, EventData};
use crate::event::{Event, EventType};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;
// Builds a tag with the given wall-clock seconds, actor and event hash; `itc`
// mirrors the wall-clock seconds.
fn make_tag(wall_secs: i64, actor: u64, event_hash: u64) -> Timestamp {
Timestamp {
wall: Utc.timestamp_opt(wall_secs, 0).unwrap(),
actor,
event_hash,
itc: wall_secs as u64,
}
}
// A freshly constructed set reports empty through every accessor.
#[test]
fn new_orset_is_empty() {
let set: OrSet<String> = OrSet::new();
assert!(set.is_empty());
assert_eq!(set.len(), 0);
assert!(set.values().is_empty());
}
// Adding one value makes only that value present.
#[test]
fn add_single_element() {
let mut set: OrSet<String> = OrSet::new();
let tag = make_tag(1, 1, 100);
set.add("alice".into(), tag);
assert!(set.contains(&"alice".into()));
assert!(!set.contains(&"bob".into()));
assert_eq!(set.len(), 1);
}
// Distinct values are counted separately.
#[test]
fn add_multiple_elements() {
let mut set: OrSet<String> = OrSet::new();
set.add("alice".into(), make_tag(1, 1, 100));
set.add("bob".into(), make_tag(2, 1, 101));
set.add("charlie".into(), make_tag(3, 1, 102));
assert_eq!(set.len(), 3);
assert!(set.contains(&"alice".into()));
assert!(set.contains(&"bob".into()));
assert!(set.contains(&"charlie".into()));
}
// The same value under two tags counts once but keeps both tags active.
#[test]
fn add_same_element_twice_with_different_tags() {
let mut set: OrSet<String> = OrSet::new();
set.add("alice".into(), make_tag(1, 1, 100));
set.add("alice".into(), make_tag(2, 2, 101));
assert_eq!(set.len(), 1);
assert!(set.contains(&"alice".into()));
assert_eq!(set.active_tags(&"alice".into()).len(), 2);
}
// Removing a value returns its tag and makes the value absent.
#[test]
fn remove_element() {
let mut set: OrSet<String> = OrSet::new();
set.add("alice".into(), make_tag(1, 1, 100));
assert!(set.contains(&"alice".into()));
let removed = set.remove(&"alice".into());
assert_eq!(removed.len(), 1);
assert!(!set.contains(&"alice".into()));
assert!(set.is_empty());
}
// Removing a value that was never added is a no-op returning no tags.
#[test]
fn remove_nonexistent_element() {
let mut set: OrSet<String> = OrSet::new();
let removed = set.remove(&"alice".into());
assert!(removed.is_empty());
assert!(set.is_empty());
}
// A value can be re-added after removal; only the new tag is active.
#[test]
fn add_remove_add_cycle() {
let mut set: OrSet<String> = OrSet::new();
set.add("alice".into(), make_tag(1, 1, 100));
assert!(set.contains(&"alice".into()));
set.remove(&"alice".into());
assert!(!set.contains(&"alice".into()));
set.add("alice".into(), make_tag(3, 1, 102));
assert!(set.contains(&"alice".into()));
assert_eq!(set.active_tags(&"alice".into()).len(), 1);
}
// Repeated add/remove rounds with fresh tags behave consistently each time.
#[test]
fn multiple_add_remove_cycles() {
let mut set: OrSet<String> = OrSet::new();
for i in 0..5 {
set.add("x".into(), make_tag(i * 2, 1, (i * 2) as u64));
assert!(set.contains(&"x".into()));
set.remove(&"x".into());
assert!(!set.contains(&"x".into()));
}
set.add("x".into(), make_tag(100, 1, 999));
assert!(set.contains(&"x".into()));
assert_eq!(set.len(), 1);
}
// Add-wins: a concurrent add under a new tag survives a remove that only saw
// the old tag, regardless of merge order.
#[test]
fn concurrent_add_remove_add_wins() {
let tag1 = make_tag(1, 1, 100);
let mut base: OrSet<String> = OrSet::new();
base.add("x".into(), tag1.clone());
let mut agent_a = base.clone();
agent_a.remove(&"x".into());
assert!(!agent_a.contains(&"x".into()));
let mut agent_b = base.clone();
let tag2 = make_tag(2, 2, 200);
agent_b.add("x".into(), tag2.clone());
let mut merged_ab = agent_a.clone();
merged_ab.merge(agent_b.clone());
assert!(
merged_ab.contains(&"x".into()),
"add-wins: concurrent add should survive remove"
);
let mut merged_ba = agent_b.clone();
merged_ba.merge(agent_a.clone());
assert!(
merged_ba.contains(&"x".into()),
"add-wins: merge must be commutative"
);
}
// Two replicas adding the same value keep both tags after merging.
#[test]
fn concurrent_adds_both_present() {
let mut agent_a: OrSet<String> = OrSet::new();
agent_a.add("x".into(), make_tag(1, 1, 100));
let mut agent_b: OrSet<String> = OrSet::new();
agent_b.add("x".into(), make_tag(1, 2, 200));
let mut merged = agent_a.clone();
merged.merge(agent_b);
assert!(merged.contains(&"x".into()));
assert_eq!(merged.active_tags(&"x".into()).len(), 2);
}
// A remove that follows the add on the same replica removes the value.
#[test]
fn causal_remove_after_add_element_absent() {
let mut set: OrSet<String> = OrSet::new();
set.add("x".into(), make_tag(1, 1, 100));
set.remove(&"x".into());
assert!(!set.contains(&"x".into()));
}
// merge(a, b) equals merge(b, a).
#[test]
fn merge_commutative() {
let mut a: OrSet<u32> = OrSet::new();
a.add(1, make_tag(1, 1, 100));
a.add(2, make_tag(2, 1, 101));
let mut b: OrSet<u32> = OrSet::new();
b.add(2, make_tag(3, 2, 200));
b.add(3, make_tag(4, 2, 201));
let mut ab = a.clone();
ab.merge(b.clone());
let mut ba = b.clone();
ba.merge(a.clone());
assert_eq!(ab, ba);
}
// (a merge b) merge c equals a merge (b merge c).
#[test]
fn merge_associative() {
let mut a: OrSet<u32> = OrSet::new();
a.add(1, make_tag(1, 1, 100));
let mut b: OrSet<u32> = OrSet::new();
b.add(2, make_tag(2, 2, 200));
let mut c: OrSet<u32> = OrSet::new();
c.add(3, make_tag(3, 3, 300));
let mut ab_c = a.clone();
ab_c.merge(b.clone());
ab_c.merge(c.clone());
let mut bc = b.clone();
bc.merge(c.clone());
let mut a_bc = a.clone();
a_bc.merge(bc);
assert_eq!(ab_c, a_bc);
}
// Merging a set with a copy of itself changes nothing.
#[test]
fn merge_idempotent() {
let mut a: OrSet<u32> = OrSet::new();
a.add(1, make_tag(1, 1, 100));
a.add(2, make_tag(2, 1, 101));
a.remove(&1);
let before = a.clone();
a.merge(before.clone());
assert_eq!(a, before);
}
// Merging two empty sets yields an empty set.
#[test]
fn merge_empty_sets() {
let a: OrSet<String> = OrSet::new();
let b: OrSet<String> = OrSet::new();
let mut merged = a.clone();
merged.merge(b);
assert!(merged.is_empty());
}
// The empty set is the identity element for merge.
#[test]
fn merge_with_empty_is_identity() {
let mut a: OrSet<u32> = OrSet::new();
a.add(1, make_tag(1, 1, 100));
a.add(2, make_tag(2, 1, 101));
let before = a.clone();
a.merge(OrSet::new());
assert_eq!(a, before);
}
// Two replicas remove the base tag while a third adds a new tag; the new tag
// is not tombstoned by either remove, so the value stays present.
#[test]
fn three_way_concurrent_add_remove() {
let tag1 = make_tag(1, 0, 1);
let mut base: OrSet<String> = OrSet::new();
base.add("x".into(), tag1.clone());
let mut a = base.clone();
a.remove(&"x".into());
let mut b = base.clone();
b.add("x".into(), make_tag(2, 2, 200));
let mut c = base.clone();
c.remove(&"x".into());
let mut result = a.clone();
result.merge(b.clone());
result.merge(c.clone());
assert!(
result.contains(&"x".into()),
"B's concurrent add should win over A and C's removes"
);
}
// Both replicas remove the base tag and re-add under their own tag; after the
// merge the two new tags are active.
#[test]
fn remove_then_concurrent_re_adds() {
let tag1 = make_tag(1, 0, 1);
let mut base: OrSet<String> = OrSet::new();
base.add("x".into(), tag1.clone());
let mut a = base.clone();
a.remove(&"x".into());
a.add("x".into(), make_tag(3, 1, 300));
let mut b = base.clone();
b.remove(&"x".into());
b.add("x".into(), make_tag(4, 2, 400));
let mut merged = a.clone();
merged.merge(b.clone());
assert!(merged.contains(&"x".into()));
assert_eq!(merged.active_tags(&"x".into()).len(), 2);
}
// Independent removes and adds on different values all take effect after merge.
#[test]
fn mixed_elements_concurrent_ops() {
let mut base: OrSet<String> = OrSet::new();
base.add("a".into(), make_tag(1, 0, 1));
base.add("b".into(), make_tag(2, 0, 2));
let mut s1 = base.clone();
s1.remove(&"a".into());
s1.add("c".into(), make_tag(3, 1, 100));
let mut s2 = base.clone();
s2.remove(&"b".into());
s2.add("d".into(), make_tag(4, 2, 200));
let mut merged = s1.clone();
merged.merge(s2);
assert!(!merged.contains(&"a".into()));
assert!(!merged.contains(&"b".into()));
assert!(merged.contains(&"c".into()));
assert!(merged.contains(&"d".into()));
}
// Applying an `Add` op makes the value present.
#[test]
fn apply_add_op() {
let mut set: OrSet<String> = OrSet::new();
let tag = make_tag(1, 1, 100);
set.apply(OrSetOp::Add("alice".into(), tag));
assert!(set.contains(&"alice".into()));
}
// Applying a `Remove` op that lists the only tag removes the value.
#[test]
fn apply_remove_op_with_observed_tags() {
let mut set: OrSet<String> = OrSet::new();
let tag = make_tag(1, 1, 100);
set.add("alice".into(), tag.clone());
set.apply(OrSetOp::Remove("alice".into(), vec![tag]));
assert!(!set.contains(&"alice".into()));
}
// A `Remove` op that lists only one of two tags leaves the value present.
#[test]
fn apply_remove_with_unobserved_tag_survives() {
let mut set: OrSet<String> = OrSet::new();
let tag1 = make_tag(1, 1, 100);
let tag2 = make_tag(2, 2, 200);
set.add("alice".into(), tag1.clone());
set.add("alice".into(), tag2.clone());
set.apply(OrSetOp::Remove("alice".into(), vec![tag1]));
assert!(set.contains(&"alice".into()));
assert_eq!(set.active_tags(&"alice".into()).len(), 1);
}
// `values` lists only the values that are still present.
#[test]
fn values_returns_distinct_present_elements() {
let mut set: OrSet<String> = OrSet::new();
set.add("a".into(), make_tag(1, 1, 100));
set.add("b".into(), make_tag(2, 1, 101));
set.add("c".into(), make_tag(3, 1, 102));
set.remove(&"b".into());
let vals = set.values();
assert_eq!(vals.len(), 2);
assert!(vals.contains(&"a".to_string()));
assert!(vals.contains(&"c".to_string()));
assert!(!vals.contains(&"b".to_string()));
}
// An unassign event must be able to remove a value whose tag exists only in
// the supplied base state (no add event for it appears in the slice).
#[test]
fn remove_base_state_item_succeeds() {
let base_tag = make_tag(1, 1, 100);
let mut base_state: OrSet<String> = OrSet::new();
base_state.add("alice".into(), base_tag.clone());
let event = Event {
wall_ts_us: 2000,
agent: "agent".into(),
itc: "itc".into(),
parents: vec![],
event_type: EventType::Assign,
item_id: crate::model::item_id::ItemId::new_unchecked("bn-test"),
data: EventData::Assign(AssignData {
agent: "alice".into(),
action: AssignAction::Unassign,
extra: BTreeMap::new(),
}),
event_hash: "hash".into(),
};
let ops = ops_from_events(&[event], &OrSetField::Assignees, Some(&base_state), None);
let mut result = base_state.clone();
for op in ops {
result.apply(op);
}
assert!(
!result.contains(&"alice".into()),
"Alice should be removed when base_state is provided"
);
}
// The add and the remove are sibling children of the same root in the DAG, so
// the remove is expected not to observe the add's tag and the value survives.
#[test]
fn concurrent_remove_respects_dag_visibility() {
let root_hash = "root";
let add_hash = "add_hash";
let remove_hash = "remove_hash";
let mut dag = EventDag::new();
// Placeholder events used only to give the DAG its shape (hash + parents).
let make_evt = |hash: &str, parents: Vec<&str>| Event {
wall_ts_us: 0,
agent: "a".into(),
itc: "i".into(),
parents: parents.iter().map(|s| s.to_string()).collect(),
event_type: EventType::Create, item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
data: EventData::Create(crate::event::data::CreateData {
title: "".into(),
kind: crate::model::item::Kind::Task,
size: None,
urgency: crate::model::item::Urgency::Default,
labels: vec![],
parent: None,
causation: None,
description: None,
extra: BTreeMap::new(),
}),
event_hash: hash.into(),
};
dag.insert(make_evt(root_hash, vec![]));
dag.insert(make_evt(add_hash, vec![root_hash]));
dag.insert(make_evt(remove_hash, vec![root_hash]));
let add_event = Event {
wall_ts_us: 1000,
agent: "alice".into(),
itc: "1".into(),
parents: vec![root_hash.into()],
event_type: EventType::Assign,
item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
data: EventData::Assign(AssignData {
agent: "alice".into(),
action: AssignAction::Assign,
extra: BTreeMap::new(),
}),
event_hash: add_hash.into(),
};
let remove_event = Event {
wall_ts_us: 2000,
agent: "bob".into(),
itc: "2".into(),
parents: vec![root_hash.into()],
event_type: EventType::Assign,
item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
data: EventData::Assign(AssignData {
agent: "alice".into(),
action: AssignAction::Unassign,
extra: BTreeMap::new(),
}),
event_hash: remove_hash.into(),
};
let events = vec![add_event, remove_event];
let ops = ops_from_events(
&events,
&OrSetField::Assignees,
None, Some(&dag),
);
let mut set = OrSet::new();
for op in ops {
set.apply(op);
}
assert!(
set.contains(&"alice".into()),
"Concurrent Add should survive Remove"
);
}
}