// constraint_crdt/delta.rs

//! # Delta-State CRDTs
//!
//! Instead of sending entire CRDT state across the network, send only what changed.
//! A delta is the minimal state needed to bring another replica up to date.

use crate::eisenstein::eisenstein_norm;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;

10/// A delta (incremental change) for a G-Counter.
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
12pub struct CounterDelta {
13    /// Node that generated this delta
14    pub node: String,
15    /// Increment since last delta
16    pub satisfied_delta: u64,
17    /// Violation increment since last delta
18    pub violations_delta: u64,
19    /// Sequence number for ordering
20    pub seq: u64,
21}
22
23/// A delta for the OR-Set (added or removed constraints).
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
25pub enum OrsetDelta {
26    Add {
27        constraint_id: String,
28        node: String,
29        seq: u64,
30    },
31    Remove {
32        constraint_id: String,
33        tombstoned_tags: Vec<(String, u64)>,
34        seq: u64,
35    },
36}
37
38/// A delta for an Eisenstein position.
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
40pub struct PositionDelta {
41    pub old_norm: i64,
42    pub new_norm: i64,
43    pub position: (i32, i32),
44    pub node: String,
45    pub seq: u64,
46}
47
48/// A composite delta for the full constraint state.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct ConstraintDelta {
51    /// Source node
52    pub node: String,
53    /// Monotonic sequence number
54    pub seq: u64,
55    /// Counter deltas
56    pub counter: CounterDelta,
57    /// Constraint set deltas
58    pub constraints: Vec<OrsetDelta>,
59    /// Position delta (if changed)
60    pub position: Option<PositionDelta>,
61}
62
63impl ConstraintDelta {
64    /// Create an empty delta
65    pub fn empty(node: &str, seq: u64) -> Self {
66        Self {
67            node: node.to_string(),
68            seq,
69            counter: CounterDelta {
70                node: node.to_string(),
71                satisfied_delta: 0,
72                violations_delta: 0,
73                seq,
74            },
75            constraints: Vec::new(),
76            position: None,
77        }
78    }
79
80    /// Is this delta empty (no changes)?
81    pub fn is_empty(&self) -> bool {
82        self.counter.satisfied_delta == 0
83            && self.counter.violations_delta == 0
84            && self.constraints.is_empty()
85            && self.position.is_none()
86    }
87
88    /// Serialize to JSON bytes
89    pub fn to_bytes(&self) -> Vec<u8> {
90        serde_json::to_vec(self).unwrap_or_default()
91    }
92
93    /// Deserialize from JSON bytes
94    pub fn from_bytes(data: &[u8]) -> Option<Self> {
95        serde_json::from_slice(data).ok()
96    }
97
98    /// Approximate wire size in bytes
99    pub fn wire_size(&self) -> usize {
100        self.to_bytes().len()
101    }
102}
103
/// Tracks the last known state per node for delta generation.
///
/// A freshly constructed (`Default`) tracker knows about no nodes.
#[derive(Debug, Clone, Default)]
pub struct DeltaTracker {
    /// Per-node: (last_satisfied, last_violations, last_seq, last_position)
    node_state: HashMap<String, (u64, u64, u64, Option<(i32, i32)>)>,
}

