Skip to main content

constraint_crdt/
simulation.rs

//! # Deterministic Network Simulation
//!
//! Test fleet consensus under controlled conditions:
//! - Variable latency
//! - Message loss
//! - Partition/rejoin
//! - Node crash/recovery
//!
//! All randomness is derived from a single seed — same seed, same simulation.
10
11use crate::gossip::{GossipNode, GossipMessage, GossipResult};
12use crate::state::ConstraintState;
13use std::collections::HashMap;
14
/// Small deterministic pseudo-random number generator that drives the simulation.
///
/// Two generators built from the same seed yield exactly the same sequence,
/// which is what makes a simulation run reproducible.
#[derive(Debug, Clone)]
pub struct SimRng {
    /// Current xorshift64 state; never zero once constructed through [`SimRng::new`].
    state: u64,
}

impl SimRng {
    /// State substituted for a zero seed (an arbitrary fixed non-zero constant).
    const ZERO_SEED_FALLBACK: u64 = 0x9E37_79B9_7F4A_7C15;

    /// Create a generator from `seed`.
    ///
    /// Every non-zero seed is used verbatim. A zero seed is replaced by a fixed
    /// non-zero constant: xorshift maps the all-zero state to itself, so a
    /// generator left at zero would return `0` forever.
    pub fn new(seed: u64) -> Self {
        let state = if seed == 0 {
            Self::ZERO_SEED_FALLBACK
        } else {
            seed
        };
        Self { state }
    }

    /// xorshift64 — fast, deterministic, good enough for simulation.
    ///
    /// Advances the internal state and returns the new state.
    pub fn next_u64(&mut self) -> u64 {
        let mut x = self.state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.state = x;
        x
    }

    /// Uniform `f64` in `[0, 1)`.
    pub fn next_f64(&mut self) -> f64 {
        // Keep the top 53 bits (an f64 carries 53 bits of precision) and scale by 2^-53.
        (self.next_u64() >> 11) as f64 / (1u64 << 53) as f64
    }

    /// Should we drop this message?
    ///
    /// Returns `true` with probability `loss_rate` (expected range `0.0..=1.0`):
    /// `0.0` never drops and `1.0` always drops. Consumes one random draw.
    pub fn should_drop(&mut self, loss_rate: f64) -> bool {
        self.next_f64() < loss_rate
    }

