Skip to main content

bones_core/crdt/
orset.rs

1//! OR-Set (Observed-Remove Set) with add-wins semantics.
2//!
3//! This module implements an OR-Set backed by unique tags (timestamps) for
4//! each add operation. The set tracks both additions and removals, and uses
5//! tag-based causality to resolve concurrent add/remove conflicts.
6//!
7//! # Add-Wins Semantics
8//!
9//! When an add and a remove for the same element occur concurrently (neither
10//! causally depends on the other), the add wins. This is achieved by:
11//!
12//! - Each add creates a new unique tag (timestamp).
13//! - A remove only tombstones the tags that were observed at the time of removal.
14//! - A concurrent add introduces a tag the remove never saw, so it survives.
15//!
16//! # DAG Integration
17//!
18//! In bones, OR-Set operations (add/remove) are stored as events in the
19//! Eg-Walker DAG. The OR-Set state can be materialized by replaying events
20//! from the LCA of divergent branches. See [`materialize_from_replay`] for
21//! the replay-based construction.
22//!
23//! # Semilattice Properties
24//!
25//! The merge operation satisfies the semilattice laws:
26//! - **Commutative**: merge(A, B) = merge(B, A)
27//! - **Associative**: merge(merge(A, B), C) = merge(A, merge(B, C))
28//! - **Idempotent**: merge(A, A) = A
29
30use std::collections::{HashMap, HashSet};
31use std::hash::Hash;
32
33use super::merge::Merge;
34use super::{OrSet, Timestamp};
35
36// ---------------------------------------------------------------------------
37// Operations
38// ---------------------------------------------------------------------------
39
40/// An operation on an OR-Set element.
41#[derive(Debug, Clone, PartialEq, Eq)]
42pub enum OrSetOp<T> {
43    /// Add an element with a unique tag (timestamp).
44    Add(T, Timestamp),
45    /// Remove an element, tombstoning all currently-observed tags for it.
46    Remove(T, Vec<Timestamp>),
47}
48
49impl<T: Hash + Eq + Clone> OrSet<T> {
50    /// Create a new empty OR-Set.
51    #[must_use]
52    pub fn new() -> Self {
53        Self {
54            elements: HashSet::new(),
55            tombstone: HashSet::new(),
56        }
57    }
58
59    /// Add an element to the set with a unique tag.
60    ///
61    /// The tag (timestamp) must be unique across all operations to ensure
62    /// correct causality tracking. In bones, this is guaranteed by the
63    /// event hash or ITC stamp.
64    pub fn add(&mut self, value: T, tag: Timestamp) {
65        self.elements.insert((value, tag));
66    }
67
68    /// Remove an element from the set.
69    ///
70    /// This tombstones all currently-observed tags for the given element.
71    /// Any tags added concurrently (not yet observed) will survive,
72    /// implementing add-wins semantics.
73    ///
74    /// Returns the tags that were tombstoned (empty if element was not present).
75    pub fn remove(&mut self, value: &T) -> Vec<Timestamp>
76    where
77        T: Eq + Hash,
78    {
79        // Find all active tags for this value.
80        let active_tags: Vec<Timestamp> = self
81            .elements
82            .iter()
83            .filter(|(v, _)| v == value)
84            .map(|(_, ts)| ts.clone())
85            .collect();
86
87        // Tombstone each active tag.
88        for tag in &active_tags {
89            self.tombstone.insert((value.clone(), tag.clone()));
90        }
91
92        active_tags
93    }
94
95    /// Remove specific tags for an element.
96    ///
97    /// Tombstones only the provided tags.
98    pub fn remove_specific(&mut self, value: &T, tags: &[Timestamp])
99    where
100        T: Eq + Hash,
101    {
102        for tag in tags {
103            self.tombstone.insert((value.clone(), tag.clone()));
104        }
105    }
106
107    /// Check if an element is present in the set.
108    ///
109    /// An element is present if it has at least one add-tag that is not
110    /// covered by a corresponding tombstone.
111    pub fn contains(&self, value: &T) -> bool {
112        self.elements
113            .iter()
114            .any(|(v, ts)| v == value && !self.tombstone.contains(&(value.clone(), ts.clone())))
115    }
116
117    /// Return all currently-present values in the set.
118    ///
119    /// An element is present if it has at least one un-tombstoned add-tag.
120    #[must_use]
121    pub fn values(&self) -> HashSet<&T> {
122        let mut result = HashSet::new();
123        for (value, tag) in &self.elements {
124            if !self.tombstone.contains(&(value.clone(), tag.clone())) {
125                result.insert(value);
126            }
127        }
128        result
129    }
130
131    /// Return the number of distinct present values.
132    #[must_use]
133    pub fn len(&self) -> usize {
134        self.values().len()
135    }
136
137    /// Return `true` if no values are present.
138    #[must_use]
139    pub fn is_empty(&self) -> bool {
140        self.len() == 0
141    }
142
143    /// Return all active (un-tombstoned) tags for a given value.
144    pub fn active_tags(&self, value: &T) -> Vec<&Timestamp> {
145        self.elements
146            .iter()
147            .filter(|(v, ts)| v == value && !self.tombstone.contains(&(value.clone(), ts.clone())))
148            .map(|(_, ts)| ts)
149            .collect()
150    }
151
152    /// Apply an operation to the OR-Set.
153    pub fn apply(&mut self, op: OrSetOp<T>) {
154        match op {
155            OrSetOp::Add(value, tag) => {
156                self.add(value, tag);
157            }
158            OrSetOp::Remove(value, observed_tags) => {
159                // Only tombstone the specific tags that were observed.
160                for tag in observed_tags {
161                    self.tombstone.insert((value.clone(), tag));
162                }
163            }
164        }
165    }
166}
167
168impl<T: Hash + Eq + Clone> Default for OrSet<T> {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174// ---------------------------------------------------------------------------
175// Merge (semilattice join)
176// ---------------------------------------------------------------------------
177
178/// Merge implementation for OR-Set.
179///
180/// The merge is a union of both the element sets and the tombstone sets.
181/// This satisfies semilattice laws because set union is commutative,
182/// associative, and idempotent.
183///
184/// After merge, an element is present iff it has at least one add-tag
185/// not covered by a tombstone entry.
186impl<T: Eq + Hash + Clone> Merge for OrSet<T> {
187    fn merge(&mut self, other: Self) {
188        self.elements.extend(other.elements);
189        self.tombstone.extend(other.tombstone);
190    }
191}
192
193// ---------------------------------------------------------------------------
194// DAG Replay Materialization
195// ---------------------------------------------------------------------------
196
197use crate::dag::graph::EventDag;
198use crate::dag::replay::{DivergentReplay, ReplayError, replay_divergent};
199use crate::event::Event;
200use crate::event::data::{AssignAction, EventData};
201use crate::event::types::EventType;
202
/// An OR-Set field descriptor for DAG-based materialization.
///
/// Selects which events are interpreted as add/remove operations when
/// building an OR-Set from the event DAG.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OrSetField {
    /// Labels: `item.update` events whose `field` is `"labels"` and whose JSON
    /// value is an object of the form `{"action": "add" | "remove", "label": ...}`.
    Labels,
    /// Assignees: `item.assign` events; `Assign` adds the agent and `Unassign`
    /// removes it.
    Assignees,
    /// Blocked-by links: `item.link` events with link type `"blocks"` or
    /// `"blocked_by"` add the target; `item.unlink` events remove it when their
    /// link type is one of those or is absent.
    BlockedBy,
    /// Related-to links: `item.link` events with link type `"related_to"` or
    /// `"related"` add the target; `item.unlink` events remove it when their
    /// link type is one of those or is absent.
    RelatedTo,
}
219
220/// Extract OR-Set operations from a sequence of DAG events for a given field.
221///
222/// This is the bridge between the event DAG and the OR-Set CRDT. Each event
223/// that affects the target field is translated into an [`OrSetOp`].
224///
225/// # Timestamp Construction
226///
227/// Each event's unique identity (`event_hash` + `wall_ts` + agent) is mapped to
228/// a [`Timestamp`] for use as the OR-Set tag.
229///
230/// # DAG Visibility
231///
232/// If `dag` is provided, `Remove` operations will only target tags that are
233/// causally visible (ancestors) to the removing event. This is crucial for
234/// handling concurrent edits in `merged` replays.
235///
236/// If `base_state` is provided, the set is initialized with it. Tags present
237/// in `base_state` are assumed to be visible to all events (ancestors).
238pub fn ops_from_events(
239    events: &[Event],
240    field: &OrSetField,
241    base_state: Option<&OrSet<String>>,
242    dag: Option<&EventDag>,
243) -> Vec<OrSetOp<String>> {
244    let mut current_set = base_state.map_or_else(OrSet::new, Clone::clone);
245    let mut tag_map: HashMap<Timestamp, String> = HashMap::new();
246    let mut ops = Vec::new();
247
248    for event in events {
249        let tag = event_to_timestamp(event);
250        if let Some(op) = dispatch_event(event, field, &tag, dag, &tag_map, &mut current_set) {
251            if let OrSetOp::Add(_, ref t) = op {
252                tag_map.insert(t.clone(), event.event_hash.clone());
253            }
254            ops.push(op);
255        }
256    }
257
258    ops
259}
260
261/// Dispatch a single event to the appropriate field handler.
262fn dispatch_event(
263    event: &Event,
264    field: &OrSetField,
265    tag: &Timestamp,
266    dag: Option<&EventDag>,
267    tag_map: &HashMap<Timestamp, String>,
268    current_set: &mut OrSet<String>,
269) -> Option<OrSetOp<String>> {
270    match field {
271        OrSetField::Assignees => handle_assignee(event, tag, dag, tag_map, current_set),
272        OrSetField::Labels => handle_label(event, tag, dag, tag_map, current_set),
273        OrSetField::BlockedBy => handle_link(
274            event,
275            tag,
276            dag,
277            tag_map,
278            current_set,
279            &["blocks", "blocked_by"],
280        ),
281        OrSetField::RelatedTo => handle_link(
282            event,
283            tag,
284            dag,
285            tag_map,
286            current_set,
287            &["related_to", "related"],
288        ),
289    }
290}
291
292/// Filter candidate tags for DAG visibility relative to the given event.
293fn visible_tags(
294    candidates: Vec<&Timestamp>,
295    event: &Event,
296    dag: Option<&EventDag>,
297    tag_map: &HashMap<Timestamp, String>,
298) -> Vec<Timestamp> {
299    candidates
300        .into_iter()
301        .filter(|t| {
302            dag.is_none_or(|dag_ref| {
303                tag_map
304                    .get(t)
305                    .is_none_or(|tag_hash| dag_ref.is_ancestor(tag_hash, &event.event_hash))
306            })
307        })
308        .cloned()
309        .collect()
310}
311
312/// Record an add: inserts the tag into the set and returns the op.
313fn record_add(value: String, tag: &Timestamp, current_set: &mut OrSet<String>) -> OrSetOp<String> {
314    let op = OrSetOp::Add(value.clone(), tag.clone());
315    current_set.add(value, tag.clone());
316    op
317}
318
319/// Record a remove: tombstones visible tags and returns the op.
320fn record_remove(
321    value: String,
322    event: &Event,
323    dag: Option<&EventDag>,
324    tag_map: &HashMap<Timestamp, String>,
325    current_set: &mut OrSet<String>,
326) -> OrSetOp<String> {
327    let candidate_tags = current_set.active_tags(&value);
328    let observed_tags = visible_tags(candidate_tags, event, dag, tag_map);
329    current_set.remove_specific(&value, &observed_tags);
330    OrSetOp::Remove(value, observed_tags)
331}
332
333fn handle_assignee(
334    event: &Event,
335    tag: &Timestamp,
336    dag: Option<&EventDag>,
337    tag_map: &HashMap<Timestamp, String>,
338    current_set: &mut OrSet<String>,
339) -> Option<OrSetOp<String>> {
340    if let EventData::Assign(data) = &event.data {
341        return match data.action {
342            AssignAction::Assign => Some(record_add(data.agent.clone(), tag, current_set)),
343            AssignAction::Unassign => Some(record_remove(
344                data.agent.clone(),
345                event,
346                dag,
347                tag_map,
348                current_set,
349            )),
350        };
351    }
352    None
353}
354
355fn handle_label(
356    event: &Event,
357    tag: &Timestamp,
358    dag: Option<&EventDag>,
359    tag_map: &HashMap<Timestamp, String>,
360    current_set: &mut OrSet<String>,
361) -> Option<OrSetOp<String>> {
362    if event.event_type != EventType::Update {
363        return None;
364    }
365    let EventData::Update(data) = &event.data else {
366        return None;
367    };
368    if data.field != "labels" {
369        return None;
370    }
371    let obj = data.value.as_object()?;
372    let action_str = obj.get("action")?.as_str().unwrap_or("");
373    let label_str = obj.get("label")?.as_str().unwrap_or("").to_string();
374
375    match action_str {
376        "add" => Some(record_add(label_str, tag, current_set)),
377        "remove" => Some(record_remove(label_str, event, dag, tag_map, current_set)),
378        _ => None,
379    }
380}
381
382fn handle_link(
383    event: &Event,
384    tag: &Timestamp,
385    dag: Option<&EventDag>,
386    tag_map: &HashMap<Timestamp, String>,
387    current_set: &mut OrSet<String>,
388    link_types: &[&str],
389) -> Option<OrSetOp<String>> {
390    match event.event_type {
391        EventType::Link => {
392            if let EventData::Link(data) = &event.data
393                && link_types.contains(&data.link_type.as_str())
394            {
395                return Some(record_add(data.target.clone(), tag, current_set));
396            }
397        }
398        EventType::Unlink => {
399            if let EventData::Unlink(data) = &event.data {
400                let matches = data
401                    .link_type
402                    .as_ref()
403                    .is_none_or(|lt| link_types.contains(&lt.as_str()));
404                if matches {
405                    return Some(record_remove(
406                        data.target.clone(),
407                        event,
408                        dag,
409                        tag_map,
410                        current_set,
411                    ));
412                }
413            }
414        }
415        _ => {}
416    }
417    None
418}
419
420/// Materialize an OR-Set from a sequence of events.
421///
422/// Replays the given events in order, applying add/remove operations
423/// to build the final OR-Set state.
424#[must_use]
425pub fn materialize_from_events(events: &[Event], field: &OrSetField) -> OrSet<String> {
426    let ops = ops_from_events(events, field, None, None);
427    let mut set = OrSet::new();
428    for op in ops {
429        set.apply(op);
430    }
431    set
432}
433
434/// Materialize an OR-Set by replaying divergent branches from the DAG.
435///
436/// Given two branch tips, finds their LCA, collects divergent events,
437/// and applies them in deterministic order to produce the merged OR-Set.
438///
439/// The `base_state` is the OR-Set state at the LCA. The divergent events
440/// from both branches are replayed on top of it.
441///
442/// # Add-Wins Resolution
443///
444/// If branch A adds element X and branch B removes element X concurrently:
445/// - Branch B's remove tombstones only the tags B observed (from the LCA state).
446/// - Branch A's add creates a new tag that B never saw.
447/// - After merge, the new tag survives → element X is present (add wins).
448///
449/// # Errors
450///
451/// Returns [`ReplayError`] if the divergent replay fails (e.g., missing
452/// tips or malformed DAG).
453pub fn materialize_from_replay(
454    dag: &EventDag,
455    tip_a: &str,
456    tip_b: &str,
457    base_state: &OrSet<String>,
458    field: &OrSetField,
459) -> Result<OrSet<String>, ReplayError> {
460    let replay = replay_divergent(dag, tip_a, tip_b)?;
461    Ok(apply_replay(base_state, &replay, field, Some(dag)))
462}
463
464/// Apply a divergent replay to a base OR-Set state.
465///
466/// The merged events from the replay are applied in deterministic order
467/// on top of the base state.
468#[must_use]
469pub fn apply_replay(
470    base_state: &OrSet<String>,
471    replay: &DivergentReplay,
472    field: &OrSetField,
473    dag: Option<&EventDag>,
474) -> OrSet<String> {
475    // Start from the base state at the LCA.
476    let mut result = base_state.clone();
477
478    // Apply all divergent events in merged (deterministic) order.
479    // Pass base_state so ops_from_events knows about pre-existing tags.
480    // Pass dag so ops_from_events can resolve visibility.
481    let ops = ops_from_events(&replay.merged, field, Some(base_state), dag);
482    for op in ops {
483        result.apply(op);
484    }
485
486    result
487}
488
489/// Convert an event to a Timestamp for use as an OR-Set tag.
490///
491/// Uses the event's `wall_ts`, agent (hashed), and `event_hash` (hashed) to
492/// produce a unique, deterministic timestamp.
493fn event_to_timestamp(event: &Event) -> Timestamp {
494    use chrono::TimeZone;
495
496    let epoch_secs = event.wall_ts_us / 1_000_000;
497    let subsec_nanos = u32::try_from((event.wall_ts_us % 1_000_000) * 1_000).unwrap_or(0);
498    let wall = chrono::Utc.timestamp_opt(epoch_secs, subsec_nanos).unwrap();
499
500    // Hash the agent string to a u64 for the actor field.
501    let actor = hash_str_to_u64(&event.agent);
502
503    // Hash the event_hash to a u64 for the event_hash field.
504    let event_hash_u64 = hash_str_to_u64(&event.event_hash);
505
506    // Use wall_ts_us as a simple ITC substitute.
507    let itc = event.wall_ts_us.cast_unsigned();
508
509    Timestamp {
510        wall,
511        actor,
512        event_hash: event_hash_u64,
513        itc,
514    }
515}
516
/// Hash a string to a `u64` for deterministic tag generation.
///
/// Tags derived from this value must be identical on every replica and across
/// builds. `std::collections::hash_map::DefaultHasher` cannot promise that,
/// because its algorithm is documented as unspecified and liable to change
/// between Rust releases. Instead, this function spells out the computation
/// `DefaultHasher::new()` currently performs for a `str`: SipHash-1-3 with an
/// all-zero key over the UTF-8 bytes followed by a single `0xFF` byte.
/// Existing tags therefore keep their values, and future toolchains cannot
/// silently change them.
fn hash_str_to_u64(s: &str) -> u64 {
    // Same framing as `impl Hash for str`: the bytes, then a 0xFF terminator
    // (0xFF never occurs in UTF-8, which keeps the encoding prefix-free).
    let mut framed = Vec::with_capacity(s.len() + 1);
    framed.extend_from_slice(s.as_bytes());
    framed.push(0xff);
    siphash13_zero_key(&framed)
}

