Skip to main content

elara_core/
state.rs

//! State atom definitions
//!
//! State atoms (ω) are the fundamental units of reality in ELARA.
//! Each atom has identity, type, authority, versioning, and merge behavior.
5
6use std::collections::{HashMap, HashSet};
7
8use crate::{NodeId, StateId, StateTime, StateType};
9
10/// Version vector for causal ordering (not total ordering)
11#[derive(Clone, Debug, Default, PartialEq, Eq)]
12pub struct VersionVector {
13    clocks: HashMap<NodeId, u64>,
14}
15
16impl VersionVector {
17    pub fn new() -> Self {
18        VersionVector {
19            clocks: HashMap::with_capacity(8),
20        }
21    }
22
23    /// Get the clock value for a node
24    #[inline]
25    pub fn get(&self, node: NodeId) -> u64 {
26        self.clocks.get(&node).copied().unwrap_or(0)
27    }
28
29    /// Increment the clock for a node
30    #[inline]
31    pub fn increment(&mut self, node: NodeId) {
32        *self.clocks.entry(node).or_insert(0) += 1;
33    }
34
35    /// Set the clock for a node
36    #[inline]
37    pub fn set(&mut self, node: NodeId, value: u64) {
38        self.clocks.insert(node, value);
39    }
40
41    #[inline]
42    pub fn happens_before(&self, other: &VersionVector) -> bool {
43        let mut strictly_less = false;
44
45        for (node, &clock) in &self.clocks {
46            match other.clocks.get(node) {
47                Some(&other_clock) => {
48                    if clock > other_clock {
49                        return false;
50                    }
51                    if clock < other_clock {
52                        strictly_less = true;
53                    }
54                }
55                None => {
56                    if clock > 0 {
57                        return false;
58                    }
59                }
60            }
61        }
62
63        for (node, &clock) in &other.clocks {
64            if !self.clocks.contains_key(node) && clock > 0 {
65                strictly_less = true;
66            }
67        }
68
69        strictly_less
70    }
71
72    /// Check if two version vectors are concurrent (neither happens-before)
73    #[inline]
74    pub fn concurrent(&self, other: &VersionVector) -> bool {
75        let mut less = false;
76        let mut greater = false;
77
78        for (node, &a) in &self.clocks {
79            let b = other.clocks.get(node).copied().unwrap_or(0);
80            if a < b {
81                less = true;
82            } else if a > b {
83                greater = true;
84            }
85            if less && greater {
86                return true;
87            }
88        }
89
90        for (node, &b) in &other.clocks {
91            if self.clocks.contains_key(node) {
92                continue;
93            }
94            if b > 0 && greater {
95                return true;
96            }
97        }
98
99        false
100    }
101
102    /// Merge two version vectors (element-wise max)
103    #[inline]
104    pub fn merge(&self, other: &VersionVector) -> VersionVector {
105        let mut merged = self.clocks.clone();
106        for (node, &clock) in &other.clocks {
107            merged
108                .entry(*node)
109                .and_modify(|c| *c = (*c).max(clock))
110                .or_insert(clock);
111        }
112        VersionVector { clocks: merged }
113    }
114
115    /// Compact representation for wire format
116    pub fn to_compact(&self) -> Vec<(NodeId, u64)> {
117        let mut out = Vec::with_capacity(self.clocks.len());
118        for (&n, &c) in &self.clocks {
119            out.push((n, c));
120        }
121        out
122    }
123
124    /// Restore from compact representation
125    pub fn from_compact(entries: Vec<(NodeId, u64)>) -> Self {
126        let mut clocks = HashMap::with_capacity(entries.len());
127        clocks.extend(entries);
128        VersionVector { clocks }
129    }
130}
131
132/// Authority set - who can mutate a state atom
133#[derive(Clone, Debug, Default)]
134pub struct AuthoritySet {
135    /// Nodes with full authority
136    pub owners: HashSet<NodeId>,
137    /// Nodes with delegated authority (with scope)
138    pub delegates: HashMap<NodeId, AuthorityScope>,
139    /// Explicitly revoked nodes
140    pub revoked: HashSet<NodeId>,
141}
142
143impl AuthoritySet {
144    pub fn new() -> Self {
145        AuthoritySet::default()
146    }
147
148    pub fn with_owner(owner: NodeId) -> Self {
149        let mut set = AuthoritySet::new();
150        set.owners.insert(owner);
151        set
152    }
153
154    /// Check if a node has authority to perform an operation
155    pub fn has_authority(&self, node: NodeId, operation: &AuthorityScope) -> bool {
156        if self.revoked.contains(&node) {
157            return false;
158        }
159
160        if self.owners.contains(&node) {
161            return true;
162        }
163
164        if let Some(scope) = self.delegates.get(&node) {
165            return scope.allows(operation);
166        }
167
168        false
169    }
170
171    /// Add an owner
172    pub fn add_owner(&mut self, node: NodeId) {
173        self.owners.insert(node);
174        self.revoked.remove(&node);
175    }
176
177    /// Add a delegate with limited scope
178    pub fn add_delegate(&mut self, node: NodeId, scope: AuthorityScope) {
179        self.delegates.insert(node, scope);
180        self.revoked.remove(&node);
181    }
182
183    /// Revoke authority from a node
184    pub fn revoke(&mut self, node: NodeId) {
185        self.owners.remove(&node);
186        self.delegates.remove(&node);
187        self.revoked.insert(node);
188    }
189
190    /// Check if a node is in the authority set (owner or delegate)
191    pub fn contains(&self, node: &NodeId) -> bool {
192        !self.revoked.contains(node)
193            && (self.owners.contains(node) || self.delegates.contains_key(node))
194    }
195
196    /// Check if a node is revoked
197    pub fn is_revoked(&self, node: &NodeId) -> bool {
198        self.revoked.contains(node)
199    }
200}
201
/// Authority scope - the operations a delegate is permitted to perform.
///
/// The same type is used both for the scope a delegate holds and for the
/// operation being requested (see [`AuthorityScope::allows`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthorityScope {
    /// Full authority (same as owner)
    Full,
    /// Read and append only
    Append,
    /// Read only
    ReadOnly,
    /// Custom scope naming the specific operations that are permitted
    Custom(HashSet<String>),
}

