Skip to main content

constraint_crdt/
decay.rs

1//! # Novel Experiment 3: Time-Decay CRDT
2//!
3//! Constraints that lose weight over time — old violations decay,
4//! recent violations weigh more. This models real systems where
5//! "3 violations in the last hour" matters more than "100 violations last month".
6//!
7//! Uses exponential decay: weight = e^(-λ * age)
8//! The decay parameter λ controls how fast old data becomes irrelevant.
9//!
10//! Novel: the decay IS a semilattice operation (monotone decreasing),
11//! so time-decay CRDTs still satisfy C/A/I laws.
12
13use crate::merge::Merge;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::fmt;
17
18/// A time-decaying counter. Each event decays exponentially.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct DecayCounter {
21    /// Per-node decay accumulators: node → (last_value, last_time_ns)
22    accumulators: HashMap<String, (f64, u64)>,
23    /// Decay rate λ (higher = faster decay)
24    lambda: f64,
25    /// Current total (lazily computed)
26    total: f64,
27}
28
29impl DecayCounter {
30    pub fn new(half_life_secs: f64) -> Self {
31        // λ = ln(2) / half_life
32        let lambda = 2.0_f64.ln() / half_life_secs;
33        Self {
34            accumulators: HashMap::new(),
35            lambda,
36            total: 0.0,
37        }
38    }
39
40    /// Record an event from a node at a given time.
41    pub fn record(&mut self, node: &str, value: f64, time_ns: u64) {
42        let entry = self.accumulators.entry(node.to_string()).or_insert((0.0, 0));
43        
44        // Decay existing value to current time
45        let elapsed_ns = time_ns.saturating_sub(entry.1);
46        let elapsed_secs = elapsed_ns as f64 / 1e9;
47        entry.0 *= (-self.lambda * elapsed_secs).exp();
48        
49        // Add new value
50        entry.0 += value;
51        entry.1 = time_ns;
52        
53        self.recompute_total();
54    }
55
56    /// Get the current decayed value for a node.
57    pub fn node_value(&mut self, node: &str, now_ns: u64) -> f64 {
58        if let Some((val, last_time)) = self.accumulators.get(node) {
59            let elapsed_ns = now_ns.saturating_sub(*last_time);
60            let elapsed_secs = elapsed_ns as f64 / 1e9;
61            val * (-self.lambda * elapsed_secs).exp()
62        } else {
63            0.0
64        }
65    }
66
67    /// Get the total decayed value across all nodes.
68    pub fn total(&mut self, now_ns: u64) -> f64 {
69        let mut sum = 0.0;
70        for (val, last_time) in self.accumulators.values() {
71            let elapsed_ns = now_ns.saturating_sub(*last_time);
72            let elapsed_secs = elapsed_ns as f64 / 1e9;
73            sum += val * (-self.lambda * elapsed_secs).exp();
74        }
75        self.total = sum;
76        sum
77    }
78
79    /// Half-life in seconds.
80    pub fn half_life(&self) -> f64 {
81        2.0_f64.ln() / self.lambda
82    }
83
84    /// Lambda (decay rate).
85    pub fn lambda(&self) -> f64 {
86        self.lambda
87    }
88
89    fn recompute_total(&mut self) {
90        // Approximate — exact total requires knowing current time
91        self.total = self.accumulators.values().map(|(v, _)| v).sum();
92    }
93}
94
95impl Merge for DecayCounter {
96    fn merge(&mut self, other: &Self) {
97        // Merge accumulators: take the one with later timestamp per node
98        for (node, (val, time)) in &other.accumulators {
99            let entry = self.accumulators.entry(node.clone()).or_insert((0.0, 0));
100            if *time > entry.1 {
101                *entry = (*val, *time);
102            } else if *time == entry.1 {
103                // Same time: take max value
104                entry.0 = entry.0.max(*val);
105            }
106            // If our time is later, keep ours
107        }
108        self.recompute_total();
109    }
110}
111
112impl PartialEq for DecayCounter {
113    fn eq(&self, other: &Self) -> bool {
114        (self.total - other.total).abs() < 0.001
115    }
116}
117
118/// A time-decaying constraint state.
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct DecayConstraintState {
121    pub node_id: String,
122    /// Satisfied constraints (decay counter)
123    pub satisfied: DecayCounter,
124    /// Violations (decay counter)
125    pub violations: DecayCounter,
126    /// Half-life in seconds
127    pub half_life: f64,
128}
129
130impl DecayConstraintState {
131    pub fn new(node_id: &str, half_life_secs: f64) -> Self {
132        Self {
133            node_id: node_id.to_string(),
134            satisfied: DecayCounter::new(half_life_secs),
135            violations: DecayCounter::new(half_life_secs),
136            half_life: half_life_secs,
137        }
138    }
139
140    /// Record satisfied constraints.
141    pub fn record_satisfied(&mut self, count: f64, time_ns: u64) {
142        self.satisfied.record(&self.node_id, count, time_ns);
143    }
144
145    /// Record violations.
146    pub fn record_violations(&mut self, count: f64, time_ns: u64) {
147        self.violations.record(&self.node_id, count, time_ns);
148    }
149
150    /// Get satisfaction rate at a given time.
151    pub fn satisfaction_rate(&mut self, time_ns: u64) -> f64 {
152        let sat = self.satisfied.total(time_ns);
153        let vio = self.violations.total(time_ns);
154        let total = sat + vio;
155        if total == 0.0 { return 1.0; }
156        sat / total
157    }
158
159    /// Current violation "weight" (how much recent violations matter).
160    pub fn violation_weight(&mut self, time_ns: u64) -> f64 {
161        self.violations.total(time_ns)
162    }
163}
164
165impl Merge for DecayConstraintState {
166    fn merge(&mut self, other: &Self) {
167        self.satisfied.merge(&other.satisfied);
168        self.violations.merge(&other.violations);
169    }
170}
171
172impl PartialEq for DecayConstraintState {
173    fn eq(&self, other: &Self) -> bool {
174        self.satisfied == other.satisfied && self.violations == other.violations
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use super::*;
181    use crate::merge::laws;
182
183    const NS_PER_SEC: u64 = 1_000_000_000;
184
185    #[test]
186    fn test_decay() {
187        let mut dc = DecayCounter::new(1.0); // 1 second half-life
188        dc.record("a", 100.0, 0);
189        
190        // After 1 second, should be ~50
191        let val = dc.node_value("a", NS_PER_SEC);
192        assert!((val - 50.0).abs() < 1.0, "Expected ~50, got {:.1}", val);
193        
194        // After 2 seconds, should be ~25
195        let val = dc.node_value("a", 2 * NS_PER_SEC);
196        assert!((val - 25.0).abs() < 1.0, "Expected ~25, got {:.1}", val);
197    }
198
199    #[test]
200    fn test_recent_weighs_more() {
201        let mut dc = DecayCounter::new(1.0);
202        dc.record("a", 100.0, 0); // 100 at t=0
203        dc.record("b", 100.0, 5 * NS_PER_SEC); // 100 at t=5s
204        
205        // At t=6s: a decayed to ~3.1, b decayed to ~50
206        let total = dc.total(6 * NS_PER_SEC);
207        assert!(total < 60.0, "Recent violations should dominate, total={:.1}", total);
208    }
209
210    #[test]
211    fn test_merge_takes_latest() {
212        let mut a = DecayCounter::new(1.0);
213        a.record("x", 100.0, 10 * NS_PER_SEC);
214        
215        let mut b = DecayCounter::new(1.0);
216        b.record("x", 200.0, 20 * NS_PER_SEC);
217        
218        let merged = a.merged(&b);
219        // Should take b's value (later timestamp)
220        assert!(merged.accumulators.get("x").unwrap().1 == 20 * NS_PER_SEC);
221    }
222
223    #[test]
224    fn test_decay_constraint_state() {
225        let mut state = DecayConstraintState::new("test", 60.0); // 1 min half-life
226        state.record_satisfied(100.0, 0);
227        state.record_violations(5.0, 0);
228        
229        // At t=0: rate = 100/105 ≈ 95.2%
230        let rate = state.satisfaction_rate(0);
231        assert!((rate - 0.952).abs() < 0.01);
232    }
233
234    #[test]
235    fn test_old_violations_decay() {
236        let mut state = DecayConstraintState::new("test", 1.0);
237        state.record_violations(100.0, 0);
238        
239        // After 10 half-lives, violations should be negligible
240        let weight = state.violation_weight(10 * NS_PER_SEC);
241        assert!(weight < 1.0, "Old violations should decay: weight={:.2}", weight);
242    }
243}