commonware_broadcast/buffered/
mod.rs

1//! Broadcast messages to and cache messages from untrusted peers.
2//!
3//! # Overview
4//!
5//! The core of the module is the [`Engine`]. It is responsible for:
6//! - Accepting and caching messages from other participants
7//! - Broadcasting messages to all peers
8//! - Serving cached messages on-demand
9//!
10//! # Details
11//!
12//! The engine receives messages from other peers and caches them. The cache is a bounded queue of
13//! messages per peer. When the cache is full, the oldest message is removed to make room for the
14//! new one.
15//!
16//! The [`Mailbox`] is used to make requests to the [`Engine`]. It implements the
17//! [`Broadcaster`](crate::Broadcaster) trait. This is used to have the engine send a message to all
//! other peers in the network in a best-effort manner. It also has methods to request messages by
//! commitment, optionally filtered by sender and digest: `subscribe` resolves immediately if a
//! matching message is in the cache, or waits for one to be received over the network if it is
//! not, while `get` returns whatever matching messages are currently cached.
21
22mod config;
23pub use config::Config;
24mod engine;
25pub use engine::Engine;
26mod ingress;
27pub use ingress::Mailbox;
28pub(crate) use ingress::Message;
29mod metrics;
30
31#[cfg(test)]
32pub mod mocks;
33
34#[cfg(test)]
35mod tests {
36    use super::{mocks::TestMessage, *};
37    use crate::Broadcaster;
38    use commonware_codec::RangeCfg;
39    use commonware_cryptography::{
40        ed25519::PublicKey, sha256::Digest as Sha256Digest, Committable, Digestible, Ed25519,
41        Signer,
42    };
43    use commonware_macros::{select, test_traced};
44    use commonware_p2p::{
45        simulated::{Link, Network, Oracle, Receiver, Sender},
46        Recipients,
47    };
48    use commonware_runtime::{deterministic, Clock, Metrics, Runner};
49    use std::{collections::BTreeMap, time::Duration};
50
51    // Number of messages to cache per sender
52    const CACHE_SIZE: usize = 10;
53
54    // Enough time to receive a cached message. Cannot be instantaneous as the test runtime
55    // requires some time to switch context.
56    const A_JIFFY: Duration = Duration::from_millis(10);
57
58    // Network speed for the simulated network
59    const NETWORK_SPEED: Duration = Duration::from_millis(100);
60
61    // Enough time for a message to propagate through the network
62    const NETWORK_SPEED_WITH_BUFFER: Duration = Duration::from_millis(200);
63
64    type Registrations = BTreeMap<PublicKey, (Sender<PublicKey>, Receiver<PublicKey>)>;
65
66    async fn initialize_simulation(
67        context: deterministic::Context,
68        num_peers: u32,
69        success_rate: f64,
70    ) -> (Vec<PublicKey>, Registrations, Oracle<PublicKey>) {
71        let (network, mut oracle) = Network::<deterministic::Context, PublicKey>::new(
72            context.with_label("network"),
73            commonware_p2p::simulated::Config {
74                max_size: 1024 * 1024,
75            },
76        );
77        network.start();
78
79        let mut schemes = (0..num_peers)
80            .map(|i| Ed25519::from_seed(i as u64))
81            .collect::<Vec<_>>();
82        schemes.sort_by_key(|s| s.public_key());
83        let peers: Vec<PublicKey> = schemes.iter().map(|c| (c.public_key())).collect();
84
85        let mut registrations: Registrations = BTreeMap::new();
86        for peer in peers.iter() {
87            let (sender, receiver) = oracle.register(peer.clone(), 0).await.unwrap();
88            registrations.insert(peer.clone(), (sender, receiver));
89        }
90
91        // Add links between all peers
92        let link = Link {
93            latency: NETWORK_SPEED.as_millis() as f64,
94            jitter: 0.0,
95            success_rate,
96        };
97        for p1 in peers.iter() {
98            for p2 in peers.iter() {
99                if p2 == p1 {
100                    continue;
101                }
102                oracle
103                    .add_link(p1.clone(), p2.clone(), link.clone())
104                    .await
105                    .unwrap();
106            }
107        }
108
109        (peers, registrations, oracle)
110    }
111
112    fn spawn_peer_engines(
113        context: deterministic::Context,
114        registrations: &mut Registrations,
115    ) -> BTreeMap<PublicKey, Mailbox<PublicKey, Sha256Digest, Sha256Digest, TestMessage>> {
116        let mut mailboxes = BTreeMap::new();
117        while let Some((peer, network)) = registrations.pop_first() {
118            let context = context.with_label(&peer.to_string());
119            let config = Config {
120                public_key: peer.clone(),
121                mailbox_size: 1024,
122                deque_size: CACHE_SIZE,
123                priority: false,
124                codec_config: RangeCfg::from(..),
125            };
126            let (engine, engine_mailbox) =
127                Engine::<_, PublicKey, Sha256Digest, Sha256Digest, TestMessage>::new(
128                    context.clone(),
129                    config,
130                );
131            mailboxes.insert(peer.clone(), engine_mailbox);
132            engine.start(network);
133        }
134        mailboxes
135    }
136
137    #[test_traced]
138    fn test_broadcast() {
139        let runner = deterministic::Runner::timed(Duration::from_secs(5));
140        runner.start(|context| async move {
141            let (peers, mut registrations, _oracle) =
142                initialize_simulation(context.clone(), 4, 1.0).await;
143            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
144
145            // Send a single broadcast message from the first peer
146            let message = TestMessage::shared(b"hello world test message");
147            let mut first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
148            let result = first_mailbox
149                .broadcast(Recipients::All, message.clone())
150                .await;
151
152            // Allow time for propagation
153            context.sleep(Duration::from_secs(1)).await;
154
155            // Check that all peers received the message
156            for peer in peers.iter() {
157                let mut mailbox = mailboxes.get(peer).unwrap().clone();
158                let commitment = message.commitment();
159                let receiver = mailbox.subscribe(None, commitment, None).await;
160                let received_message = receiver.await.ok();
161                assert_eq!(received_message.unwrap(), message.clone());
162            }
163            assert_eq!(result.await.unwrap().len(), peers.len() - 1);
164
165            // Drop broadcast result
166            let message = TestMessage::shared(b"hello world again");
167            let result = first_mailbox
168                .broadcast(Recipients::All, message.clone())
169                .await;
170            drop(result);
171
172            // Allow time for propagation
173            context.sleep(Duration::from_secs(1)).await;
174
175            // Check that all peers received the new message
176            let mut found = 0;
177            for peer in peers.iter() {
178                let mut mailbox = mailboxes.get(peer).unwrap().clone();
179                let commitment = message.commitment();
180                let receiver = mailbox.get(None, commitment, None).await;
181                if !receiver.is_empty() {
182                    assert_eq!(receiver, vec![message.clone()]);
183                    found += 1;
184                }
185            }
186            assert!(found > 0, "No peers received the message");
187        });
188    }
189
190    #[test_traced]
191    fn test_self_retrieval() {
192        let runner = deterministic::Runner::timed(Duration::from_secs(5));
193        runner.start(|context| async move {
194            // Initialize simulation with 1 peer
195            let (peers, mut registrations, _oracle) =
196                initialize_simulation(context.clone(), 1, 1.0).await;
197            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
198
199            // Set up mailbox for Peer A
200            let mut mailbox_a = mailboxes.get(&peers[0]).unwrap().clone();
201
202            // Create a test message
203            let m1 = TestMessage::shared(b"hello world");
204            let commitment_m1 = m1.commitment();
205
206            // Attempt immediate retrieval before broadcasting
207            let receiver_before = mailbox_a.get(None, commitment_m1, None).await;
208            assert!(receiver_before.is_empty());
209
210            // Attempt retrieval before broadcasting
211            let receiver_before = mailbox_a.subscribe(None, commitment_m1, None).await;
212
213            // Broadcast the message
214            let result = mailbox_a.broadcast(Recipients::All, m1.clone()).await;
215            assert_eq!(result.await.unwrap().len(), peers.len() - 1);
216
217            // Wait for the pre-broadcast retrieval to complete
218            let msg_before = receiver_before
219                .await
220                .expect("Pre-broadcast retrieval failed");
221            assert_eq!(msg_before, m1);
222
223            // Attempt immediate retrieval after broadcasting
224            let receiver_after = mailbox_a.get(None, commitment_m1, None).await;
225            assert_eq!(receiver_after, vec![m1.clone()]);
226
227            // Perform a second retrieval after the broadcast
228            let receiver_after = mailbox_a.subscribe(None, commitment_m1, None).await;
229
230            // Measure the time taken for the second retrieval
231            let start = context.current();
232            let msg_after = receiver_after
233                .await
234                .expect("Post-broadcast retrieval failed");
235            let duration = context.current().duration_since(start).unwrap();
236
237            // Verify the second retrieval matches the original message
238            assert_eq!(msg_after, m1);
239
240            // Verify the second retrieval was instant (less than 10ms)
241            assert!(duration < A_JIFFY, "get not instant");
242        });
243    }
244
245    #[test_traced]
246    fn test_packet_loss() {
247        let runner = deterministic::Runner::timed(Duration::from_secs(30));
248        runner.start(|context| async move {
249            let (peers, mut registrations, _oracle) =
250                initialize_simulation(context.clone(), 10, 0.1).await;
251            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
252
253            // Create a message and grab an arbitrary mailbox
254            let message = TestMessage::shared(b"hello world test message");
255            let mut first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
256
257            // Retry until all peers receive the message (or timeout)
258            let commitment = message.commitment();
259            for i in 0..100 {
260                // Broadcast the message
261                let result = first_mailbox
262                    .broadcast(Recipients::All, message.clone())
263                    .await;
264                context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
265
266                // Check if all peers received the message
267                let mut all_received = true;
268                for peer in peers.iter() {
269                    let mut mailbox = mailboxes.get(peer).unwrap().clone();
270                    let receiver = mailbox.subscribe(None, commitment, None).await;
271                    let has = select! {
272                        _ = context.sleep(A_JIFFY) => {false},
273                        r = receiver => { r.is_ok() },
274                    };
275                    all_received &= has;
276                }
277                assert_eq!(result.await.unwrap().len(), peers.len() - 1);
278
279                // If all received, we're done
280                if all_received {
281                    assert!(i > 0, "Message received on first try");
282                    return;
283                }
284            }
285            panic!("Not all peers received the message after retries");
286        });
287    }
288
289    #[test_traced]
290    fn test_get_cached() {
291        let runner = deterministic::Runner::timed(Duration::from_secs(5));
292        runner.start(|context| async move {
293            let (peers, mut registrations, _oracle) =
294                initialize_simulation(context.clone(), 2, 1.0).await;
295            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
296
297            // Broadcast a message
298            let message = TestMessage::shared(b"cached message");
299            let mut first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
300            let result = first_mailbox
301                .broadcast(Recipients::All, message.clone())
302                .await;
303            assert_eq!(result.await.unwrap().len(), peers.len() - 1);
304
305            // Wait for propagation
306            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
307
308            // Get from cache (should be instant)
309            let commitment = message.commitment();
310            let mut mailbox = mailboxes.get(peers.last().unwrap()).unwrap().clone();
311            let receiver = mailbox.subscribe(None, commitment, None).await;
312            let start = context.current();
313            let received = receiver.await.expect("failed to get cached message");
314            let duration = context.current().duration_since(start).unwrap();
315            assert_eq!(received, message);
316            assert!(duration < A_JIFFY, "get not instant",);
317        });
318    }
319
320    #[test_traced]
321    fn test_get_nonexistent() {
322        let runner = deterministic::Runner::timed(Duration::from_secs(5));
323        runner.start(|context| async move {
324            let (peers, mut registrations, _oracle) =
325                initialize_simulation(context.clone(), 2, 1.0).await;
326            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
327
328            // Request nonexistent message from two nodes
329            let message = TestMessage::shared(b"future message");
330            let commitment = message.commitment();
331            let mut mailbox1 = mailboxes.get(&peers[0]).unwrap().clone();
332            let mut mailbox2 = mailboxes.get(&peers[1]).unwrap().clone();
333            let receiver = mailbox1.subscribe(None, commitment, None).await;
334
335            // Create two other requests which are dropped
336            let dummy1 = mailbox1.subscribe(None, commitment, None).await;
337            let dummy2 = mailbox2.subscribe(None, commitment, None).await;
338            drop(dummy1);
339            drop(dummy2);
340
341            // Broadcast the message
342            let result = mailbox1.broadcast(Recipients::All, message.clone()).await;
343            assert_eq!(result.await.unwrap().len(), peers.len() - 1);
344
345            // Wait for propagation
346            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
347
348            // Check receiver1 gets the message, receiver2 was dropped
349            let received = receiver.await.expect("receiver1 should get message");
350            assert_eq!(received, message);
351        });
352    }
353
354    #[test_traced]
355    fn test_cache_eviction_single_peer() {
356        let runner = deterministic::Runner::timed(Duration::from_secs(5));
357        runner.start(|context| async move {
358            let (peers, mut registrations, _oracle) =
359                initialize_simulation(context.clone(), 2, 1.0).await;
360            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
361
362            // Broadcast messages exceeding cache size
363            let mut mailbox = mailboxes.get(&peers[0]).unwrap().clone();
364            let mut messages = vec![];
365            for i in 0..CACHE_SIZE + 1 {
366                messages.push(TestMessage::shared(format!("message {}", i).as_bytes()));
367            }
368            for message in messages.iter() {
369                let result = mailbox.broadcast(Recipients::All, message.clone()).await;
370                assert_eq!(result.await.unwrap().len(), peers.len() - 1);
371            }
372
373            // Wait for propagation
374            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
375
376            // Check all other messages exist
377            let mut peer_mailbox = mailboxes.get(&peers[1]).unwrap().clone();
378            for msg in messages.iter().skip(1) {
379                let result = peer_mailbox
380                    .subscribe(None, msg.commitment(), None)
381                    .await
382                    .await
383                    .unwrap();
384                assert_eq!(result, msg.clone());
385            }
386
387            // Check first message times out
388            let receiver = peer_mailbox
389                .subscribe(None, messages[0].commitment(), None)
390                .await;
391            select! {
392                _ = context.sleep(A_JIFFY) => {},
393                _ = receiver => { panic!("receiver should have failed")},
394            }
395        });
396    }
397
398    #[test_traced]
399    fn test_cache_eviction_multi_peer() {
400        let runner = deterministic::Runner::timed(Duration::from_secs(10));
401        runner.start(|context| async move {
402            // Initialize simulation with 3 peers
403            let (peers, mut registrations, _oracle) =
404                initialize_simulation(context.clone(), 3, 1.0).await;
405            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
406
407            // Assign mailboxes for peers A, B, C
408            let mut mailbox_a = mailboxes.get(&peers[0]).unwrap().clone();
409            let mut mailbox_b = mailboxes.get(&peers[1]).unwrap().clone();
410            let mut mailbox_c = mailboxes.get(&peers[2]).unwrap().clone();
411
412            // Create and broadcast message M1 from A
413            let m1 = TestMessage::shared(b"message M1");
414            let commitment_m1 = m1.commitment();
415            let result = mailbox_a.broadcast(Recipients::All, m1.clone()).await;
416            assert_eq!(result.await.unwrap().len(), peers.len() - 1);
417            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
418
419            // Broadcast M1 from C
420            let result = mailbox_c.broadcast(Recipients::All, m1.clone()).await;
421            assert_eq!(result.await.unwrap().len(), peers.len() - 1);
422            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
423
424            // M1 is now in A's and C's deques in B's engine
425
426            // Peer A broadcasts 10 new messages to evict M1 from A's deque
427            let mut new_messages_a = Vec::with_capacity(CACHE_SIZE);
428            for i in 0..CACHE_SIZE {
429                new_messages_a.push(TestMessage::shared(format!("A{}", i).as_bytes()));
430            }
431            for msg in &new_messages_a {
432                let result = mailbox_a.broadcast(Recipients::All, msg.clone()).await;
433                assert_eq!(result.await.unwrap().len(), peers.len() - 1);
434            }
435            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
436
437            // Verify B can still get M1 (in C's deque)
438            let receiver = mailbox_b.subscribe(None, commitment_m1, None).await;
439            let received = receiver.await.expect("M1 should be retrievable");
440            assert_eq!(received, m1);
441
442            // Peer C broadcasts 10 new messages to evict M1 from C's deque
443            let mut new_messages_c = Vec::with_capacity(CACHE_SIZE);
444            for i in 0..CACHE_SIZE {
445                new_messages_c.push(TestMessage::shared(format!("C{}", i).as_bytes()));
446            }
447            for msg in &new_messages_c {
448                let result = mailbox_c.broadcast(Recipients::All, msg.clone()).await;
449                assert_eq!(result.await.unwrap().len(), peers.len() - 1);
450            }
451            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
452
453            // Verify B cannot get M1 (evicted from all deques)
454            let receiver = mailbox_b.subscribe(None, commitment_m1, None).await;
455            select! {
456                _ = context.sleep(A_JIFFY) => {},
457                _ = receiver => { panic!("M1 should not be retrievable"); },
458            }
459        });
460    }
461
462    #[test_traced]
463    fn test_selective_recipients() {
464        let runner = deterministic::Runner::timed(Duration::from_secs(5));
465        runner.start(|context| async move {
466            let (peers, mut registrations, _oracle) =
467                initialize_simulation(context.clone(), 4, 1.0).await;
468
469            let sender_pk = peers[0].clone();
470            let target_peer = peers[1].clone();
471            let non_target_peer = peers[2].clone();
472
473            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
474            let mut sender_mb = mailboxes.get(&sender_pk).unwrap().clone();
475
476            let msg = TestMessage::shared(b"selective-broadcast");
477            let result = sender_mb
478                .broadcast(Recipients::One(target_peer.clone()), msg.clone())
479                .await;
480            assert_eq!(result.await.unwrap(), vec![target_peer.clone()]);
481
482            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
483
484            // Only target peer should retrieve the message.
485            let got_target = mailboxes
486                .get(&target_peer)
487                .unwrap()
488                .clone()
489                .get(None, msg.commitment(), None)
490                .await;
491            assert_eq!(got_target, vec![msg.clone()]);
492
493            // Non-target peer should not retrieve the message.
494            let got_other = mailboxes
495                .get(&non_target_peer)
496                .unwrap()
497                .clone()
498                .get(None, msg.commitment(), None)
499                .await;
500            assert!(got_other.is_empty());
501        });
502    }
503
504    #[test_traced]
505    fn test_sender_filter_subscribe() {
506        let runner = deterministic::Runner::timed(Duration::from_secs(10));
507        runner.start(|context| async move {
508            let (peers, mut registrations, _oracle) =
509                initialize_simulation(context.clone(), 4, 1.0).await;
510            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
511
512            let sender1 = peers[0].clone();
513            let sender2 = peers[1].clone();
514            let sender3 = peers[2].clone();
515
516            let mut mb1 = mailboxes.get(&sender1).unwrap().clone();
517            let mut mb2 = mailboxes.get(&sender2).unwrap().clone();
518            let mut mb3 = mailboxes.get(&sender3).unwrap().clone();
519
520            let msg = TestMessage::shared(b"from-one");
521            let id = msg.commitment();
522
523            // mb2 waits for `id` but only if it originates from `sender1`.
524            let mut recv = mb2.subscribe(Some(sender1.clone()), id, None).await;
525
526            // Broadcast from the wrong sender (should *not* satisfy).
527            mb3.broadcast(Recipients::All, msg.clone())
528                .await
529                .await
530                .unwrap();
531
532            // Wait for the broadcast to propagate
533            context.sleep(A_JIFFY).await;
534
535            // Check that the receiver is still waiting
536            assert!(recv.try_recv().unwrap().is_none());
537
538            // Correct sender broadcasts and subscription fulfills.
539            mb1.broadcast(Recipients::All, msg.clone())
540                .await
541                .await
542                .unwrap();
543            assert_eq!(recv.await.unwrap(), msg);
544        });
545    }
546
547    #[test_traced]
548    fn test_get_all_for_commitment() {
549        let runner = deterministic::Runner::timed(Duration::from_secs(5));
550        runner.start(|context| async move {
551            let (peers, mut registrations, _oracle) =
552                initialize_simulation(context.clone(), 4, 1.0).await;
553            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
554
555            let sender1 = peers[0].clone();
556            let sender2 = peers[1].clone();
557
558            let mut mb1 = mailboxes.get(&sender1).unwrap().clone();
559            let mut mb2 = mailboxes.get(&sender2).unwrap().clone();
560
561            // Two messages share commitment but have distinct digests.
562            let m1 = TestMessage::new(b"id", b"content-1");
563            let m2 = TestMessage::new(b"id", b"content-2");
564            let m3 = TestMessage::new(b"other-id", b"content-3");
565            mb1.broadcast(Recipients::All, m1.clone())
566                .await
567                .await
568                .unwrap();
569            mb1.broadcast(Recipients::All, m2.clone())
570                .await
571                .await
572                .unwrap();
573            mb1.broadcast(Recipients::All, m3.clone())
574                .await
575                .await
576                .unwrap();
577
578            // Wait for propagation
579            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
580
581            // `get` with digest=None returns both.
582            let mut got = mb2.get(None, m1.commitment(), None).await;
583            got.sort_by_key(|m| m.content.clone());
584            assert_eq!(got, vec![m1.clone(), m2.clone()]);
585
586            // `get` with digest=Some returns only the first.
587            let got = mb2.get(None, m1.commitment(), Some(m1.digest())).await;
588            assert_eq!(got, vec![m1.clone()]);
589
590            // `get` with digest=None returns only one with a duplicate digest.
591            let got = mb2.get(None, m3.commitment(), None).await;
592            assert_eq!(got, vec![m3.clone()]);
593
594            // `get` with digest=Some that does not exist returns empty.
595            let got = mb2.get(None, m3.commitment(), Some(m2.digest())).await;
596            assert!(got.is_empty());
597
598            // `get` with digest=None from a single sender should return all messages.
599            let mut got = mb2.get(Some(sender1.clone()), m1.commitment(), None).await;
600            got.sort_by_key(|m| m.content.clone());
601            assert_eq!(got, vec![m1.clone(), m2.clone()]);
602            let got = mb2.get(Some(sender1.clone()), m3.commitment(), None).await;
603            assert_eq!(got, vec![m3.clone()]);
604        });
605    }
606
607    #[test_traced]
608    fn test_ref_count_across_peers() {
609        let runner = deterministic::Runner::timed(Duration::from_secs(10));
610        runner.start(|context| async move {
611            // three peers so we can observe from a third
612            let (peers, mut registrations, _oracle) =
613                initialize_simulation(context.clone(), 3, 1.0).await;
614            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
615
616            let p0 = peers[0].clone();
617            let p1 = peers[1].clone();
618            let observer = peers[2].clone();
619
620            let mut mb0 = mailboxes.get(&p0).unwrap().clone();
621            let mut mb1 = mailboxes.get(&p1).unwrap().clone();
622            let mut obs = mailboxes.get(&observer).unwrap().clone();
623
624            // the message duplicated by p0 and p1
625            let dup = TestMessage::shared(b"dup");
626            let id = dup.commitment();
627
628            // broadcast from both senders
629            mb0.broadcast(Recipients::All, dup.clone())
630                .await
631                .await
632                .unwrap();
633            mb1.broadcast(Recipients::All, dup.clone())
634                .await
635                .await
636                .unwrap();
637            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
638
639            // observer must get it now
640            assert_eq!(obs.get(None, id, None).await, vec![dup.clone()]);
641
642            // Evict from p0’s deque only
643            for i in 0..CACHE_SIZE {
644                let spam = TestMessage::shared(format!("p0-{i}").into_bytes());
645                mb0.broadcast(Recipients::All, spam).await.await.unwrap();
646            }
647            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
648            assert_eq!(obs.get(None, id, None).await, vec![dup.clone()]);
649
650            // Evict from p1’s deque as well
651            for i in 0..CACHE_SIZE {
652                let spam = TestMessage::shared(format!("p1-{i}").into_bytes());
653                mb1.broadcast(Recipients::All, spam).await.await.unwrap();
654            }
655            context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
656            assert!(obs.get(None, id, None).await.is_empty());
657        });
658    }
659
660    #[test_traced]
661    fn test_digest_filtered_waiter() {
662        let runner = deterministic::Runner::timed(Duration::from_secs(5));
663        runner.start(|context| async move {
664            let (peers, mut registrations, _oracle) =
665                initialize_simulation(context.clone(), 3, 1.0).await;
666            let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
667
668            let owner = peers[0].clone();
669            let spoiler = peers[1].clone();
670            let waiter = peers[2].clone();
671
672            let mut mb_owner = mailboxes.get(&owner).unwrap().clone();
673            let mut mb_spoiler = mailboxes.get(&spoiler).unwrap().clone();
674            let mut mb_waiter = mailboxes.get(&waiter).unwrap().clone();
675
676            // two messages share commitment but differ in digest
677            let wanted = TestMessage::new(b"same-id", b"wanted");
678            let not_want = TestMessage::new(b"same-id", b"noise");
679
680            // waiter only wants the *wanted* digest and only from `owner`
681            let mut recv = mb_waiter
682                .subscribe(
683                    Some(owner.clone()),
684                    wanted.commitment(),
685                    Some(wanted.digest()),
686                )
687                .await;
688
689            // spoiler broadcasts the *wanted* digest
690            mb_spoiler
691                .broadcast(Recipients::All, wanted.clone())
692                .await
693                .await
694                .unwrap();
695            context.sleep(A_JIFFY).await;
696            assert!(recv.try_recv().unwrap().is_none());
697
698            // owner broadcasts a *different* digest with same commitment
699            mb_owner
700                .broadcast(Recipients::All, not_want.clone())
701                .await
702                .await
703                .unwrap();
704            context.sleep(A_JIFFY).await;
705            assert!(recv.try_recv().unwrap().is_none());
706
707            // owner finally broadcasts the exact match
708            mb_owner
709                .broadcast(Recipients::All, wanted.clone())
710                .await
711                .await
712                .unwrap();
713            assert_eq!(recv.await.unwrap(), wanted);
714        });
715    }
716}