/// SipHash-1-3 (one compression round, three finalization rounds) with
/// key `(0, 0)`, returning the 64-bit digest of `msg`.
fn siphash13_zero_key(msg: &[u8]) -> u64 {
    // Initial state: the SipHash constants XOR-ed with an all-zero key.
    let mut v = [
        0x736f_6d65_7073_6575_u64,
        0x646f_7261_6e64_6f6d,
        0x6c79_6765_6e65_7261,
        0x7465_6462_7974_6573,
    ];
    let mut words = msg.chunks_exact(8);
    for word in &mut words {
        let m = le_word(word);
        v[3] ^= m;
        sip_round(&mut v);
        v[0] ^= m;
    }
    // Final block: leftover bytes, with the message length (mod 256) in the top byte.
    let last = ((msg.len() as u64 & 0xff) << 56) | le_word(words.remainder());
    v[3] ^= last;
    sip_round(&mut v);
    v[0] ^= last;
    v[2] ^= 0xff;
    for _ in 0..3 {
        sip_round(&mut v);
    }
    v[0] ^ v[1] ^ v[2] ^ v[3]
}

/// Read up to eight bytes as a little-endian integer (missing high bytes are zero).
fn le_word(bytes: &[u8]) -> u64 {
    bytes
        .iter()
        .rev()
        .fold(0, |word, &byte| (word << 8) | u64::from(byte))
}