impl AuthorityScope {
    /// Check if this scope allows a given operation.
    ///
    /// - `Full` allows everything.
    /// - `Append` allows `Append` and `ReadOnly`.
    /// - `ReadOnly` allows only `ReadOnly`.
    /// - `Custom` allows only a `Custom` request whose operations are all
    ///   contained in this scope's set.
    pub fn allows(&self, operation: &AuthorityScope) -> bool {
        match self {
            AuthorityScope::Full => true,
            AuthorityScope::Append => matches!(
                operation,
                AuthorityScope::Append | AuthorityScope::ReadOnly
            ),
            AuthorityScope::ReadOnly => matches!(operation, AuthorityScope::ReadOnly),
            AuthorityScope::Custom(allowed) => match operation {
                AuthorityScope::Custom(requested) => requested.is_subset(allowed),
                _ => false,
            },
        }
    }
}
230
231/// Delta law - how to merge conflicting mutations
232#[derive(Clone, Debug, Default)]
233pub enum DeltaLaw {
234    /// Last-writer-wins based on timestamp
235    #[default]
236    LastWriterWins,
237    /// Append-only list (CRDT)
238    AppendOnly { max_size: usize },
239    /// Counter with specified merge strategy
240    Counter { merge: CounterMerge },
241    /// Multi-value register (keep all concurrent values)
242    MultiValueRegister,
243    /// Continuous blend for audio/motion
244    ContinuousBlend {
245        interpolation: InterpolationType,
246        max_deviation: f64,
247    },
248}
249
/// Counter merge strategy selected by [`DeltaLaw::Counter`].
///
/// The variants only name the strategy; the arithmetic is implemented by
/// whatever code consumes this value.
#[derive(Debug, Clone, Copy)]
pub enum CounterMerge {
    Max,
    Sum,
    Average,
}
257
/// Interpolation type for continuous blending, selected by
/// [`DeltaLaw::ContinuousBlend`].
///
/// The variants only name the curve; the interpolation itself is
/// implemented by whatever code consumes this value.
#[derive(Debug, Clone, Copy)]
pub enum InterpolationType {
    Linear,
    Cubic,
    Catmull,
}
265
266/// State bounds - constraints on state values
267#[derive(Clone, Debug)]
268pub struct StateBounds {
269    /// Maximum size in bytes
270    pub max_size: usize,
271    /// Rate limit (events per second)
272    pub rate_limit: Option<RateLimit>,
273    /// Maximum entropy before compression
274    pub max_entropy: f64,
275}
276
277impl Default for StateBounds {
278    fn default() -> Self {
279        StateBounds {
280            max_size: 65536,
281            rate_limit: None,
282            max_entropy: 1.0,
283        }
284    }
285}
286
/// Rate limit configuration: an event count paired with a window length.
#[derive(Debug, Clone)]
pub struct RateLimit {
    /// Event count for the limit
    pub max_events: u32,
    /// Window length in milliseconds (per the field name)
    pub window_ms: u32,
}

