1use std::collections::{HashMap, HashSet};
7
8use crate::{NodeId, StateId, StateTime, StateType};
9
10#[derive(Clone, Debug, Default, PartialEq, Eq)]
12pub struct VersionVector {
13 clocks: HashMap<NodeId, u64>,
14}
15
16impl VersionVector {
17 pub fn new() -> Self {
18 VersionVector {
19 clocks: HashMap::with_capacity(8),
20 }
21 }
22
23 #[inline]
25 pub fn get(&self, node: NodeId) -> u64 {
26 self.clocks.get(&node).copied().unwrap_or(0)
27 }
28
29 #[inline]
31 pub fn increment(&mut self, node: NodeId) {
32 *self.clocks.entry(node).or_insert(0) += 1;
33 }
34
35 #[inline]
37 pub fn set(&mut self, node: NodeId, value: u64) {
38 self.clocks.insert(node, value);
39 }
40
41 #[inline]
42 pub fn happens_before(&self, other: &VersionVector) -> bool {
43 let mut strictly_less = false;
44
45 for (node, &clock) in &self.clocks {
46 match other.clocks.get(node) {
47 Some(&other_clock) => {
48 if clock > other_clock {
49 return false;
50 }
51 if clock < other_clock {
52 strictly_less = true;
53 }
54 }
55 None => {
56 if clock > 0 {
57 return false;
58 }
59 }
60 }
61 }
62
63 for (node, &clock) in &other.clocks {
64 if !self.clocks.contains_key(node) && clock > 0 {
65 strictly_less = true;
66 }
67 }
68
69 strictly_less
70 }
71
72 #[inline]
74 pub fn concurrent(&self, other: &VersionVector) -> bool {
75 let mut less = false;
76 let mut greater = false;
77
78 for (node, &a) in &self.clocks {
79 let b = other.clocks.get(node).copied().unwrap_or(0);
80 if a < b {
81 less = true;
82 } else if a > b {
83 greater = true;
84 }
85 if less && greater {
86 return true;
87 }
88 }
89
90 for (node, &b) in &other.clocks {
91 if self.clocks.contains_key(node) {
92 continue;
93 }
94 if b > 0 && greater {
95 return true;
96 }
97 }
98
99 false
100 }
101
102 #[inline]
104 pub fn merge(&self, other: &VersionVector) -> VersionVector {
105 let mut merged = self.clocks.clone();
106 for (node, &clock) in &other.clocks {
107 merged
108 .entry(*node)
109 .and_modify(|c| *c = (*c).max(clock))
110 .or_insert(clock);
111 }
112 VersionVector { clocks: merged }
113 }
114
115 pub fn to_compact(&self) -> Vec<(NodeId, u64)> {
117 let mut out = Vec::with_capacity(self.clocks.len());
118 for (&n, &c) in &self.clocks {
119 out.push((n, c));
120 }
121 out
122 }
123
124 pub fn from_compact(entries: Vec<(NodeId, u64)>) -> Self {
126 let mut clocks = HashMap::with_capacity(entries.len());
127 clocks.extend(entries);
128 VersionVector { clocks }
129 }
130}
131
132#[derive(Clone, Debug, Default)]
134pub struct AuthoritySet {
135 pub owners: HashSet<NodeId>,
137 pub delegates: HashMap<NodeId, AuthorityScope>,
139 pub revoked: HashSet<NodeId>,
141}
142
143impl AuthoritySet {
144 pub fn new() -> Self {
145 AuthoritySet::default()
146 }
147
148 pub fn with_owner(owner: NodeId) -> Self {
149 let mut set = AuthoritySet::new();
150 set.owners.insert(owner);
151 set
152 }
153
154 pub fn has_authority(&self, node: NodeId, operation: &AuthorityScope) -> bool {
156 if self.revoked.contains(&node) {
157 return false;
158 }
159
160 if self.owners.contains(&node) {
161 return true;
162 }
163
164 if let Some(scope) = self.delegates.get(&node) {
165 return scope.allows(operation);
166 }
167
168 false
169 }
170
171 pub fn add_owner(&mut self, node: NodeId) {
173 self.owners.insert(node);
174 self.revoked.remove(&node);
175 }
176
177 pub fn add_delegate(&mut self, node: NodeId, scope: AuthorityScope) {
179 self.delegates.insert(node, scope);
180 self.revoked.remove(&node);
181 }
182
183 pub fn revoke(&mut self, node: NodeId) {
185 self.owners.remove(&node);
186 self.delegates.remove(&node);
187 self.revoked.insert(node);
188 }
189
190 pub fn contains(&self, node: &NodeId) -> bool {
192 !self.revoked.contains(node)
193 && (self.owners.contains(node) || self.delegates.contains_key(node))
194 }
195
196 pub fn is_revoked(&self, node: &NodeId) -> bool {
198 self.revoked.contains(node)
199 }
200}
201
/// A permission level, used both as the grant held by a delegate and as the
/// operation being requested (see `allows`).
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum AuthorityScope {
    /// As a grant, permits every requested operation.
    Full,
    /// As a grant, permits `Append` and `ReadOnly` requests.
    Append,
    /// As a grant, permits only `ReadOnly` requests.
    ReadOnly,
    /// A named set of operations; as a grant it permits a `Custom` request
    /// whose names are all contained in this set.
    Custom(HashSet<String>),
}
214
215impl AuthorityScope {
216 pub fn allows(&self, operation: &AuthorityScope) -> bool {
218 match (self, operation) {
219 (AuthorityScope::Full, _) => true,
220 (AuthorityScope::Append, AuthorityScope::Append) => true,
221 (AuthorityScope::Append, AuthorityScope::ReadOnly) => true,
222 (AuthorityScope::ReadOnly, AuthorityScope::ReadOnly) => true,
223 (AuthorityScope::Custom(allowed), AuthorityScope::Custom(requested)) => {
224 requested.is_subset(allowed)
225 }
226 _ => false,
227 }
228 }
229}
230
231#[derive(Clone, Debug, Default)]
233pub enum DeltaLaw {
234 #[default]
236 LastWriterWins,
237 AppendOnly { max_size: usize },
239 Counter { merge: CounterMerge },
241 MultiValueRegister,
243 ContinuousBlend {
245 interpolation: InterpolationType,
246 max_deviation: f64,
247 },
248}
249
/// Strategy selector carried by `DeltaLaw::Counter`.
///
/// NOTE(review): how each variant combines counter values is decided by code
/// outside this file — confirm before documenting further.
#[derive(Debug, Clone, Copy)]
pub enum CounterMerge {
    Max,
    Sum,
    Average,
}
257
/// Interpolation selector carried by `DeltaLaw::ContinuousBlend`.
///
/// NOTE(review): the interpolation routines themselves are not in this file;
/// the variant names are the only description available here.
#[derive(Debug, Clone, Copy)]
pub enum InterpolationType {
    Linear,
    Cubic,
    Catmull,
}
265
266#[derive(Clone, Debug)]
268pub struct StateBounds {
269 pub max_size: usize,
271 pub rate_limit: Option<RateLimit>,
273 pub max_entropy: f64,
275}
276
277impl Default for StateBounds {
278 fn default() -> Self {
279 StateBounds {
280 max_size: 65536,
281 rate_limit: None,
282 max_entropy: 1.0,
283 }
284 }
285}
286
/// An event budget over a time window.
///
/// NOTE(review): presumably "at most `max_events` per `window_ms`
/// milliseconds"; the enforcing code is not in this file — TODO confirm.
#[derive(Debug, Clone)]
pub struct RateLimit {
    /// Event count for the window.
    pub max_events: u32,
    /// Window length; the field name indicates milliseconds.
    pub window_ms: u32,
}
293
294impl RateLimit {
295 pub fn new(max_events: u32, window_ms: u32) -> Self {
296 RateLimit {
297 max_events,
298 window_ms,
299 }
300 }
301}
302
/// Entropy bookkeeping for a state; every field starts at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct EntropyModel {
    /// Current level, raised by `increase` (capped at `1.0`) and lowered by
    /// `decrease` (floored at `0.0`).
    pub level: f64,
    /// Running total of every amount passed to `increase` since the last
    /// `reset`; not reduced by `decrease`.
    pub accumulated: f64,
    /// Elapsed-time counter cleared by `reset`.
    /// NOTE(review): `StateAtom::needs_prediction` compares it against
    /// `threshold_ms * 1000`, which suggests microseconds — TODO confirm.
    pub time_since_actual: u64,
}
313
314impl EntropyModel {
315 pub fn new() -> Self {
316 EntropyModel::default()
317 }
318
319 pub fn increase(&mut self, amount: f64) {
321 self.level = (self.level + amount).min(1.0);
322 self.accumulated += amount;
323 }
324
325 pub fn decrease(&mut self, amount: f64) {
327 self.level = (self.level - amount).max(0.0);
328 }
329
330 pub fn reset(&mut self) {
332 self.level = 0.0;
333 self.accumulated = 0.0;
334 self.time_since_actual = 0;
335 }
336}
337
338#[derive(Clone, Debug)]
340pub struct StateAtom {
341 pub id: StateId,
343 pub state_type: StateType,
345 pub authority: AuthoritySet,
347 pub version: VersionVector,
349 pub delta_law: DeltaLaw,
351 pub bounds: StateBounds,
353 pub entropy: EntropyModel,
355 pub last_modified: StateTime,
357 pub value: Vec<u8>,
359}
360
361impl StateAtom {
362 pub fn new(id: StateId, state_type: StateType, owner: NodeId) -> Self {
363 StateAtom {
364 id,
365 state_type,
366 authority: AuthoritySet::with_owner(owner),
367 version: VersionVector::new(),
368 delta_law: DeltaLaw::default(),
369 bounds: StateBounds::default(),
370 entropy: EntropyModel::new(),
371 last_modified: StateTime::ZERO,
372 value: Vec::new(),
373 }
374 }
375
376 pub fn needs_prediction(&self, threshold_ms: u64) -> bool {
378 self.entropy.time_since_actual > threshold_ms * 1000
379 }
380
381 pub fn memory_size(&self) -> usize {
383 std::mem::size_of::<Self>()
384 + self.value.len()
385 + self.authority.owners.len() * std::mem::size_of::<NodeId>()
386 + self.version.clocks.len() * (std::mem::size_of::<NodeId>() + 8)
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// A vector that is nowhere larger and somewhere smaller is ordered first,
    /// and the relation does not hold in the opposite direction.
    #[test]
    fn test_version_vector_happens_before() {
        let mut earlier = VersionVector::new();
        earlier.set(NodeId::new(1), 1);
        earlier.set(NodeId::new(2), 2);

        let mut later = VersionVector::new();
        later.set(NodeId::new(1), 1);
        later.set(NodeId::new(2), 3);

        assert!(earlier.happens_before(&later));
        assert!(!later.happens_before(&earlier));
    }

    /// Vectors that are each ahead on a different node are concurrent, and the
    /// relation is symmetric.
    #[test]
    fn test_version_vector_concurrent() {
        let mut left = VersionVector::new();
        left.set(NodeId::new(1), 2);
        left.set(NodeId::new(2), 1);

        let mut right = VersionVector::new();
        right.set(NodeId::new(1), 1);
        right.set(NodeId::new(2), 2);

        assert!(left.concurrent(&right));
        assert!(right.concurrent(&left));
    }

    /// Merging keeps the larger counter for every node.
    #[test]
    fn test_version_vector_merge() {
        let mut left = VersionVector::new();
        left.set(NodeId::new(1), 2);
        left.set(NodeId::new(2), 1);

        let mut right = VersionVector::new();
        right.set(NodeId::new(1), 1);
        right.set(NodeId::new(2), 3);

        let combined = left.merge(&right);
        assert_eq!(combined.get(NodeId::new(1)), 2);
        assert_eq!(combined.get(NodeId::new(2)), 3);
    }

    /// Owners may do anything, delegates are limited to their scope,
    /// outsiders are refused, and revocation removes a delegate's access.
    #[test]
    fn test_authority_set() {
        let owner = NodeId::new(1);
        let delegate = NodeId::new(2);
        let outsider = NodeId::new(3);

        let mut authority = AuthoritySet::with_owner(owner);
        authority.add_delegate(delegate, AuthorityScope::Append);

        assert!(authority.has_authority(owner, &AuthorityScope::Full));
        assert!(authority.has_authority(delegate, &AuthorityScope::Append));
        assert!(!authority.has_authority(delegate, &AuthorityScope::Full));
        assert!(!authority.has_authority(outsider, &AuthorityScope::Append));

        authority.revoke(delegate);
        assert!(!authority.has_authority(delegate, &AuthorityScope::Append));
    }
}