Skip to main content

mdcs_core/
orset.rs

//! Observed-Remove Set (OR-Set / Add-Wins Set)
//!
//! Each add generates a unique tag. Remove only removes currently observed tags.
//! Concurrent add and remove of the same element: add wins.
5
6use crate::lattice::{DeltaCRDT, Lattice};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9use ulid::Ulid;
10
11/// A unique tag for each add operation
12#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
13pub struct Tag {
14    /// The replica that created this tag
15    pub replica_id: String,
16    /// Unique identifier for this specific add
17    pub unique_id: Ulid,
18}
19
20impl Tag {
21    pub fn new(replica_id: impl Into<String>) -> Self {
22        Self {
23            replica_id: replica_id.into(),
24            unique_id: Ulid::new(),
25        }
26    }
27}
28
29/// An Observed-Remove Set (OR-Set) CRDT with add-wins semantics.
30///
31/// Each insertion is tagged with a globally unique [`Tag`]. A remove operation
32/// only removes the tags that were *observed* at the time of removal. This means
33/// a concurrent add and remove results in the element being present (add wins).
34///
35/// Supports delta-state replication via the [`DeltaCRDT`] trait.
36#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
37pub struct ORSet<T: Ord + Clone> {
38    /// Maps elements to their active tags
39    entries: BTreeMap<T, BTreeSet<Tag>>,
40    /// Tombstones:  tags that have been removed
41    /// (Required for distributed consistency)
42    tombstones: BTreeSet<Tag>,
43    /// Pending delta for delta-state replication
44    #[serde(skip)]
45    pending_delta: Option<ORSetDelta<T>>,
46}
47
48/// Delta payload for [`ORSet`] replication.
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub struct ORSetDelta<T: Ord + Clone> {
51    /// New element additions with their tags.
52    pub additions: BTreeMap<T, BTreeSet<Tag>>,
53    /// Tags that have been removed.
54    pub removals: BTreeSet<Tag>,
55}
56
57impl<T: Ord + Clone> ORSet<T> {
58    /// Create a new empty OR-Set.
59    pub fn new() -> Self {
60        Self {
61            entries: BTreeMap::new(),
62            tombstones: BTreeSet::new(),
63            pending_delta: None,
64        }
65    }
66
67    /// Add an element with a new unique tag
68    pub fn add(&mut self, replica_id: &str, value: T) {
69        let tag = Tag::new(replica_id);
70
71        self.entries
72            .entry(value.clone())
73            .or_default()
74            .insert(tag.clone());
75
76        // Record in delta
77        let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
78            additions: BTreeMap::new(),
79            removals: BTreeSet::new(),
80        });
81        delta.additions.entry(value).or_default().insert(tag);
82    }
83
84    /// Remove all observed instances of an element
85    pub fn remove(&mut self, value: &T) {
86        if let Some(tags) = self.entries.remove(value) {
87            // Move tags to tombstones
88            for tag in tags.iter() {
89                self.tombstones.insert(tag.clone());
90            }
91
92            // Record in delta
93            let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
94                additions: BTreeMap::new(),
95                removals: BTreeSet::new(),
96            });
97            delta.removals.extend(tags);
98        }
99    }
100
101    /// Check whether `value` is present in the set (has at least one live tag).
102    pub fn contains(&self, value: &T) -> bool {
103        self.entries
104            .get(value)
105            .is_some_and(|tags| !tags.is_empty())
106    }
107
108    /// Iterate over all elements currently in the set.
109    pub fn iter(&self) -> impl Iterator<Item = &T> {
110        self.entries.keys()
111    }
112
113    /// Return the number of distinct elements in the set.
114    pub fn len(&self) -> usize {
115        self.entries.len()
116    }
117
118    /// Return `true` if the set contains no elements.
119    pub fn is_empty(&self) -> bool {
120        self.entries.is_empty()
121    }
122}
123
124
125impl<T: Ord + Clone> Default for ORSet<T> {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131impl<T: Ord + Clone> Lattice for ORSet<T> {
132    fn bottom() -> Self {
133        Self::new()
134    }
135
136    fn join(&self, other: &Self) -> Self {
137        let mut result = Self::new();
138
139        // Merge tombstones first
140        result.tombstones = self.tombstones.union(&other.tombstones).cloned().collect();
141
142        // Merge entries, filtering out tombstoned tags
143        let all_keys: BTreeSet<_> = self
144            .entries
145            .keys()
146            .chain(other.entries.keys())
147            .cloned()
148            .collect();
149
150        for key in all_keys {
151            let self_tags = self.entries.get(&key).cloned().unwrap_or_default();
152            let other_tags = other.entries.get(&key).cloned().unwrap_or_default();
153
154            let merged_tags: BTreeSet<Tag> = self_tags
155                .union(&other_tags)
156                .filter(|tag| !result.tombstones.contains(tag))
157                .cloned()
158                .collect();
159
160            if !merged_tags.is_empty() {
161                result.entries.insert(key, merged_tags);
162            }
163        }
164
165        result
166    }
167}
168
169impl<T: Ord + Clone> Lattice for ORSetDelta<T> {
170    fn bottom() -> Self {
171        Self {
172            additions: BTreeMap::new(),
173            removals: BTreeSet::new(),
174        }
175    }
176
177    fn join(&self, other: &Self) -> Self {
178        let mut additions = self.additions.clone();
179        for (k, v) in &other.additions {
180            additions.entry(k.clone()).or_default().extend(v.clone());
181        }
182
183        Self {
184            additions,
185            removals: self.removals.union(&other.removals).cloned().collect(),
186        }
187    }
188}
189
190impl<T: Ord + Clone> DeltaCRDT for ORSet<T> {
191    type Delta = ORSetDelta<T>;
192
193    fn split_delta(&mut self) -> Option<Self::Delta> {
194        self.pending_delta.take()
195    }
196
197    fn apply_delta(&mut self, delta: &Self::Delta) {
198        // Apply removals to tombstones
199        self.tombstones.extend(delta.removals.iter().cloned());
200
201        // Apply additions, filtering tombstones
202        for (value, tags) in &delta.additions {
203            let entry = self.entries.entry(value.clone()).or_default();
204            for tag in tags {
205                if !self.tombstones.contains(tag) {
206                    entry.insert(tag.clone());
207                }
208            }
209        }
210
211        // Clean up empty entries
212        self.entries.retain(|_, tags| !tags.is_empty());
213    }
214}