impl RateLimit {
    /// Build a rate limit from an event count and a window in milliseconds.
    pub fn new(max_events: u32, window_ms: u32) -> Self {
        Self {
            max_events,
            window_ms,
        }
    }
}
302
/// Entropy model - tracks uncertainty/divergence
#[derive(Clone, Debug, Default)]
pub struct EntropyModel {
    /// Current entropy level (0.0 = certain, 1.0 = maximum uncertainty)
    pub level: f64,
    /// Accumulated entropy from predictions (not clamped)
    pub accumulated: f64,
    /// Time since last actual data
    // NOTE(review): the unit is not fixed by this type; `StateAtom::needs_prediction`
    // compares it against a millisecond threshold multiplied by 1000.
    pub time_since_actual: u64,
}

impl EntropyModel {
    /// Create a model with zero level, zero accumulation and zero elapsed time.
    pub fn new() -> Self {
        EntropyModel::default()
    }

    /// Increase entropy (during prediction).
    ///
    /// `level` is kept inside `[0.0, 1.0]` even when `amount` is negative;
    /// `accumulated` receives `amount` unchanged.
    pub fn increase(&mut self, amount: f64) {
        // `min` then `max` rather than `clamp`: a NaN sum still resolves to
        // 1.0 here, whereas `clamp` would store NaN.
        self.level = (self.level + amount).min(1.0).max(0.0);
        self.accumulated += amount;
    }

    /// Decrease entropy (when actual data arrives).
    ///
    /// `level` is kept inside `[0.0, 1.0]` even when `amount` is negative.
    pub fn decrease(&mut self, amount: f64) {
        // `max` then `min` rather than `clamp`: a NaN difference still
        // resolves to 0.0 here, whereas `clamp` would store NaN.
        self.level = (self.level - amount).max(0.0).min(1.0);
    }