/// One SipRound over the state `v = [v0, v1, v2, v3]`.
fn sip_round(v: &mut [u64; 4]) {
    v[0] = v[0].wrapping_add(v[1]);
    v[1] = v[1].rotate_left(13);
    v[1] ^= v[0];
    v[0] = v[0].rotate_left(32);
    v[2] = v[2].wrapping_add(v[3]);
    v[3] = v[3].rotate_left(16);
    v[3] ^= v[2];
    v[0] = v[0].wrapping_add(v[3]);
    v[3] = v[3].rotate_left(21);
    v[3] ^= v[0];
    v[2] = v[2].wrapping_add(v[1]);
    v[1] = v[1].rotate_left(17);
    v[1] ^= v[2];
    v[2] = v[2].rotate_left(32);
}
524
525// ---------------------------------------------------------------------------
526// Tests
527// ---------------------------------------------------------------------------
528
529#[cfg(test)]
530mod tests {
531    use super::*;
532    use crate::event::data::{AssignAction, AssignData, EventData};
533    use crate::event::{Event, EventType};
534    use chrono::{TimeZone, Utc};
535    use std::collections::BTreeMap;
536
537    fn make_tag(wall_secs: i64, actor: u64, event_hash: u64) -> Timestamp {
538        Timestamp {
539            wall: Utc.timestamp_opt(wall_secs, 0).unwrap(),
540            actor,
541            event_hash,
542            itc: wall_secs as u64,
543        }
544    }
545
546    // ===================================================================
547    // Basic operations
548    // ===================================================================
549
550    #[test]
551    fn new_orset_is_empty() {
552        let set: OrSet<String> = OrSet::new();
553        assert!(set.is_empty());
554        assert_eq!(set.len(), 0);
555        assert!(set.values().is_empty());
556    }
557
558    #[test]
559    fn add_single_element() {
560        let mut set: OrSet<String> = OrSet::new();
561        let tag = make_tag(1, 1, 100);
562        set.add("alice".into(), tag);
563
564        assert!(set.contains(&"alice".into()));
565        assert!(!set.contains(&"bob".into()));
566        assert_eq!(set.len(), 1);
567    }
568
569    #[test]
570    fn add_multiple_elements() {
571        let mut set: OrSet<String> = OrSet::new();
572        set.add("alice".into(), make_tag(1, 1, 100));
573        set.add("bob".into(), make_tag(2, 1, 101));
574        set.add("charlie".into(), make_tag(3, 1, 102));
575
576        assert_eq!(set.len(), 3);
577        assert!(set.contains(&"alice".into()));
578        assert!(set.contains(&"bob".into()));
579        assert!(set.contains(&"charlie".into()));
580    }
581
582    #[test]
583    fn add_same_element_twice_with_different_tags() {
584        let mut set: OrSet<String> = OrSet::new();
585        set.add("alice".into(), make_tag(1, 1, 100));
586        set.add("alice".into(), make_tag(2, 2, 101));
587
588        // Still one distinct value, but two tags.
589        assert_eq!(set.len(), 1);
590        assert!(set.contains(&"alice".into()));
591        assert_eq!(set.active_tags(&"alice".into()).len(), 2);
592    }
593
594    #[test]
595    fn remove_element() {
596        let mut set: OrSet<String> = OrSet::new();
597        set.add("alice".into(), make_tag(1, 1, 100));
598        assert!(set.contains(&"alice".into()));
599
600        let removed = set.remove(&"alice".into());
601        assert_eq!(removed.len(), 1);
602        assert!(!set.contains(&"alice".into()));
603        assert!(set.is_empty());
604    }
605
606    #[test]
607    fn remove_nonexistent_element() {
608        let mut set: OrSet<String> = OrSet::new();
609        let removed = set.remove(&"alice".into());
610        assert!(removed.is_empty());
611        assert!(set.is_empty());
612    }
613
614    #[test]
615    fn add_remove_add_cycle() {
616        let mut set: OrSet<String> = OrSet::new();
617
618        // First add
619        set.add("alice".into(), make_tag(1, 1, 100));
620        assert!(set.contains(&"alice".into()));
621
622        // Remove
623        set.remove(&"alice".into());
624        assert!(!set.contains(&"alice".into()));
625
626        // Re-add with new tag
627        set.add("alice".into(), make_tag(3, 1, 102));
628        assert!(set.contains(&"alice".into()));
629        assert_eq!(set.active_tags(&"alice".into()).len(), 1);
630    }
631
632    #[test]
633    fn multiple_add_remove_cycles() {
634        let mut set: OrSet<String> = OrSet::new();
635
636        for i in 0..5 {
637            set.add("x".into(), make_tag(i * 2, 1, (i * 2) as u64));
638            assert!(set.contains(&"x".into()));
639            set.remove(&"x".into());
640            assert!(!set.contains(&"x".into()));
641        }
642
643        // Final add
644        set.add("x".into(), make_tag(100, 1, 999));
645        assert!(set.contains(&"x".into()));
646        assert_eq!(set.len(), 1);
647    }
648
649    // ===================================================================
650    // Concurrent add+remove (add-wins)
651    // ===================================================================
652
653    #[test]
654    fn concurrent_add_remove_add_wins() {
655        // Simulate: base has element "x". Agent A removes "x", Agent B
656        // concurrently adds "x" (with a new tag). After merge, "x" should
657        // be present (add-wins).
658
659        // Base state: "x" with tag1
660        let tag1 = make_tag(1, 1, 100);
661        let mut base: OrSet<String> = OrSet::new();
662        base.add("x".into(), tag1.clone());
663
664        // Agent A: removes "x" (observes tag1)
665        let mut agent_a = base.clone();
666        agent_a.remove(&"x".into());
667        assert!(!agent_a.contains(&"x".into()));
668
669        // Agent B: adds "x" with new tag (concurrent, doesn't see remove)
670        let mut agent_b = base.clone();
671        let tag2 = make_tag(2, 2, 200);
672        agent_b.add("x".into(), tag2.clone());
673
674        // Merge A into B
675        let mut merged_ab = agent_a.clone();
676        merged_ab.merge(agent_b.clone());
677        assert!(
678            merged_ab.contains(&"x".into()),
679            "add-wins: concurrent add should survive remove"
680        );
681
682        // Merge B into A (commutativity check)
683        let mut merged_ba = agent_b.clone();
684        merged_ba.merge(agent_a.clone());
685        assert!(
686            merged_ba.contains(&"x".into()),
687            "add-wins: merge must be commutative"
688        );
689    }
690
691    #[test]
692    fn concurrent_adds_both_present() {
693        // Two agents concurrently add "x" with different tags.
694        let mut agent_a: OrSet<String> = OrSet::new();
695        agent_a.add("x".into(), make_tag(1, 1, 100));
696
697        let mut agent_b: OrSet<String> = OrSet::new();
698        agent_b.add("x".into(), make_tag(1, 2, 200));
699
700        let mut merged = agent_a.clone();
701        merged.merge(agent_b);
702
703        assert!(merged.contains(&"x".into()));
704        // Should have 2 tags for "x"
705        assert_eq!(merged.active_tags(&"x".into()).len(), 2);
706    }
707
708    #[test]
709    fn causal_remove_after_add_element_absent() {
710        // Agent A adds "x", then causally removes it. Result: absent.
711        let mut set: OrSet<String> = OrSet::new();
712        set.add("x".into(), make_tag(1, 1, 100));
713        set.remove(&"x".into());
714
715        assert!(!set.contains(&"x".into()));
716    }
717
718    // ===================================================================
719    // Merge semilattice properties
720    // ===================================================================
721
722    #[test]
723    fn merge_commutative() {
724        let mut a: OrSet<u32> = OrSet::new();
725        a.add(1, make_tag(1, 1, 100));
726        a.add(2, make_tag(2, 1, 101));
727
728        let mut b: OrSet<u32> = OrSet::new();
729        b.add(2, make_tag(3, 2, 200));
730        b.add(3, make_tag(4, 2, 201));
731
732        let mut ab = a.clone();
733        ab.merge(b.clone());
734
735        let mut ba = b.clone();
736        ba.merge(a.clone());
737
738        assert_eq!(ab, ba);
739    }
740
741    #[test]
742    fn merge_associative() {
743        let mut a: OrSet<u32> = OrSet::new();
744        a.add(1, make_tag(1, 1, 100));
745
746        let mut b: OrSet<u32> = OrSet::new();
747        b.add(2, make_tag(2, 2, 200));
748
749        let mut c: OrSet<u32> = OrSet::new();
750        c.add(3, make_tag(3, 3, 300));
751
752        // (a ⊔ b) ⊔ c
753        let mut ab_c = a.clone();
754        ab_c.merge(b.clone());
755        ab_c.merge(c.clone());
756
757        // a ⊔ (b ⊔ c)
758        let mut bc = b.clone();
759        bc.merge(c.clone());
760        let mut a_bc = a.clone();
761        a_bc.merge(bc);
762
763        assert_eq!(ab_c, a_bc);
764    }
765
766    #[test]
767    fn merge_idempotent() {
768        let mut a: OrSet<u32> = OrSet::new();
769        a.add(1, make_tag(1, 1, 100));
770        a.add(2, make_tag(2, 1, 101));
771        a.remove(&1);
772
773        let before = a.clone();
774        a.merge(before.clone());
775        assert_eq!(a, before);
776    }
777
778    #[test]
779    fn merge_empty_sets() {
780        let a: OrSet<String> = OrSet::new();
781        let b: OrSet<String> = OrSet::new();
782
783        let mut merged = a.clone();
784        merged.merge(b);
785
786        assert!(merged.is_empty());
787    }
788
789    #[test]
790    fn merge_with_empty_is_identity() {
791        let mut a: OrSet<u32> = OrSet::new();
792        a.add(1, make_tag(1, 1, 100));
793        a.add(2, make_tag(2, 1, 101));
794
795        let before = a.clone();
796        a.merge(OrSet::new());
797        assert_eq!(a, before);
798    }
799
800    // ===================================================================
801    // Complex scenarios
802    // ===================================================================
803
804    #[test]
805    fn three_way_concurrent_add_remove() {
806        // Base: {x(tag1)}
807        let tag1 = make_tag(1, 0, 1);
808        let mut base: OrSet<String> = OrSet::new();
809        base.add("x".into(), tag1.clone());
810
811        // Agent A: removes x
812        let mut a = base.clone();
813        a.remove(&"x".into());
814
815        // Agent B: adds x with new tag
816        let mut b = base.clone();
817        b.add("x".into(), make_tag(2, 2, 200));
818
819        // Agent C: removes x
820        let mut c = base.clone();
821        c.remove(&"x".into());
822
823        // Merge all three
824        let mut result = a.clone();
825        result.merge(b.clone());
826        result.merge(c.clone());
827
828        // Agent B's add should survive both removes
829        assert!(
830            result.contains(&"x".into()),
831            "B's concurrent add should win over A and C's removes"
832        );
833    }
834
835    #[test]
836    fn remove_then_concurrent_re_adds() {
837        // Base: {x(tag1)}
838        let tag1 = make_tag(1, 0, 1);
839        let mut base: OrSet<String> = OrSet::new();
840        base.add("x".into(), tag1.clone());
841
842        // Agent A: removes x, then adds x with new tag
843        let mut a = base.clone();
844        a.remove(&"x".into());
845        a.add("x".into(), make_tag(3, 1, 300));
846
847        // Agent B: also removes x, then adds x with different new tag
848        let mut b = base.clone();
849        b.remove(&"x".into());
850        b.add("x".into(), make_tag(4, 2, 400));
851
852        let mut merged = a.clone();
853        merged.merge(b.clone());
854
855        // Both new adds should survive
856        assert!(merged.contains(&"x".into()));
857        assert_eq!(merged.active_tags(&"x".into()).len(), 2);
858    }
859
860    #[test]
861    fn mixed_elements_concurrent_ops() {
862        // Base: {a(t1), b(t2)}
863        let mut base: OrSet<String> = OrSet::new();
864        base.add("a".into(), make_tag(1, 0, 1));
865        base.add("b".into(), make_tag(2, 0, 2));
866
867        // Agent 1: remove a, add c
868        let mut s1 = base.clone();
869        s1.remove(&"a".into());
870        s1.add("c".into(), make_tag(3, 1, 100));
871
872        // Agent 2: remove b, add d
873        let mut s2 = base.clone();
874        s2.remove(&"b".into());
875        s2.add("d".into(), make_tag(4, 2, 200));
876
877        let mut merged = s1.clone();
878        merged.merge(s2);
879
880        // a removed by agent 1 only (agent 2 didn't remove it,
881        // but agent 2 kept original tag which IS tombstoned by agent 1)
882        // Wait - agent 2 starts from base which has a(t1). Agent 2
883        // doesn't touch a, so a(t1) stays in agent 2's elements.
884        // Agent 1 tombstones a(t1). After merge, a(t1) is tombstoned.
885        // So a is NOT present.
886        assert!(!merged.contains(&"a".into()));
887
888        // b removed by agent 2 only. Agent 1 keeps b(t2). Agent 2
889        // tombstones b(t2). After merge, b(t2) is tombstoned.
890        assert!(!merged.contains(&"b".into()));
891
892        // c added by agent 1, d added by agent 2
893        assert!(merged.contains(&"c".into()));
894        assert!(merged.contains(&"d".into()));
895    }
896
897    // ===================================================================
898    // Apply operations
899    // ===================================================================
900
901    #[test]
902    fn apply_add_op() {
903        let mut set: OrSet<String> = OrSet::new();
904        let tag = make_tag(1, 1, 100);
905        set.apply(OrSetOp::Add("alice".into(), tag));
906
907        assert!(set.contains(&"alice".into()));
908    }
909
910    #[test]
911    fn apply_remove_op_with_observed_tags() {
912        let mut set: OrSet<String> = OrSet::new();
913        let tag = make_tag(1, 1, 100);
914        set.add("alice".into(), tag.clone());
915
916        // Remove by specifying observed tags
917        set.apply(OrSetOp::Remove("alice".into(), vec![tag]));
918        assert!(!set.contains(&"alice".into()));
919    }
920
921    #[test]
922    fn apply_remove_with_unobserved_tag_survives() {
923        let mut set: OrSet<String> = OrSet::new();
924        let tag1 = make_tag(1, 1, 100);
925        let tag2 = make_tag(2, 2, 200);
926        set.add("alice".into(), tag1.clone());
927        set.add("alice".into(), tag2.clone());
928
929        // Remove only observing tag1
930        set.apply(OrSetOp::Remove("alice".into(), vec![tag1]));
931
932        // tag2 survives
933        assert!(set.contains(&"alice".into()));
934        assert_eq!(set.active_tags(&"alice".into()).len(), 1);
935    }
936
937    // ===================================================================
938    // Values method
939    // ===================================================================
940
941    #[test]
942    fn values_returns_distinct_present_elements() {
943        let mut set: OrSet<String> = OrSet::new();
944        set.add("a".into(), make_tag(1, 1, 100));
945        set.add("b".into(), make_tag(2, 1, 101));
946        set.add("c".into(), make_tag(3, 1, 102));
947        set.remove(&"b".into());
948
949        let vals = set.values();
950        assert_eq!(vals.len(), 2);
951        assert!(vals.contains(&"a".to_string()));
952        assert!(vals.contains(&"c".to_string()));
953        assert!(!vals.contains(&"b".to_string()));
954    }
955
956    // ===================================================================
    // Regression Tests for Linearization Bugs
958    // ===================================================================
959
960    #[test]
961    fn remove_base_state_item_succeeds() {
962        // Scenario: Item added in base state, then removed in a divergent event.
963        // With base_state passed to ops_from_events, this should now work.
964
965        let base_tag = make_tag(1, 1, 100);
966        let mut base_state: OrSet<String> = OrSet::new();
967        base_state.add("alice".into(), base_tag.clone());
968
969        // Event: Unassign alice (Unlink/Remove)
970        let event = Event {
971            wall_ts_us: 2000,
972            agent: "agent".into(),
973            itc: "itc".into(),
974            parents: vec![],
975            event_type: EventType::Assign,
976            item_id: crate::model::item_id::ItemId::new_unchecked("bn-test"),
977            data: EventData::Assign(AssignData {
978                agent: "alice".into(),
979                action: AssignAction::Unassign,
980                extra: BTreeMap::new(),
981            }),
982            event_hash: "hash".into(),
983        };
984
985        // Pass base_state so it knows what to remove.
986        let ops = ops_from_events(&[event], &OrSetField::Assignees, Some(&base_state), None);
987
988        // Apply ops to base state
989        let mut result = base_state.clone();
990        for op in ops {
991            result.apply(op);
992        }
993
994        // Alice should be removed
995        assert!(
996            !result.contains(&"alice".into()),
997            "Alice should be removed when base_state is provided"
998        );
999    }
1000
1001    #[test]
1002    fn concurrent_remove_respects_dag_visibility() {
1003        // Scenario: Concurrent Add(A) and Remove(A).
1004        // Remove(A) does NOT see Add(A).
1005        // Result: A should remain present (Add wins).
1006
1007        let root_hash = "root";
1008        let add_hash = "add_hash";
1009        let remove_hash = "remove_hash";
1010
1011        // DAG: root -> add, root -> remove
1012        let mut dag = EventDag::new();
1013        // We mock the DAG structure without full events for the DAG check
1014        // insert() requires full Event, so we construct minimal events.
1015
1016        let make_evt = |hash: &str, parents: Vec<&str>| Event {
1017            wall_ts_us: 0,
1018            agent: "a".into(),
1019            itc: "i".into(),
1020            parents: parents.iter().map(|s| s.to_string()).collect(),
1021            event_type: EventType::Create, // dummy
1022            item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
1023            data: EventData::Create(crate::event::data::CreateData {
1024                title: "".into(),
1025                kind: crate::model::item::Kind::Task,
1026                size: None,
1027                urgency: crate::model::item::Urgency::Default,
1028                labels: vec![],
1029                parent: None,
1030                causation: None,
1031                description: None,
1032                extra: BTreeMap::new(),
1033            }),
1034            event_hash: hash.into(),
1035        };
1036
1037        dag.insert(make_evt(root_hash, vec![]));
1038        dag.insert(make_evt(add_hash, vec![root_hash]));
1039        dag.insert(make_evt(remove_hash, vec![root_hash]));
1040
1041        // Events for ops_from_events
1042        // 1. Add "alice"
1043        let add_event = Event {
1044            wall_ts_us: 1000,
1045            agent: "alice".into(),
1046            itc: "1".into(),
1047            parents: vec![root_hash.into()],
1048            event_type: EventType::Assign,
1049            item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
1050            data: EventData::Assign(AssignData {
1051                agent: "alice".into(),
1052                action: AssignAction::Assign,
1053                extra: BTreeMap::new(),
1054            }),
1055            event_hash: add_hash.into(),
1056        };
1057
1058        // 2. Remove "alice" (concurrent)
1059        let remove_event = Event {
1060            wall_ts_us: 2000,
1061            agent: "bob".into(),
1062            itc: "2".into(),
1063            parents: vec![root_hash.into()],
1064            event_type: EventType::Assign,
1065            item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
1066            data: EventData::Assign(AssignData {
1067                agent: "alice".into(),
1068                action: AssignAction::Unassign,
1069                extra: BTreeMap::new(),
1070            }),
1071            event_hash: remove_hash.into(),
1072        };
1073
1074        // Replay order: Add then Remove (linearized)
1075        let events = vec![add_event, remove_event];
1076
1077        let ops = ops_from_events(
1078            &events,
1079            &OrSetField::Assignees,
1080            None, // empty base
1081            Some(&dag),
1082        );
1083
1084        let mut set = OrSet::new();
1085        for op in ops {
1086            set.apply(op);
1087        }
1088
1089        // Alice should be present because Remove didn't see Add
1090        assert!(
1091            set.contains(&"alice".into()),
1092            "Concurrent Add should survive Remove"
1093        );
1094    }
1095}