// moonpool_transport/simulations/messaging/invariants.rs
#![allow(dead_code)]

use std::collections::HashSet;
8
9use moonpool_sim::{Invariant, StateHandle, assert_always, assert_sometimes};
10
/// Bookkeeping used to check message-delivery invariants.
///
/// Sequence ids are tracked in four sets (reliable/unreliable crossed with
/// sent/received), alongside two plain counters. All fields are public so
/// callers can inspect them directly.
#[derive(Clone, Debug, Default)]
pub struct MessageInvariants {
    /// Sequence ids recorded as sent on the reliable path.
    pub reliable_sent: HashSet<u64>,
    /// Sequence ids recorded as received on the reliable path.
    pub reliable_received: HashSet<u64>,
    /// Sequence ids recorded as sent on the unreliable path.
    pub unreliable_sent: HashSet<u64>,
    /// Sequence ids recorded as received on the unreliable path.
    pub unreliable_received: HashSet<u64>,

    /// Number of receptions whose sequence id had already been received.
    pub duplicate_count: u64,
    /// Number of drops reported via `record_dropped`.
    pub messages_dropped: u64,
}
28
29impl MessageInvariants {
30 pub fn new() -> Self {
32 Self::default()
33 }
34
35 pub fn record_sent(&mut self, seq_id: u64, reliable: bool) {
37 if reliable {
38 self.reliable_sent.insert(seq_id);
39 } else {
40 self.unreliable_sent.insert(seq_id);
41 }
42 }
43
44 pub fn record_received(&mut self, seq_id: u64, reliable: bool) -> bool {
46 let is_dup = if reliable {
47 self.reliable_received.contains(&seq_id)
48 } else {
49 self.unreliable_received.contains(&seq_id)
50 };
51
52 if is_dup {
53 self.duplicate_count += 1;
54 }
55
56 if reliable {
57 self.reliable_received.insert(seq_id);
58 } else {
59 self.unreliable_received.insert(seq_id);
60 }
61
62 is_dup
63 }
64
65 pub fn record_dropped(&mut self) {
67 self.messages_dropped += 1;
68 }
69
70 pub fn validate_always(&self) {
74 for seq_id in &self.reliable_received {
76 assert_always!(
77 self.reliable_sent.contains(seq_id),
78 format!("Received reliable message {} that was never sent", seq_id)
79 );
80 }
81
82 for seq_id in &self.unreliable_received {
83 assert_always!(
84 self.unreliable_sent.contains(seq_id),
85 format!("Received unreliable message {} that was never sent", seq_id)
86 );
87 }
88
89 assert_always!(
91 self.unreliable_received.len() <= self.unreliable_sent.len(),
92 format!(
93 "More unique unreliable received ({}) than sent ({})",
94 self.unreliable_received.len(),
95 self.unreliable_sent.len()
96 )
97 );
98 }
99
100 pub fn validate_receiver_always(&self) {
102 assert_sometimes!(
104 self.duplicate_count > 0,
105 "Should sometimes see duplicates due to retransmission"
106 );
107
108 assert_sometimes!(
110 self.duplicate_count == 0,
111 "Should sometimes see no duplicates (clean delivery)"
112 );
113 }
114
115 pub fn validate_at_quiescence(&self) {
119 let missing: Vec<_> = self
121 .reliable_sent
122 .difference(&self.reliable_received)
123 .collect();
124
125 assert_always!(
126 missing.is_empty(),
127 format!(
128 "Not all reliable messages delivered. Missing: {:?}",
129 missing
130 )
131 );
132
133 assert_sometimes!(
135 self.unreliable_received.len() < self.unreliable_sent.len(),
136 "Some unreliable should sometimes be dropped under chaos"
137 );
138
139 assert_sometimes!(
140 self.unreliable_received.len() == self.unreliable_sent.len(),
141 "All unreliable should sometimes be delivered (no chaos)"
142 );
143 }
144
145 pub fn total_sent(&self) -> usize {
147 self.reliable_sent.len() + self.unreliable_sent.len()
148 }
149
150 pub fn total_received(&self) -> usize {
152 self.reliable_received.len() + self.unreliable_received.len()
153 }
154}
155
/// Bookkeeping used to check RPC request/response invariants.
///
/// Request ids are tracked in three sets (sent, answered, broken promise),
/// alongside three plain counters. All fields are public so callers can
/// inspect them directly.
#[derive(Clone, Debug, Default)]
pub struct RpcInvariants {
    /// Ids of requests recorded as sent.
    pub requests_sent: HashSet<u64>,
    /// Ids of requests for which a response was recorded.
    pub responses_received: HashSet<u64>,
    /// Ids of requests for which a broken promise was recorded.
    pub broken_promises: HashSet<u64>,
    /// Number of connection failures reported.
    pub connection_failures: u64,
    /// Number of timeouts reported.
    pub timeouts: u64,
    /// Number of calls that recorded a response (not deduplicated by id).
    pub successful_responses: u64,
}
176
177impl RpcInvariants {
178 pub fn new() -> Self {
180 Self::default()
181 }
182
183 pub fn record_request_sent(&mut self, request_id: u64) {
185 self.requests_sent.insert(request_id);
186 }
187
188 pub fn record_response_received(&mut self, request_id: u64) {
190 self.responses_received.insert(request_id);
191 self.successful_responses += 1;
192 }
193
194 pub fn record_broken_promise(&mut self, request_id: u64) {
196 self.broken_promises.insert(request_id);
197 }
198
199 pub fn record_connection_failure(&mut self) {
201 self.connection_failures += 1;
202 }
203
204 pub fn record_timeout(&mut self) {
206 self.timeouts += 1;
207 }
208
209 pub fn validate_always(&self) {
211 for request_id in &self.responses_received {
213 assert_always!(
214 self.requests_sent.contains(request_id),
215 format!(
216 "Received RPC response {} that was never requested",
217 request_id
218 )
219 );
220 }
221
222 for request_id in &self.broken_promises {
224 assert_always!(
225 self.requests_sent.contains(request_id),
226 format!(
227 "Received broken promise {} that was never requested",
228 request_id
229 )
230 );
231 }
232
233 for request_id in &self.responses_received {
235 assert_always!(
236 !self.broken_promises.contains(request_id),
237 format!(
238 "Request {} has both success response and broken promise",
239 request_id
240 )
241 );
242 }
243 }
244
245 pub fn validate_coverage(&self) {
247 assert_sometimes!(
249 self.successful_responses > 0,
250 "Should sometimes see successful RPC responses"
251 );
252
253 assert_sometimes!(
255 !self.broken_promises.is_empty(),
256 "Should sometimes see broken promises"
257 );
258
259 assert_sometimes!(self.timeouts > 0, "Should sometimes see timeouts");
261 }
262
263 pub fn summary(&self) -> String {
265 format!(
266 "RPC: sent={}, success={}, broken={}, timeouts={}, conn_fail={}",
267 self.requests_sent.len(),
268 self.successful_responses,
269 self.broken_promises.len(),
270 self.timeouts,
271 self.connection_failures
272 )
273 }
274}
275
/// Stateless checker that runs the `MessageInvariants` always-checks through
/// the `Invariant` trait.
///
/// Derives the same common traits as the other public types in this file so
/// it can be logged, defaulted, and copied freely.
#[derive(Debug, Default, Clone, Copy)]
pub struct MessageInvariantChecker;
282
283impl Invariant for MessageInvariantChecker {
284 fn name(&self) -> &str {
285 "message_invariants"
286 }
287
288 fn check(&self, state: &StateHandle, _sim_time_ms: u64) {
289 if let Some(inv) = state.get::<MessageInvariants>("message_invariants") {
290 inv.validate_always();
291 }
292 }
293}
294
/// Stateless checker that runs the `RpcInvariants` always-checks through the
/// `Invariant` trait.
///
/// Derives the same common traits as the other public types in this file so
/// it can be logged, defaulted, and copied freely.
#[derive(Debug, Default, Clone, Copy)]
pub struct RpcInvariantChecker;
297
298impl Invariant for RpcInvariantChecker {
299 fn name(&self) -> &str {
300 "rpc_invariants"
301 }
302
303 fn check(&self, state: &StateHandle, _sim_time_ms: u64) {
304 if let Some(inv) = state.get::<RpcInvariants>("rpc_invariants") {
305 inv.validate_always();
306 }
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Clears previously recorded always-violations, runs `validate`, and
    /// reports whether that run recorded any.
    ///
    /// Violations are observed through `moonpool_sim::has_always_violations()`
    /// rather than through a panic, so a test that merely calls
    /// `validate_always()` cannot fail; every validation below goes through
    /// this helper and asserts on the outcome.
    ///
    /// NOTE(review): this presumes violation state is isolated per test
    /// (e.g. thread-local), as the reset/has pattern already required —
    /// confirm against moonpool_sim.
    fn records_violation(validate: impl FnOnce()) -> bool {
        moonpool_sim::reset_always_violations();
        validate();
        moonpool_sim::has_always_violations()
    }

    #[test]
    fn test_empty_invariants() {
        let inv = MessageInvariants::new();

        assert!(
            !records_violation(|| inv.validate_always()),
            "empty tracker must not record an always-violation"
        );
    }

    #[test]
    fn test_record_sent_received() {
        let mut inv = MessageInvariants::new();

        inv.record_sent(1, true);
        inv.record_sent(2, false);

        assert_eq!(inv.reliable_sent.len(), 1);
        assert_eq!(inv.unreliable_sent.len(), 1);

        // First reception is not a duplicate.
        let dup = inv.record_received(1, true);
        assert!(!dup);
        assert_eq!(inv.duplicate_count, 0);

        // Second reception of the same id is.
        let dup = inv.record_received(1, true);
        assert!(dup);
        assert_eq!(inv.duplicate_count, 1);
    }

    #[test]
    fn test_validate_always_passes() {
        let mut inv = MessageInvariants::new();

        inv.record_sent(1, true);
        inv.record_sent(2, true);
        inv.record_received(1, true);

        assert!(
            !records_violation(|| inv.validate_always()),
            "receiving a subset of sent messages must not record an always-violation"
        );
    }

    #[test]
    fn test_phantom_message_detected() {
        let mut inv = MessageInvariants::new();

        // Received without ever being sent.
        inv.record_received(999, true);

        assert!(
            records_violation(|| inv.validate_always()),
            "expected always-violation for phantom message"
        );
    }

    #[test]
    fn test_rpc_invariants_empty() {
        let inv = RpcInvariants::new();

        assert!(
            !records_violation(|| inv.validate_always()),
            "empty tracker must not record an always-violation"
        );
    }

    #[test]
    fn test_rpc_invariants_request_response() {
        let mut inv = RpcInvariants::new();

        inv.record_request_sent(1);
        inv.record_request_sent(2);
        inv.record_response_received(1);

        assert_eq!(inv.requests_sent.len(), 2);
        assert_eq!(inv.responses_received.len(), 1);
        assert_eq!(inv.successful_responses, 1);

        assert!(
            !records_violation(|| inv.validate_always()),
            "a response to a sent request must not record an always-violation"
        );
    }

    #[test]
    fn test_rpc_invariants_broken_promise() {
        let mut inv = RpcInvariants::new();

        inv.record_request_sent(1);
        inv.record_broken_promise(1);

        assert!(!inv.broken_promises.is_empty());
        assert!(
            !records_violation(|| inv.validate_always()),
            "a broken promise for a sent request must not record an always-violation"
        );
    }

    #[test]
    fn test_rpc_phantom_response_detected() {
        let mut inv = RpcInvariants::new();

        // Response for a request that was never sent.
        inv.record_response_received(999);

        assert!(
            records_violation(|| inv.validate_always()),
            "expected always-violation for phantom response"
        );
    }

    #[test]
    fn test_rpc_phantom_broken_promise_detected() {
        let mut inv = RpcInvariants::new();

        // Broken promise for a request that was never sent.
        inv.record_broken_promise(999);

        assert!(
            records_violation(|| inv.validate_always()),
            "expected always-violation for phantom broken promise"
        );
    }

    #[test]
    fn test_rpc_double_resolution_detected() {
        let mut inv = RpcInvariants::new();

        // Same request resolved both ways.
        inv.record_request_sent(1);
        inv.record_response_received(1);
        inv.record_broken_promise(1);

        assert!(
            records_violation(|| inv.validate_always()),
            "expected always-violation for double resolution"
        );
    }

    #[test]
    fn test_rpc_invariants_summary() {
        let mut inv = RpcInvariants::new();
        inv.record_request_sent(1);
        inv.record_request_sent(2);
        inv.record_response_received(1);
        inv.record_timeout();
        inv.record_connection_failure();

        let summary = inv.summary();
        assert!(summary.contains("sent=2"));
        assert!(summary.contains("success=1"));
        assert!(summary.contains("broken=0"));
        assert!(summary.contains("timeouts=1"));
        assert!(summary.contains("conn_fail=1"));
    }
}