rings_core/dht/entry/
crdt.rs1use std::collections::BTreeMap;
22use std::collections::BTreeSet;
23
24use serde::Deserialize;
25use serde::Serialize;
26
27use crate::algebra::JoinSemilattice;
28use crate::dht::Did;
29use crate::error::Error;
30use crate::error::Result;
31use crate::message::Encoded;
32
33#[derive(
40 Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
41)]
42pub struct EntryVersion {
43 #[serde(alias = "epoch_ms")]
45 pub logical_time_ms: u128,
46 pub actor: Did,
48 #[serde(default)]
50 pub operation: Did,
51}
52
53impl EntryVersion {
54 pub fn new(logical_time_ms: u128, actor: Did, operation: Did) -> Self {
56 Self {
57 logical_time_ms,
58 actor,
59 operation,
60 }
61 }
62
63 pub fn issued_by(actor: Did, operation: Did) -> Self {
65 Self::new(crate::utils::get_epoch_ms(), actor, operation)
66 }
67
68 pub(super) fn after(self, floor: Option<Self>) -> Self {
69 let Some(floor) = floor else {
70 return self;
71 };
72 if self > floor {
73 return self;
74 }
75 Self {
76 logical_time_ms: floor.logical_time_ms.saturating_add(1),
77 actor: self.actor,
78 operation: self.operation,
79 }
80 }
81}
82
83#[derive(
85 Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
86)]
87pub struct EntryDot {
88 pub version: EntryVersion,
90 pub index: u32,
92}
93
94impl EntryDot {
95 pub(super) fn for_index(version: EntryVersion, index: usize) -> Result<Self> {
96 let index = u32::try_from(index).map_err(|_| Error::EntryDotIndexOutOfBounds { index })?;
97 Ok(Self { version, index })
98 }
99}
100
101#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
107pub struct EntryCrdt {
108 pub register: Option<EntryVersion>,
110 pub dots: Vec<EntryDot>,
113 pub tombstones: Vec<EntryDot>,
115}
116
117impl EntryCrdt {
118 pub(super) fn has_write_witness(&self) -> bool {
119 self.register.is_some() || !self.dots.is_empty()
120 }
121
122 pub(super) fn legacy_floor(&self) -> EntryVersion {
124 self.register.unwrap_or_default()
125 }
126}
127
128#[derive(Clone, Debug, Default, PartialEq, Eq)]
130pub struct GSet<T: Ord> {
131 members: BTreeSet<T>,
132}
133
134impl<T: Ord> GSet<T> {
135 pub fn new() -> Self {
137 Self {
138 members: BTreeSet::new(),
139 }
140 }
141
142 pub fn insert(&mut self, member: T) {
144 self.members.insert(member);
145 }
146
147 pub fn iter(&self) -> impl Iterator<Item = &T> {
149 self.members.iter()
150 }
151}
152
153impl<T: Ord> JoinSemilattice for GSet<T> {
154 fn join(mut self, other: Self) -> Self {
155 self.members.extend(other.members);
156 self
157 }
158}
159
160#[derive(Clone, Debug, Default, PartialEq, Eq)]
162pub struct DataTopicBuffer {
163 pub(super) register: Option<EntryVersion>,
164 pub(super) values: BTreeMap<Encoded, EntryDot>,
165}
166
167impl DataTopicBuffer {
168 pub(super) fn new(
169 register: Option<EntryVersion>,
170 mut values: BTreeMap<Encoded, EntryDot>,
171 ) -> Self {
172 if let Some(floor) = register {
173 values.retain(|_, dot| dot.version >= floor);
174 }
175 Self { register, values }
176 }
177}
178
179impl JoinSemilattice for DataTopicBuffer {
180 fn join(mut self, other: Self) -> Self {
181 self.register = self.register.max(other.register);
182 for (value, dot) in other.values {
183 self.values
184 .entry(value)
185 .and_modify(|current| *current = (*current).max(dot))
186 .or_insert(dot);
187 }
188 Self::new(self.register, self.values)
189 }
190}
191
192#[derive(Clone, Debug, Default, PartialEq, Eq)]
194pub struct RelayMessageSet {
195 pub(super) adds: DataTopicBuffer,
196 pub(super) removes: BTreeSet<EntryDot>,
197}
198
199impl RelayMessageSet {
200 pub(super) fn new(mut adds: DataTopicBuffer, removes: BTreeSet<EntryDot>) -> Self {
201 adds.values.retain(|_, dot| !removes.contains(dot));
202 Self { adds, removes }
203 }
204}
205
206impl JoinSemilattice for RelayMessageSet {
207 fn join(mut self, other: Self) -> Self {
208 self.adds = self.adds.join(other.adds);
209 self.removes.extend(other.removes);
210 Self::new(self.adds, self.removes)
211 }
212}
213
214pub type SubringMemberSet = GSet<Did>;