    /// Pick a random peer index in `0..total` that differs from `self_idx`.
    ///
    /// Returns `0` without consuming a draw when `total <= 1`, because no other
    /// peer exists. Assumes `self_idx < total`.
    pub fn random_peer(&mut self, self_idx: usize, total: usize) -> usize {
        if total <= 1 {
            return 0;
        }
        // Reduce in u64 so the chosen peer does not depend on the width of `usize`.
        let others = (total - 1) as u64;
        let mut peer = (self.next_u64() % others) as usize;
        // Shift the upper part of the range up by one to skip our own index.
        if peer >= self_idx {
            peer += 1;
        }
        peer
    }
}
53
54/// A delayed message in the network.
55#[derive(Debug, Clone)]
56struct InFlightMessage {
57    from: usize,
58    message: GossipMessage,
59    deliver_at_round: u64,
60}
61
62/// The simulation environment.
63pub struct Simulation {
64    /// Nodes in the simulation.
65    pub nodes: Vec<GossipNode>,
66    /// Random number generator.
67    rng: SimRng,
68    /// Message loss rate (0.0 = no loss, 1.0 = all lost).
69    pub loss_rate: f64,
70    /// Messages in flight (delayed delivery).
71    in_flight: Vec<InFlightMessage>,
72    /// Current simulation round.
73    round: u64,
74    /// Statistics.
75    stats: SimStats,
76}
77
/// Simulation statistics, accumulated across all rounds.
#[derive(Debug, Clone, Default)]
pub struct SimStats {
    /// Messages handed to the network, including those that were then dropped.
    pub total_messages_sent: u64,
    /// Messages discarded by the loss model before reaching their recipient.
    pub messages_dropped: u64,
    /// Messages that reached their recipient.
    pub messages_delivered: u64,
    /// Number of deliveries that `step` recorded as having changed node state.
    pub state_changes: u64,
    /// Round numbers at which `run_until_converged` detected convergence.
    pub convergence_rounds: Vec<u64>,
}
87
88impl Simulation {
89    /// Create a new simulation with N nodes.
90    pub fn new(node_count: usize, seed: u64) -> Self {
91        let nodes = (0..node_count)
92            .map(|i| GossipNode::new(&format!("node-{}", i)))
93            .collect();
94
95        Self {
96            nodes,
97            rng: SimRng::new(seed),
98            loss_rate: 0.0,
99            in_flight: Vec::new(),
100            round: 0,
101            stats: SimStats::default(),
102        }
103    }
104
105    /// Set message loss rate.
106    pub fn with_loss_rate(mut self, rate: f64) -> Self {
107        self.loss_rate = rate;
108        self
109    }
110
111    /// Get stats.
112    pub fn stats(&self) -> &SimStats {
113        &self.stats
114    }
115
116    /// Current round.
117    pub fn round(&self) -> u64 {
118        self.round
119    }
120
121    /// Run one round of gossip: each node pings a random peer.
122    pub fn step(&mut self) {
123        self.round += 1;
124        let n = self.nodes.len();
125        let mut new_messages = Vec::new();
126
127        for i in 0..n {
128            let peer = self.rng.random_peer(i, n);
129            let ping = self.nodes[i].ping();
130            self.stats.total_messages_sent += 1;
131
132            if self.rng.should_drop(self.loss_rate) {
133                self.stats.messages_dropped += 1;
134                continue;
135            }
136
137            // Deliver immediately (could add latency delay)
138            let result = self.nodes[peer].receive(&ping);
139            self.stats.messages_delivered += 1;
140
141            // Process responses
142            for resp in result.responses {
143                self.stats.total_messages_sent += 1;
144                if self.rng.should_drop(self.loss_rate) {
145                    self.stats.messages_dropped += 1;
146                    continue;
147                }
148                let back_result = self.nodes[i].receive(&resp);
149                self.stats.messages_delivered += 1;
150                if back_result.state_changed {
151                    self.stats.state_changes += 1;
152                }
153                new_messages.extend(back_result.responses);
154            }
155        }
156    }
157
158    /// Run simulation until convergence or max rounds.
159    /// Returns the round at which convergence was reached, or None.
160    pub fn run_until_converged(&mut self, max_rounds: u64) -> Option<u64> {
161        for _ in 0..max_rounds {
162            self.step();
163
164            // Check convergence: all nodes have the same state hash
165            if self.nodes.len() > 1 {
166                let hashes: Vec<_> = self.nodes.iter()
167                    .map(|n| crate::merkle::StateHash::from_state(&n.state))
168                    .collect();
169                let first = hashes[0];
170                if hashes.iter().all(|h| *h == first) {
171                    self.stats.convergence_rounds.push(self.round);
172                    return Some(self.round);
173                }
174            }
175        }
176        None
177    }
178
179    /// Add constraints to a specific node (simulate local operation).
180    pub fn add_constraint(&mut self, node_idx: usize, constraint: &str) {
181        self.nodes[node_idx].add_constraint(constraint);
182    }
183
184    /// Record satisfied constraints on a specific node.
185    pub fn record_satisfied(&mut self, node_idx: usize, count: u64) {
186        self.nodes[node_idx].record_satisfied(count);
187    }
188
189    /// Check if all nodes have converged to the same state.
190    pub fn is_converged(&self) -> bool {
191        if self.nodes.len() <= 1 { return true; }
192        let hashes: Vec<_> = self.nodes.iter()
193            .map(|n| crate::merkle::StateHash::from_state(&n.state))
194            .collect();
195        let first = hashes[0];
196        hashes.iter().all(|h| *h == first)
197    }
198
199    /// Print a summary of node states.
200    pub fn summary(&self) -> String {
201        let mut lines = vec![format!("=== Simulation Round {} ===", self.round)];
202        for node in &self.nodes {
203            lines.push(format!("  {}", node));
204        }
205        lines.push(format!("Stats: {} sent, {} dropped, {} delivered, {} state changes",
206            self.stats.total_messages_sent,
207            self.stats.messages_dropped,
208            self.stats.messages_delivered,
209            self.stats.state_changes));
210        lines.push(format!("Converged: {}", self.is_converged()));
211        lines.join("\n")
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// Two generators with the same seed must produce identical sequences.
    #[test]
    fn test_rng_deterministic() {
        let mut a = SimRng::new(42);
        let mut b = SimRng::new(42);
        for _ in 0..100 {
            assert_eq!(a.next_u64(), b.next_u64());
        }
    }

    /// Different seeds must diverge on the very first draw.
    #[test]
    fn test_rng_different_seeds() {
        let mut a = SimRng::new(42);
        let mut b = SimRng::new(43);
        assert_ne!(a.next_u64(), b.next_u64());
    }

    /// The empirical drop frequency must track the requested loss rate.
    #[test]
    fn test_loss_rate() {
        let mut rng = SimRng::new(42);
        let mut dropped = 0;
        let trials = 10000;
        for _ in 0..trials {
            if rng.should_drop(0.5) {
                dropped += 1;
            }
        }
        // Should be roughly 50% ± 3%
        let rate = dropped as f64 / trials as f64;
        assert!(
            rate > 0.47 && rate < 0.53,
            "Loss rate was {:.3}, expected ~0.5",
            rate
        );
    }

    /// Two nodes with disjoint constraints end up holding both.
    #[test]
    fn test_two_nodes_converge() {
        let mut sim = Simulation::new(2, 42);
        sim.add_constraint(0, "c1");
        sim.add_constraint(1, "c2");

        let converged = sim.run_until_converged(20);
        assert!(converged.is_some(), "Should converge within 20 rounds");
        assert_eq!(sim.nodes[0].state.active_constraint_count(), 2);
        assert_eq!(sim.nodes[1].state.active_constraint_count(), 2);
    }

    /// Five nodes, one constraint each: everyone ends up with all five.
    #[test]
    fn test_five_nodes_converge() {
        let mut sim = Simulation::new(5, 42);
        for i in 0..5 {
            sim.add_constraint(i, &format!("c{}", i));
        }

        let converged = sim.run_until_converged(50);
        assert!(converged.is_some(), "5 nodes should converge within 50 rounds");
        for node in &sim.nodes {
            assert_eq!(node.state.active_constraint_count(), 5);
        }
    }

    /// Convergence must still be reached when 30% of messages are lost.
    #[test]
    fn test_converge_with_loss() {
        let mut sim = Simulation::new(4, 42).with_loss_rate(0.3);
        for i in 0..4 {
            sim.add_constraint(i, &format!("c{}", i));
        }

        let converged = sim.run_until_converged(100);
        assert!(converged.is_some(), "Should converge even with 30% loss");
    }

    /// The summary mentions the nodes and the convergence flag.
    #[test]
    fn test_summary() {
        let sim = Simulation::new(3, 42);
        let s = sim.summary();
        assert!(s.contains("node-0"));
        assert!(s.contains("Converged:"));
    }

    /// `random_peer` never returns the caller's own index, whichever node asks.
    #[test]
    fn test_random_peer() {
        let mut rng = SimRng::new(42);
        let total = 5;
        for self_idx in 0..total {
            for _ in 0..100 {
                let peer = rng.random_peer(self_idx, total);
                assert_ne!(peer, self_idx);
                assert!(peer < total);
            }
        }
    }
}