Skip to main content

rings_core/dht/entry/
crdt.rs

//! CRDT carriers for DHT entries.
//!
//! State variables:
//! - `register` is an optional LWW reset floor for overwrite.
//! - `values` is an LWW element set keyed by encoded payload.
//! - `removes` is a two-phase tombstone set for relay-message entries.
//!
//! Semilattice laws:
//! - `GSet` join is set union.
//! - `DataTopicBuffer` join is idempotent, commutative, and associative over
//!   normalized LWW element sets.
//! - `RelayMessageSet` join is idempotent, commutative, and associative over
//!   normalized two-phase sets.
//!
//! Constructor postconditions:
//! - `DataTopicBuffer::new` preserves only values whose dot is at or after the
//!   reset floor when a reset floor exists.
//! - `RelayMessageSet::new` preserves only adds whose dot has not been
//!   tombstoned.
20
21use std::collections::BTreeMap;
22use std::collections::BTreeSet;
23
24use serde::Deserialize;
25use serde::Serialize;
26
27use crate::algebra::JoinSemilattice;
28use crate::dht::Did;
29use crate::error::Error;
30use crate::error::Result;
31use crate::message::Encoded;
32
33/// Hybrid logical version for LWW entry registers and element dots.
34///
35/// `logical_time_ms` starts from the wall-clock millisecond observed at the
36/// storage-operation boundary, then advances beyond any local floor that would
37/// otherwise dominate it. `actor` and `operation` make concurrent writes from
38/// the same millisecond totally ordered without claiming wall-clock recency.
39#[derive(
40    Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
41)]
42pub struct EntryVersion {
43    /// Hybrid logical time in milliseconds.
44    #[serde(alias = "epoch_ms")]
45    pub logical_time_ms: u128,
46    /// Storage node that first stamped the operation.
47    pub actor: Did,
48    /// Deterministic digest of the stamped operation payload.
49    #[serde(default)]
50    pub operation: Did,
51}
52
53impl EntryVersion {
54    /// Construct a version from an explicit hybrid logical time and actor.
55    pub fn new(logical_time_ms: u128, actor: Did, operation: Did) -> Self {
56        Self {
57            logical_time_ms,
58            actor,
59            operation,
60        }
61    }
62
63    /// Construct a version at the current operation boundary.
64    pub fn issued_by(actor: Did, operation: Did) -> Self {
65        Self::new(crate::utils::get_epoch_ms(), actor, operation)
66    }
67
68    pub(super) fn after(self, floor: Option<Self>) -> Self {
69        let Some(floor) = floor else {
70            return self;
71        };
72        if self > floor {
73            return self;
74        }
75        Self {
76            logical_time_ms: floor.logical_time_ms.saturating_add(1),
77            actor: self.actor,
78            operation: self.operation,
79        }
80    }
81}
82
83/// Unique add witness for one visible entry payload element.
84#[derive(
85    Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
86)]
87pub struct EntryDot {
88    /// LWW version that issued this element.
89    pub version: EntryVersion,
90    /// Element position inside the issuing operation.
91    pub index: u32,
92}
93
94impl EntryDot {
95    pub(super) fn for_index(version: EntryVersion, index: usize) -> Result<Self> {
96        let index = u32::try_from(index).map_err(|_| Error::EntryDotIndexOutOfBounds { index })?;
97        Ok(Self { version, index })
98    }
99}
100
101/// CRDT metadata carried beside the legacy entry payload.
102///
103/// `register` is the LWW reset floor used by overwrite. `dots` are per-element
104/// add witnesses used by data/topic and relay element sets. `tombstones` is the
105/// remove set for relay-message two-phase semantics.
106#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
107pub struct EntryCrdt {
108    /// Optional LWW reset floor for the entry payload.
109    pub register: Option<EntryVersion>,
110    /// Per-element add dots. When absent, legacy entries synthesize dots from
111    /// their payload order and value digest.
112    pub dots: Vec<EntryDot>,
113    /// Remove dots for two-phase sets.
114    pub tombstones: Vec<EntryDot>,
115}
116
117impl EntryCrdt {
118    pub(super) fn has_write_witness(&self) -> bool {
119        self.register.is_some() || !self.dots.is_empty()
120    }
121
122    /// Return the bottom floor used only to lift legacy payloads without dots.
123    pub(super) fn legacy_floor(&self) -> EntryVersion {
124        self.register.unwrap_or_default()
125    }
126}
127
/// Grow-only set used by subring membership.
///
/// Members can be inserted but the type exposes no way to remove them.
/// Storage is a `BTreeSet`, so duplicates collapse and iteration follows the
/// `Ord` of `T`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GSet<T: Ord> {
    members: BTreeSet<T>,
}

impl<T: Ord> GSet<T> {
    /// Construct an empty grow-only set.
    pub fn new() -> Self {
        Self {
            members: BTreeSet::new(),
        }
    }

    /// Insert one member.
    ///
    /// Inserting a member that is already present leaves the set unchanged.
    pub fn insert(&mut self, member: T) {
        self.members.insert(member);
    }

    /// Iterate over members in deterministic (ascending `Ord`) order.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.members.iter()
    }
}
152
153impl<T: Ord> JoinSemilattice for GSet<T> {
154    fn join(mut self, other: Self) -> Self {
155        self.members.extend(other.members);
156        self
157    }
158}
159
160/// Bounded LWW element set used by data topic buffers.
161#[derive(Clone, Debug, Default, PartialEq, Eq)]
162pub struct DataTopicBuffer {
163    pub(super) register: Option<EntryVersion>,
164    pub(super) values: BTreeMap<Encoded, EntryDot>,
165}
166
167impl DataTopicBuffer {
168    pub(super) fn new(
169        register: Option<EntryVersion>,
170        mut values: BTreeMap<Encoded, EntryDot>,
171    ) -> Self {
172        if let Some(floor) = register {
173            values.retain(|_, dot| dot.version >= floor);
174        }
175        Self { register, values }
176    }
177}
178
179impl JoinSemilattice for DataTopicBuffer {
180    fn join(mut self, other: Self) -> Self {
181        self.register = self.register.max(other.register);
182        for (value, dot) in other.values {
183            self.values
184                .entry(value)
185                .and_modify(|current| *current = (*current).max(dot))
186                .or_insert(dot);
187        }
188        Self::new(self.register, self.values)
189    }
190}
191
192/// Two-phase set used by relay-message storage.
193#[derive(Clone, Debug, Default, PartialEq, Eq)]
194pub struct RelayMessageSet {
195    pub(super) adds: DataTopicBuffer,
196    pub(super) removes: BTreeSet<EntryDot>,
197}
198
199impl RelayMessageSet {
200    pub(super) fn new(mut adds: DataTopicBuffer, removes: BTreeSet<EntryDot>) -> Self {
201        adds.values.retain(|_, dot| !removes.contains(dot));
202        Self { adds, removes }
203    }
204}
205
206impl JoinSemilattice for RelayMessageSet {
207    fn join(mut self, other: Self) -> Self {
208        self.adds = self.adds.join(other.adds);
209        self.removes.extend(other.removes);
210        Self::new(self.adds, self.removes)
211    }
212}
213
214/// Grow-only subring membership CRDT.
215pub type SubringMemberSet = GSet<Did>;