Skip to main content

moonpool_transport/simulations/messaging/
invariants.rs

1//! Message tracking and invariant validation for simulation tests.
2//!
3//! Tracks sent/received messages and validates correctness properties.
4
5#![allow(dead_code)] // Utility methods may not all be used in every test
6
7use std::collections::HashSet;
8
9use moonpool_sim::{Invariant, StateHandle, assert_always, assert_sometimes};
10
11/// Tracks messages sent and received for invariant validation.
12#[derive(Debug, Default, Clone)]
13pub struct MessageInvariants {
14    /// Reliable messages sent (by seq_id)
15    pub reliable_sent: HashSet<u64>,
16    /// Reliable messages received (by seq_id)
17    pub reliable_received: HashSet<u64>,
18    /// Unreliable messages sent (by seq_id)
19    pub unreliable_sent: HashSet<u64>,
20    /// Unreliable messages received (by seq_id)
21    pub unreliable_received: HashSet<u64>,
22
23    /// Count of duplicate messages received
24    pub duplicate_count: u64,
25    /// Count of messages dropped due to deserialization errors
26    pub messages_dropped: u64,
27}
28
29impl MessageInvariants {
30    /// Create a new empty invariant tracker.
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    /// Record a message sent.
36    pub fn record_sent(&mut self, seq_id: u64, reliable: bool) {
37        if reliable {
38            self.reliable_sent.insert(seq_id);
39        } else {
40            self.unreliable_sent.insert(seq_id);
41        }
42    }
43
44    /// Record a message received. Returns true if it was a duplicate.
45    pub fn record_received(&mut self, seq_id: u64, reliable: bool) -> bool {
46        let is_dup = if reliable {
47            self.reliable_received.contains(&seq_id)
48        } else {
49            self.unreliable_received.contains(&seq_id)
50        };
51
52        if is_dup {
53            self.duplicate_count += 1;
54        }
55
56        if reliable {
57            self.reliable_received.insert(seq_id);
58        } else {
59            self.unreliable_received.insert(seq_id);
60        }
61
62        is_dup
63    }
64
65    /// Record a message dropped due to deserialization error.
66    pub fn record_dropped(&mut self) {
67        self.messages_dropped += 1;
68    }
69
70    /// Validate invariants that should always hold.
71    ///
72    /// Called continuously during test execution.
73    pub fn validate_always(&self) {
74        // No phantom messages - received must be subset of sent
75        for seq_id in &self.reliable_received {
76            assert_always!(
77                self.reliable_sent.contains(seq_id),
78                format!("Received reliable message {} that was never sent", seq_id)
79            );
80        }
81
82        for seq_id in &self.unreliable_received {
83            assert_always!(
84                self.unreliable_sent.contains(seq_id),
85                format!("Received unreliable message {} that was never sent", seq_id)
86            );
87        }
88
89        // Unreliable can drop but never multiply (per-message)
90        assert_always!(
91            self.unreliable_received.len() <= self.unreliable_sent.len(),
92            format!(
93                "More unique unreliable received ({}) than sent ({})",
94                self.unreliable_received.len(),
95                self.unreliable_sent.len()
96            )
97        );
98    }
99
100    /// Validate receiver-side invariants with coverage assertions.
101    pub fn validate_receiver_always(&self) {
102        // Track duplicate occurrence under chaos (expected with retransmission)
103        assert_sometimes!(
104            self.duplicate_count > 0,
105            "Should sometimes see duplicates due to retransmission"
106        );
107
108        // Also track clean delivery path
109        assert_sometimes!(
110            self.duplicate_count == 0,
111            "Should sometimes see no duplicates (clean delivery)"
112        );
113    }
114
115    /// Validate at quiescence (after drain phase).
116    ///
117    /// Checks that all reliable messages were eventually delivered.
118    pub fn validate_at_quiescence(&self) {
119        // All reliable messages should be delivered
120        let missing: Vec<_> = self
121            .reliable_sent
122            .difference(&self.reliable_received)
123            .collect();
124
125        assert_always!(
126            missing.is_empty(),
127            format!(
128                "Not all reliable messages delivered. Missing: {:?}",
129                missing
130            )
131        );
132
133        // Coverage assertions for unreliable behavior
134        assert_sometimes!(
135            self.unreliable_received.len() < self.unreliable_sent.len(),
136            "Some unreliable should sometimes be dropped under chaos"
137        );
138
139        assert_sometimes!(
140            self.unreliable_received.len() == self.unreliable_sent.len(),
141            "All unreliable should sometimes be delivered (no chaos)"
142        );
143    }
144
145    /// Get total messages sent (reliable + unreliable).
146    pub fn total_sent(&self) -> usize {
147        self.reliable_sent.len() + self.unreliable_sent.len()
148    }
149
150    /// Get total messages received (reliable + unreliable).
151    pub fn total_received(&self) -> usize {
152        self.reliable_received.len() + self.unreliable_received.len()
153    }
154}
155
156// ============================================================================
157// RPC Invariants
158// ============================================================================
159
160/// Tracks RPC request-response pairs for invariant validation.
161#[derive(Debug, Default, Clone)]
162pub struct RpcInvariants {
163    /// RPC requests sent (by request_id)
164    pub requests_sent: HashSet<u64>,
165    /// RPC responses received (by request_id)
166    pub responses_received: HashSet<u64>,
167    /// Broken promises detected (by request_id)
168    pub broken_promises: HashSet<u64>,
169    /// Connection failures count
170    pub connection_failures: u64,
171    /// Timeouts count
172    pub timeouts: u64,
173    /// Successful responses count
174    pub successful_responses: u64,
175}
176
177impl RpcInvariants {
178    /// Create a new empty RPC invariant tracker.
179    pub fn new() -> Self {
180        Self::default()
181    }
182
183    /// Record an RPC request sent.
184    pub fn record_request_sent(&mut self, request_id: u64) {
185        self.requests_sent.insert(request_id);
186    }
187
188    /// Record an RPC response received (success).
189    pub fn record_response_received(&mut self, request_id: u64) {
190        self.responses_received.insert(request_id);
191        self.successful_responses += 1;
192    }
193
194    /// Record a broken promise error received.
195    pub fn record_broken_promise(&mut self, request_id: u64) {
196        self.broken_promises.insert(request_id);
197    }
198
199    /// Record a connection failure.
200    pub fn record_connection_failure(&mut self) {
201        self.connection_failures += 1;
202    }
203
204    /// Record a timeout.
205    pub fn record_timeout(&mut self) {
206        self.timeouts += 1;
207    }
208
209    /// Validate invariants that should always hold.
210    pub fn validate_always(&self) {
211        // INV-1: No phantom responses - received must be subset of sent
212        for request_id in &self.responses_received {
213            assert_always!(
214                self.requests_sent.contains(request_id),
215                format!(
216                    "Received RPC response {} that was never requested",
217                    request_id
218                )
219            );
220        }
221
222        // INV-2: No phantom broken promises
223        for request_id in &self.broken_promises {
224            assert_always!(
225                self.requests_sent.contains(request_id),
226                format!(
227                    "Received broken promise {} that was never requested",
228                    request_id
229                )
230            );
231        }
232
233        // INV-3: At most one resolution per request (success XOR broken)
234        for request_id in &self.responses_received {
235            assert_always!(
236                !self.broken_promises.contains(request_id),
237                format!(
238                    "Request {} has both success response and broken promise",
239                    request_id
240                )
241            );
242        }
243    }
244
245    /// Validate coverage assertions for error paths.
246    pub fn validate_coverage(&self) {
247        // Success path coverage
248        assert_sometimes!(
249            self.successful_responses > 0,
250            "Should sometimes see successful RPC responses"
251        );
252
253        // Broken promise path coverage
254        assert_sometimes!(
255            !self.broken_promises.is_empty(),
256            "Should sometimes see broken promises"
257        );
258
259        // Timeout path coverage
260        assert_sometimes!(self.timeouts > 0, "Should sometimes see timeouts");
261    }
262
263    /// Get summary for logging.
264    pub fn summary(&self) -> String {
265        format!(
266            "RPC: sent={}, success={}, broken={}, timeouts={}, conn_fail={}",
267            self.requests_sent.len(),
268            self.successful_responses,
269            self.broken_promises.len(),
270            self.timeouts,
271            self.connection_failures
272        )
273    }
274}
275
276// ============================================================================
277// Invariant trait wrappers for use with SimulationBuilder
278// ============================================================================
279
280/// Invariant wrapper that validates MessageInvariants from StateHandle.
281pub struct MessageInvariantChecker;
282
283impl Invariant for MessageInvariantChecker {
284    fn name(&self) -> &str {
285        "message_invariants"
286    }
287
288    fn check(&self, state: &StateHandle, _sim_time_ms: u64) {
289        if let Some(inv) = state.get::<MessageInvariants>("message_invariants") {
290            inv.validate_always();
291        }
292    }
293}
294
295/// Invariant wrapper that validates RpcInvariants from StateHandle.
296pub struct RpcInvariantChecker;
297
298impl Invariant for RpcInvariantChecker {
299    fn name(&self) -> &str {
300        "rpc_invariants"
301    }
302
303    fn check(&self, state: &StateHandle, _sim_time_ms: u64) {
304        if let Some(inv) = state.get::<RpcInvariants>("rpc_invariants") {
305            inv.validate_always();
306        }
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313
314    #[test]
315    fn test_empty_invariants() {
316        let inv = MessageInvariants::new();
317        inv.validate_always(); // Should not panic
318    }
319
320    #[test]
321    fn test_record_sent_received() {
322        let mut inv = MessageInvariants::new();
323
324        inv.record_sent(1, true);
325        inv.record_sent(2, false);
326
327        assert_eq!(inv.reliable_sent.len(), 1);
328        assert_eq!(inv.unreliable_sent.len(), 1);
329
330        let dup = inv.record_received(1, true);
331        assert!(!dup);
332
333        let dup = inv.record_received(1, true);
334        assert!(dup);
335        assert_eq!(inv.duplicate_count, 1);
336    }
337
338    #[test]
339    fn test_validate_always_passes() {
340        let mut inv = MessageInvariants::new();
341
342        inv.record_sent(1, true);
343        inv.record_sent(2, true);
344        inv.record_received(1, true);
345
346        inv.validate_always(); // Should not panic
347    }
348
349    #[test]
350    fn test_phantom_message_detected() {
351        moonpool_sim::reset_always_violations();
352        let mut inv = MessageInvariants::new();
353
354        // Receive without sending - should trigger always-violation
355        inv.record_received(999, true);
356        inv.validate_always();
357
358        assert!(
359            moonpool_sim::has_always_violations(),
360            "expected always-violation for phantom message"
361        );
362    }
363
364    // ========================================================================
365    // RPC Invariants Tests
366    // ========================================================================
367
368    #[test]
369    fn test_rpc_invariants_empty() {
370        let inv = RpcInvariants::new();
371        inv.validate_always(); // Should not panic
372    }
373
374    #[test]
375    fn test_rpc_invariants_request_response() {
376        let mut inv = RpcInvariants::new();
377
378        inv.record_request_sent(1);
379        inv.record_request_sent(2);
380        inv.record_response_received(1);
381
382        assert_eq!(inv.requests_sent.len(), 2);
383        assert_eq!(inv.responses_received.len(), 1);
384        assert_eq!(inv.successful_responses, 1);
385
386        inv.validate_always(); // Should not panic
387    }
388
389    #[test]
390    fn test_rpc_invariants_broken_promise() {
391        let mut inv = RpcInvariants::new();
392
393        inv.record_request_sent(1);
394        inv.record_broken_promise(1);
395
396        assert!(!inv.broken_promises.is_empty());
397        inv.validate_always(); // Should not panic
398    }
399
400    #[test]
401    fn test_rpc_phantom_response_detected() {
402        moonpool_sim::reset_always_violations();
403        let mut inv = RpcInvariants::new();
404
405        // Receive response without sending request - should trigger violation
406        inv.record_response_received(999);
407        inv.validate_always();
408
409        assert!(
410            moonpool_sim::has_always_violations(),
411            "expected always-violation for phantom response"
412        );
413    }
414
415    #[test]
416    fn test_rpc_phantom_broken_promise_detected() {
417        moonpool_sim::reset_always_violations();
418        let mut inv = RpcInvariants::new();
419
420        // Broken promise for never-sent request - should trigger violation
421        inv.record_broken_promise(999);
422        inv.validate_always();
423
424        assert!(
425            moonpool_sim::has_always_violations(),
426            "expected always-violation for phantom broken promise"
427        );
428    }
429
430    #[test]
431    fn test_rpc_double_resolution_detected() {
432        moonpool_sim::reset_always_violations();
433        let mut inv = RpcInvariants::new();
434
435        inv.record_request_sent(1);
436        inv.record_response_received(1);
437        inv.record_broken_promise(1); // Already resolved
438
439        inv.validate_always();
440
441        assert!(
442            moonpool_sim::has_always_violations(),
443            "expected always-violation for double resolution"
444        );
445    }
446
447    #[test]
448    fn test_rpc_invariants_summary() {
449        let mut inv = RpcInvariants::new();
450        inv.record_request_sent(1);
451        inv.record_request_sent(2);
452        inv.record_response_received(1);
453        inv.record_timeout();
454        inv.record_connection_failure();
455
456        let summary = inv.summary();
457        assert!(summary.contains("sent=2"));
458        assert!(summary.contains("success=1"));
459        assert!(summary.contains("timeouts=1"));
460        assert!(summary.contains("conn_fail=1"));
461    }
462}