    /// Reset entropy (full actual data received): clears all three fields.
    pub fn reset(&mut self) {
        self.level = 0.0;
        self.accumulated = 0.0;
        self.time_since_actual = 0;
    }
}
337
338/// State atom - the fundamental unit of reality
339#[derive(Clone, Debug)]
340pub struct StateAtom {
341    /// Unique identifier
342    pub id: StateId,
343    /// State type classification
344    pub state_type: StateType,
345    /// Authority set
346    pub authority: AuthoritySet,
347    /// Version vector for causal ordering
348    pub version: VersionVector,
349    /// Delta law for merging
350    pub delta_law: DeltaLaw,
351    /// Bounds and constraints
352    pub bounds: StateBounds,
353    /// Entropy/uncertainty model
354    pub entropy: EntropyModel,
355    /// Last modification time
356    pub last_modified: StateTime,
357    /// The actual value (opaque bytes for now)
358    pub value: Vec<u8>,
359}
360
361impl StateAtom {
362    pub fn new(id: StateId, state_type: StateType, owner: NodeId) -> Self {
363        StateAtom {
364            id,
365            state_type,
366            authority: AuthoritySet::with_owner(owner),
367            version: VersionVector::new(),
368            delta_law: DeltaLaw::default(),
369            bounds: StateBounds::default(),
370            entropy: EntropyModel::new(),
371            last_modified: StateTime::ZERO,
372            value: Vec::new(),
373        }
374    }
375
376    /// Check if this atom needs prediction (no recent actual data)
377    pub fn needs_prediction(&self, threshold_ms: u64) -> bool {
378        self.entropy.time_since_actual > threshold_ms * 1000
379    }
380
381    /// Get memory size estimate
382    pub fn memory_size(&self) -> usize {
383        std::mem::size_of::<Self>()
384            + self.value.len()
385            + self.authority.owners.len() * std::mem::size_of::<NodeId>()
386            + self.version.clocks.len() * (std::mem::size_of::<NodeId>() + 8)
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a version vector by calling `set` for each `(node, clock)` pair.
    fn vector_of(entries: &[(NodeId, u64)]) -> VersionVector {
        let mut vector = VersionVector::new();
        for &(node, clock) in entries {
            vector.set(node, clock);
        }
        vector
    }

    /// A vector that is behind on one node and equal on the other precedes
    /// the later one, and not the other way round.
    #[test]
    fn test_version_vector_happens_before() {
        let earlier = vector_of(&[(NodeId::new(1), 1), (NodeId::new(2), 2)]);
        let later = vector_of(&[(NodeId::new(1), 1), (NodeId::new(2), 3)]);

        assert!(earlier.happens_before(&later));
        assert!(!later.happens_before(&earlier));
    }

    /// Vectors that are each ahead on a different node are concurrent in
    /// both directions.
    #[test]
    fn test_version_vector_concurrent() {
        let left = vector_of(&[(NodeId::new(1), 2), (NodeId::new(2), 1)]);
        let right = vector_of(&[(NodeId::new(1), 1), (NodeId::new(2), 2)]);

        assert!(left.concurrent(&right));
        assert!(right.concurrent(&left));
    }

    /// Merging keeps the larger clock for every node.
    #[test]
    fn test_version_vector_merge() {
        let left = vector_of(&[(NodeId::new(1), 2), (NodeId::new(2), 1)]);
        let right = vector_of(&[(NodeId::new(1), 1), (NodeId::new(2), 3)]);

        let merged = left.merge(&right);
        assert_eq!(merged.get(NodeId::new(1)), 2);
        assert_eq!(merged.get(NodeId::new(2)), 3);
    }

    /// Owners pass every check, delegates are limited to their scope,
    /// outsiders are refused, and revocation removes a delegate's authority.
    #[test]
    fn test_authority_set() {
        let owner = NodeId::new(1);
        let delegate = NodeId::new(2);
        let outsider = NodeId::new(3);

        let mut auth = AuthoritySet::with_owner(owner);
        auth.add_delegate(delegate, AuthorityScope::Append);

        assert!(auth.has_authority(owner, &AuthorityScope::Full));
        assert!(auth.has_authority(delegate, &AuthorityScope::Append));
        assert!(!auth.has_authority(delegate, &AuthorityScope::Full));
        assert!(!auth.has_authority(outsider, &AuthorityScope::Append));

        auth.revoke(delegate);
        assert!(!auth.has_authority(delegate, &AuthorityScope::Append));
    }
}