111impl DeltaTracker {
112    pub fn new() -> Self {
113        Self::default()
114    }
115
116    /// Generate a delta from a full state snapshot.
117    pub fn generate(
118        &mut self,
119        node: &str,
120        satisfied: u64,
121        violations: u64,
122        position: (i32, i32),
123        added: &[String],
124        removed: &[String],
125    ) -> ConstraintDelta {
126        let (prev_sat, prev_vio, prev_seq, prev_pos) = self
127            .node_state
128            .get(node)
129            .copied()
130            .unwrap_or((0, 0, 0, None));
131
132        let new_seq = prev_seq + 1;
133
134        let counter_delta = CounterDelta {
135            node: node.to_string(),
136            satisfied_delta: satisfied.saturating_sub(prev_sat),
137            violations_delta: violations.saturating_sub(prev_vio),
138            seq: new_seq,
139        };
140
141        let mut constraint_deltas = Vec::new();
142        for id in added {
143            constraint_deltas.push(OrsetDelta::Add {
144                constraint_id: id.clone(),
145                node: node.to_string(),
146                seq: new_seq,
147            });
148        }
149        for id in removed {
150            constraint_deltas.push(OrsetDelta::Remove {
151                constraint_id: id.clone(),
152                tombstoned_tags: Vec::new(),
153                seq: new_seq,
154            });
155        }
156
157        let pos_delta = if prev_pos != Some(position) {
158            Some(PositionDelta {
159                old_norm: prev_pos.map(eisenstein_norm).unwrap_or(0),
160                new_norm: eisenstein_norm(position),
161                position,
162                node: node.to_string(),
163                seq: new_seq,
164            })
165        } else {
166            None
167        };
168
169        self.node_state.insert(
170            node.to_string(),
171            (satisfied, violations, new_seq, Some(position)),
172        );
173
174        ConstraintDelta {
175            node: node.to_string(),
176            seq: new_seq,
177            counter: counter_delta,
178            constraints: constraint_deltas,
179            position: pos_delta,
180        }
181    }
182
183    /// Get last known seq for a node
184    pub fn last_seq(&self, node: &str) -> u64 {
185        self.node_state.get(node).map(|(_, _, s, _)| *s).unwrap_or(0)
186    }
187}
188
189impl fmt::Display for ConstraintDelta {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        write!(f, "Delta(node={}, seq={}, +{}s/+{}v, {} ops, {} bytes)",
192            self.node, self.seq,
193            self.counter.satisfied_delta, self.counter.violations_delta,
194            self.constraints.len(),
195            self.wire_size())
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// A delta built with `empty` reports no changes.
    #[test]
    fn test_empty_delta() {
        let d = ConstraintDelta::empty("node-a", 1);
        assert!(d.is_empty());
    }

    /// The first delta carries the full counters and a position; the second
    /// carries only the increments and no position when it is unchanged.
    #[test]
    fn test_delta_generation() {
        let mut tracker = DeltaTracker::new();
        let d1 = tracker.generate("node-a", 100, 5, (1, 0), &["c1".into()], &[]);
        assert_eq!(d1.counter.satisfied_delta, 100);
        assert_eq!(d1.counter.violations_delta, 5);
        assert_eq!(d1.constraints.len(), 1);
        assert!(d1.position.is_some());

        let d2 = tracker.generate("node-a", 150, 8, (1, 0), &["c2".into()], &[]);
        assert_eq!(d2.counter.satisfied_delta, 50);
        assert_eq!(d2.counter.violations_delta, 3);
        assert_eq!(d2.constraints.len(), 1);
        assert!(d2.position.is_none());
    }

    /// Feeding the same snapshot twice yields an empty second delta.
    #[test]
    fn test_delta_no_change() {
        let mut tracker = DeltaTracker::new();
        tracker.generate("a", 100, 5, (1, 0), &[], &[]);
        let d = tracker.generate("a", 100, 5, (1, 0), &[], &[]);
        assert!(d.is_empty());
    }

    /// A delta survives a JSON round trip.
    #[test]
    fn test_delta_serialization() {
        let mut tracker = DeltaTracker::new();
        let d = tracker.generate("node-a", 100, 5, (2, 1), &["c1".into()], &[]);
        let bytes = d.to_bytes();
        assert!(!bytes.is_empty());
        let restored = ConstraintDelta::from_bytes(&bytes).unwrap();
        assert_eq!(restored.node, "node-a");
        assert_eq!(restored.counter.satisfied_delta, 100);
    }

    /// Baselines are kept separately for each node.
    #[test]
    fn test_delta_tracker_per_node() {
        let mut tracker = DeltaTracker::new();
        tracker.generate("a", 100, 5, (0, 0), &[], &[]);
        tracker.generate("b", 200, 10, (0, 0), &[], &[]);
        let da = tracker.generate("a", 120, 5, (0, 0), &[], &[]);
        let db = tracker.generate("b", 200, 15, (0, 0), &[], &[]);
        assert_eq!(da.counter.satisfied_delta, 20);
        assert_eq!(db.counter.satisfied_delta, 0);
        assert_eq!(db.counter.violations_delta, 5);
    }

    /// An empty delta serializes to a small, non-empty payload.
    #[test]
    fn test_wire_size() {
        let d = ConstraintDelta::empty("node-a", 1);
        assert!(d.wire_size() > 0);
        assert!(d.wire_size() < 200);
    }

    /// The `Display` summary names the node and the satisfied increment.
    #[test]
    fn test_display() {
        let mut tracker = DeltaTracker::new();
        let d = tracker.generate("a", 100, 5, (1, 0), &["c1".into()], &[]);
        let s = d.to_string();
        assert!(s.contains("node=a"));
        assert!(s.contains("+100s"));
    }

    /// Sequence numbers start at 1, advance by one per generated delta, and
    /// `last_seq` reports 0 for nodes the tracker has never seen.
    #[test]
    fn test_last_seq() {
        let mut tracker = DeltaTracker::new();
        assert_eq!(tracker.last_seq("a"), 0);

        let d1 = tracker.generate("a", 1, 0, (0, 0), &[], &[]);
        assert_eq!(d1.seq, 1);
        assert_eq!(tracker.last_seq("a"), 1);

        let d2 = tracker.generate("a", 2, 0, (0, 0), &[], &[]);
        assert_eq!(d2.seq, 2);
        assert_eq!(tracker.last_seq("a"), 2);
        assert_eq!(tracker.last_seq("b"), 0);
    }

    /// Removed ids become `Remove` ops, emitted after the `Add` ops and with
    /// no tombstoned tags.
    #[test]
    fn test_removed_constraints() {
        let mut tracker = DeltaTracker::new();
        let d = tracker.generate("a", 0, 0, (0, 0), &["c2".into()], &["c1".into()]);
        assert_eq!(
            d.constraints,
            vec![
                OrsetDelta::Add {
                    constraint_id: "c2".into(),
                    node: "a".into(),
                    seq: 1,
                },
                OrsetDelta::Remove {
                    constraint_id: "c1".into(),
                    tombstoned_tags: Vec::new(),
                    seq: 1,
                },
            ]
        );
    }
}