1use bytes::Bytes;
67use commonware_utils::{channel::oneshot, Span};
68
69mod config;
70pub use config::Config;
71mod engine;
72pub use engine::Engine;
73mod fetcher;
74mod inflight;
75mod ingress;
76pub use ingress::Mailbox;
77mod metrics;
78mod wire;
79
80#[cfg(feature = "mocks")]
81pub mod mocks;
82
/// A source of [`Bytes`] values addressed by key.
///
/// Implementors must be `Clone + Send + 'static`.
pub trait Producer: Clone + Send + 'static {
    /// Identifier used to request a value.
    type Key: Span;

    /// Requests the value associated with `key`.
    ///
    /// The value is handed back through the returned [`oneshot::Receiver`].
    ///
    /// NOTE(review): the test producer in this file drops the sender without
    /// sending when it holds nothing for `key`; presumably a closed receiver is
    /// how "no data" is signalled to the caller — confirm against the engine.
    fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes>;
}
91
92#[cfg(test)]
93mod tests {
94 use super::{
95 mocks::{Consumer, Key, Producer},
96 Config, Engine, Mailbox,
97 };
98 use crate::{Delivery, Fetch, Resolver, TargetedResolver};
99 use bytes::Bytes;
100 use commonware_cryptography::{
101 ed25519::{PrivateKey, PublicKey},
102 Signer,
103 };
104 use commonware_macros::{select, test_traced};
105 use commonware_p2p::{
106 simulated::{Link, Network, Oracle, Receiver, Sender},
107 Blocker, Manager as _, Provider, TrackedPeers,
108 };
109 use commonware_runtime::{
110 deterministic, telemetry::metrics::count_running_tasks, Clock, Metrics as _, Quota, Runner,
111 Spawner as _, Supervisor as _,
112 };
113 use commonware_utils::{
114 channel::{fallible::FallibleExt, mpsc, oneshot},
115 non_empty_vec,
116 ordered::Set,
117 sync::Mutex,
118 NZUsize, NZU32,
119 };
120 use std::{
121 collections::{HashMap, VecDeque},
122 num::{NonZeroU32, NonZeroUsize},
123 sync::Arc,
124 time::Duration,
125 };
126
    /// `mailbox_size` given to every engine `Config` built by the test helpers.
    const MAILBOX_SIZE: NonZeroUsize = NZUsize!(1024);
    /// Per-second quota used by `setup_network_and_peers` when registering channels.
    const RATE_LIMIT: NonZeroU32 = NZU32!(10);
    /// `initial` duration given to every engine `Config` built by the test helpers.
    const INITIAL_DURATION: Duration = Duration::from_millis(100);
    /// `timeout` given to every engine `Config` built by the test helpers.
    const TIMEOUT: Duration = Duration::from_millis(400);
    /// `fetch_retry_timeout` given to every engine `Config` built by the test helpers.
    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
    /// Simulated link: 10ms latency, 1ms jitter, success rate 1.0
    /// (presumably lossless — confirm against the simulated network docs).
    const LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
    /// Same latency and jitter as [`LINK`], but with a success rate of 0.5.
    const LINK_UNRELIABLE: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 0.5,
    };
142
143 fn status_metric_total(metrics: &str, name: &str, status: &str) -> u64 {
144 let prefix = format!("{name}{{");
145 let status_label = format!("status=\"{status}\"");
146 metrics
147 .lines()
148 .filter(|line| line.starts_with(&prefix) && line.contains(&status_label))
149 .map(|line| {
150 line.split_whitespace()
151 .next_back()
152 .expect("metric line must have a value")
153 .parse::<u64>()
154 .expect("status metric value must be an integer")
155 })
156 .sum()
157 }
158
159 async fn setup_network_and_peers(
160 context: &deterministic::Context,
161 peer_seeds: &[u64],
162 ) -> (
163 Oracle<PublicKey, deterministic::Context>,
164 Vec<PrivateKey>,
165 Vec<PublicKey>,
166 Vec<(
167 Sender<PublicKey, deterministic::Context>,
168 Receiver<PublicKey>,
169 )>,
170 ) {
171 setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
172 .await
173 }
174
175 async fn setup_network_and_peers_with_rate_limit(
176 context: &deterministic::Context,
177 peer_seeds: &[u64],
178 rate_limit: Quota,
179 ) -> (
180 Oracle<PublicKey, deterministic::Context>,
181 Vec<PrivateKey>,
182 Vec<PublicKey>,
183 Vec<(
184 Sender<PublicKey, deterministic::Context>,
185 Receiver<PublicKey>,
186 )>,
187 ) {
188 let (network, oracle) = Network::new(
189 context.child("network"),
190 commonware_p2p::simulated::Config {
191 max_size: 1024 * 1024,
192 disconnect_on_block: true,
193 tracked_peer_sets: NZUsize!(3),
194 },
195 );
196 network.start();
197
198 let schemes: Vec<PrivateKey> = peer_seeds
199 .iter()
200 .map(|seed| PrivateKey::from_seed(*seed))
201 .collect();
202 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
203 let mut manager = oracle.manager();
204 manager.track(0, Set::try_from(peers.clone()).unwrap());
205
206 let mut connections = Vec::new();
207 for peer in &peers {
208 let (sender, receiver) = oracle
209 .control(peer.clone())
210 .register(0, rate_limit)
211 .await
212 .unwrap();
213 connections.push((sender, receiver));
214 }
215
216 (oracle, schemes, peers, connections)
217 }
218
219 async fn add_link(
220 oracle: &mut Oracle<PublicKey, deterministic::Context>,
221 link: Link,
222 peers: &[PublicKey],
223 from: usize,
224 to: usize,
225 ) {
226 oracle
227 .add_link(peers[from].clone(), peers[to].clone(), link.clone())
228 .await
229 .unwrap();
230 oracle
231 .add_link(peers[to].clone(), peers[from].clone(), link)
232 .await
233 .unwrap();
234 }
235
236 #[derive(Clone, Default)]
237 struct SequencedProducer {
238 data: Arc<Mutex<HashMap<Key, VecDeque<Bytes>>>>,
239 }
240
241 impl SequencedProducer {
242 fn insert(&mut self, key: Key, values: impl IntoIterator<Item = Bytes>) {
243 self.data.lock().insert(key, values.into_iter().collect());
244 }
245
246 fn remaining(&self, key: &Key) -> Vec<Bytes> {
247 self.data
248 .lock()
249 .get(key)
250 .map(|values| values.iter().cloned().collect())
251 .unwrap_or_default()
252 }
253 }
254
255 impl crate::p2p::Producer for SequencedProducer {
256 type Key = Key;
257
258 fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
259 let (sender, receiver) = oneshot::channel();
260 if let Some(value) = self.data.lock().get_mut(&key).and_then(VecDeque::pop_front) {
261 let _ = sender.send(value);
262 }
263 receiver
264 }
265 }
266
    /// Spawns a resolver engine backed by the mock `Producer<Key, Bytes>` and
    /// returns its mailbox.
    ///
    /// Convenience wrapper that forwards every argument unchanged to
    /// `setup_and_spawn_actor_with_producer`.
    fn setup_and_spawn_actor<C, R>(
        context: &deterministic::Context,
        provider: impl Provider<PublicKey = PublicKey>,
        blocker: impl Blocker<PublicKey = PublicKey>,
        signer: impl Signer<PublicKey = PublicKey>,
        connection: (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        consumer: C,
        producer: Producer<Key, Bytes>,
    ) -> Mailbox<Key, PublicKey, R>
    where
        C: crate::Consumer<Key = Key, Subscriber = R, Value = Bytes>,
        R: Clone + Ord + Send + 'static,
    {
        setup_and_spawn_actor_with_producer(
            context, provider, blocker, signer, connection, consumer, producer,
        )
    }
287
288 fn setup_and_spawn_actor_with_producer<C, R, Pro>(
289 context: &deterministic::Context,
290 provider: impl Provider<PublicKey = PublicKey>,
291 blocker: impl Blocker<PublicKey = PublicKey>,
292 signer: impl Signer<PublicKey = PublicKey>,
293 connection: (
294 Sender<PublicKey, deterministic::Context>,
295 Receiver<PublicKey>,
296 ),
297 consumer: C,
298 producer: Pro,
299 ) -> Mailbox<Key, PublicKey, R>
300 where
301 C: crate::Consumer<Key = Key, Subscriber = R, Value = Bytes>,
302 Pro: crate::p2p::Producer<Key = Key>,
303 R: Clone + Ord + Send + 'static,
304 {
305 let public_key = signer.public_key();
306 let (engine, mailbox) = Engine::new(
307 context.child("actor").with_attribute("peer", &public_key),
308 Config {
309 peer_provider: provider,
310 blocker,
311 consumer,
312 producer,
313 mailbox_size: MAILBOX_SIZE,
314 me: Some(public_key),
315 initial: INITIAL_DURATION,
316 timeout: TIMEOUT,
317 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
318 priority_requests: false,
319 priority_responses: false,
320 },
321 );
322 engine.start(connection);
323
324 mailbox
325 }
326
    /// One scripted delivery: a receiver the delivery task waits on before
    /// answering, plus the validity verdict it reports once released.
    type DeliveryGate = (oneshot::Receiver<()>, bool);
    /// Shared FIFO of scripted deliveries; each `deliver` call pops the front entry.
    type DeliveryGates = Arc<Mutex<VecDeque<DeliveryGate>>>;
329
330 #[derive(Clone)]
331 struct BlockingConsumer {
332 context: Arc<deterministic::Context>,
333 sender: mpsc::UnboundedSender<(Key, Bytes)>,
334 started: mpsc::UnboundedSender<Key>,
335 gates: DeliveryGates,
336 }
337
338 impl BlockingConsumer {
339 fn new(
340 context: deterministic::Context,
341 gates: Vec<DeliveryGate>,
342 ) -> (
343 Self,
344 mpsc::UnboundedReceiver<(Key, Bytes)>,
345 mpsc::UnboundedReceiver<Key>,
346 ) {
347 let (sender, receiver) = mpsc::unbounded_channel();
348 let (started, started_receiver) = mpsc::unbounded_channel();
349 (
350 Self {
351 context: Arc::new(context),
352 sender,
353 started,
354 gates: Arc::new(Mutex::new(gates.into())),
355 },
356 receiver,
357 started_receiver,
358 )
359 }
360 }
361
362 impl crate::Consumer for BlockingConsumer {
363 type Key = Key;
364 type Value = Bytes;
365 type Subscriber = ();
366
367 fn deliver(
368 &mut self,
369 delivery: Delivery<Self::Key, Self::Subscriber>,
370 value: Self::Value,
371 ) -> oneshot::Receiver<bool> {
372 let key = delivery.key;
373 self.started.send_lossy(key.clone());
374 let (gate, valid) = self
375 .gates
376 .lock()
377 .pop_front()
378 .map_or((None, true), |(gate, valid)| (Some(gate), valid));
379 let (mut response, receiver) = oneshot::channel();
380 let sender = self.sender.clone();
381 self.context.child("delivery").spawn(move |_| async move {
382 if let Some(gate) = gate {
383 select! {
384 _ = response.closed() => return,
385 result = gate => {
386 if result.is_err() {
387 let _ = response.send(false);
388 return;
389 }
390 },
391 }
392 }
393 if valid {
394 sender.send_lossy((key, value));
395 }
396 let _ = response.send(valid);
397 });
398 receiver
399 }
400 }
401
    /// Opaque subscriber identifier used as the `Subscriber` type of the
    /// recording consumers below.
    #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
    struct SubscriberTag(u16);

    /// A delivery as seen by a recording consumer: the full [`Delivery`]
    /// (including subscriber information) together with the delivered value.
    type RecordedDelivery = (Delivery<Key, SubscriberTag>, Bytes);
406
407 #[derive(Clone)]
408 struct BlockingSubscriberRecordingConsumer {
409 context: Arc<deterministic::Context>,
410 sender: mpsc::UnboundedSender<RecordedDelivery>,
411 started: mpsc::UnboundedSender<Delivery<Key, SubscriberTag>>,
412 gates: DeliveryGates,
413 }
414
415 impl BlockingSubscriberRecordingConsumer {
416 fn new(
417 context: deterministic::Context,
418 gates: Vec<DeliveryGate>,
419 ) -> (
420 Self,
421 mpsc::UnboundedReceiver<RecordedDelivery>,
422 mpsc::UnboundedReceiver<Delivery<Key, SubscriberTag>>,
423 ) {
424 let (sender, receiver) = mpsc::unbounded_channel();
425 let (started, started_receiver) = mpsc::unbounded_channel();
426 (
427 Self {
428 context: Arc::new(context),
429 sender,
430 started,
431 gates: Arc::new(Mutex::new(gates.into())),
432 },
433 receiver,
434 started_receiver,
435 )
436 }
437 }
438
439 impl crate::Consumer for BlockingSubscriberRecordingConsumer {
440 type Key = Key;
441 type Value = Bytes;
442 type Subscriber = SubscriberTag;
443
444 fn deliver(
445 &mut self,
446 delivery: Delivery<Self::Key, Self::Subscriber>,
447 value: Self::Value,
448 ) -> oneshot::Receiver<bool> {
449 self.started.send_lossy(delivery.clone());
450 let (gate, valid) = self
451 .gates
452 .lock()
453 .pop_front()
454 .map_or((None, true), |(gate, valid)| (Some(gate), valid));
455 let (mut response, receiver) = oneshot::channel();
456 let sender = self.sender.clone();
457 self.context.child("delivery").spawn(move |_| async move {
458 if let Some(gate) = gate {
459 select! {
460 _ = response.closed() => return,
461 result = gate => {
462 if result.is_err() {
463 let _ = response.send(false);
464 return;
465 }
466 },
467 }
468 }
469 if valid {
470 sender.send_lossy((delivery, value));
471 }
472 let _ = response.send(valid);
473 });
474 receiver
475 }
476 }
477
478 #[derive(Clone)]
479 struct SubscriberRecordingConsumer {
480 sender: mpsc::UnboundedSender<RecordedDelivery>,
481 }
482
483 impl SubscriberRecordingConsumer {
484 fn new() -> (Self, mpsc::UnboundedReceiver<RecordedDelivery>) {
485 let (sender, receiver) = mpsc::unbounded_channel();
486 (Self { sender }, receiver)
487 }
488 }
489
490 impl crate::Consumer for SubscriberRecordingConsumer {
491 type Key = Key;
492 type Value = Bytes;
493 type Subscriber = SubscriberTag;
494
495 fn deliver(
496 &mut self,
497 delivery: Delivery<Self::Key, Self::Subscriber>,
498 value: Self::Value,
499 ) -> oneshot::Receiver<bool> {
500 let (sender, receiver) = oneshot::channel();
501 self.sender.send_lossy((delivery, value));
502 let _ = sender.send(true);
503 receiver
504 }
505 }
506
    /// Returns the mock `Consumer::dummy()`.
    ///
    /// Used for peers whose deliveries the tests never inspect: no receiver is
    /// handed back to the caller.
    fn dummy_consumer() -> Consumer<Key, Bytes> {
        Consumer::dummy()
    }
510
    /// Returns the mock `Consumer::new()`: a consumer paired with the receiver
    /// on which the tests observe `(Key, Bytes)` deliveries.
    fn consumer() -> (Consumer<Key, Bytes>, mpsc::UnboundedReceiver<(Key, Bytes)>) {
        Consumer::new()
    }
514
515 async fn wait_for_blocked(
516 context: &deterministic::Context,
517 oracle: &Oracle<PublicKey, deterministic::Context>,
518 blocker: &PublicKey,
519 blocked: &PublicKey,
520 ) {
521 loop {
522 let blocked_peers = oracle.blocked().await.unwrap();
523 if blocked_peers
524 .iter()
525 .any(|(a, b)| a == blocker && b == blocked)
526 {
527 return;
528 }
529 context.sleep(Duration::from_millis(10)).await;
530 }
531 }
532
    #[test_traced]
    /// Happy path: peer 1 fetches a key that only peer 2's producer holds and
    /// receives the matching bytes through its own consumer.
    fn test_fetch_success() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            // Only peer 2 can serve the key.
            let key = Key(2);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 2"));

            let (cons1, mut cons_out1) = consumer();

            // Peer 1: the requester, with an empty producer.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 2: the responder.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            mailbox1.fetch(key.clone());

            let (key_actual, value) = cons_out1.recv().await.unwrap();
            assert_eq!(key_actual, key);
            assert_eq!(value, Bytes::from("data for key 2"));
        });
    }
580
    /// A delivery that the consumer has not answered yet must not stall the
    /// engine: a second fetch has to reach the consumer while the first
    /// delivery is still gated.
    #[test_traced]
    fn test_pending_delivery_does_not_block_engine() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let key1 = Key(1);
            let key2 = Key(2);
            let data1 = Bytes::from("data for key 1");
            let data2 = Bytes::from("data for key 2");

            // Peer 2 serves key1, peer 3 serves key2.
            let mut prod2 = Producer::default();
            prod2.insert(key1.clone(), data1.clone());

            let mut prod3 = Producer::default();
            prod3.insert(key2.clone(), data2.clone());

            // Gates are consumed in delivery order: the first delivery waits on
            // gate 1, the second on gate 2. Both are scripted as valid.
            let (gate_sender1, gate_receiver1) = oneshot::channel();
            let (gate_sender2, gate_receiver2) = oneshot::channel();
            let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
                context.child("consumer"),
                vec![(gate_receiver1, true), (gate_receiver2, true)],
            );

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod3,
            );

            // Start the first delivery and leave it pending behind gate 1.
            mailbox1.fetch(key1.clone());
            let started_key = started.recv().await.expect("delivery did not start");
            assert_eq!(started_key, key1);

            // The second delivery must start even though the first is unanswered.
            mailbox1.fetch(key2.clone());
            select! {
                started_key = started.recv() => {
                    assert_eq!(started_key.expect("delivery did not start"), key2);
                },
                _ = context.sleep(Duration::from_secs(2)) => {
                    panic!("resolver engine blocked on pending delivery");
                },
            };

            // Release the deliveries out of order: key2 first, then key1.
            gate_sender2.send(()).unwrap();
            let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
            assert_eq!(key_actual, key2);
            assert_eq!(value, data2);

            gate_sender1.send(()).unwrap();
            let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
            assert_eq!(key_actual, key1);
            assert_eq!(value, data1);
        });
    }
667
    /// Retaining a key away while its delivery is still pending must abandon
    /// that delivery; a subsequent fetch of the same key starts a fresh
    /// delivery, and only that one reaches the consumer output.
    #[test_traced]
    fn test_retain_drops_pending_delivery() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(1);
            let data = Bytes::from("data for key 1");
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), data.clone());

            // First gate belongs to the delivery that will be canceled, second
            // gate to the delivery triggered by the re-fetch.
            let (mut first_gate_sender, first_gate_receiver) = oneshot::channel();
            let (second_gate_sender, second_gate_receiver) = oneshot::channel();
            let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
                context.child("consumer"),
                vec![(first_gate_receiver, true), (second_gate_receiver, true)],
            );

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            mailbox1.fetch(key.clone());
            let started_key = started.recv().await.expect("delivery did not start");
            assert_eq!(started_key, key);

            // Cancel the key while its delivery is pending, then ask for it again.
            let canceled = key.clone();
            mailbox1.retain(move |key, _| key != &canceled);
            mailbox1.fetch(key.clone());

            // The first gate's receiver is dropped once the first delivery task
            // exits, which is how the test observes that it was abandoned.
            first_gate_sender.closed().await;
            let started_key = started.recv().await.expect("second delivery did not start");
            assert_eq!(started_key, key);

            second_gate_sender.send(()).unwrap();
            let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
            assert_eq!(key_actual, key);
            assert_eq!(value, data);

            // Nothing from the canceled delivery may show up afterwards.
            select! {
                _ = cons_out1.recv() => panic!("unexpected extra event"),
                _ = context.sleep(Duration::from_millis(100)) => {},
            };
        });
    }
734
    /// When the consumer rejects a delivery, the serving peer is blocked and
    /// the fetch is retried against another target once one becomes reachable.
    #[test_traced]
    fn test_invalid_delivery_retries_and_rearms_slot() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            // Initially only peer 2 is linked to the requester.
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(1);
            let data = Bytes::from("data for key 1");

            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), data.clone());

            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), data.clone());

            // First delivery is scripted as invalid, the retry as valid.
            let (first_gate_sender, first_gate_receiver) = oneshot::channel();
            let (second_gate_sender, second_gate_receiver) = oneshot::channel();
            let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
                context.child("consumer"),
                vec![(first_gate_receiver, false), (second_gate_receiver, true)],
            );

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod3,
            );

            mailbox1.fetch_targeted(
                key.clone(),
                non_empty_vec![peers[1].clone(), peers[2].clone()],
            );
            let started_key = started.recv().await.expect("delivery did not start");
            assert_eq!(started_key, key);

            // Let the first delivery finish with an "invalid" verdict and wait
            // until the requester has blocked the peer that served it.
            first_gate_sender.send(()).unwrap();
            wait_for_blocked(&context, &oracle, &peers[0], &peers[1]).await;

            // Make peer 3 reachable and publish a new peer set without peer 2.
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
            oracle.manager().track(
                1,
                Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
            );

            let started_key = started.recv().await.expect("retry delivery did not start");
            assert_eq!(started_key, key);

            second_gate_sender.send(()).unwrap();
            let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
            assert_eq!(key_actual, key);
            assert_eq!(value, data);
        });
    }
818
    /// Exercises the race between an "invalid" consumer verdict and a
    /// `retain` that drops the key, in the order selected by `validation_first`.
    ///
    /// * `validation_first == true`: the verdict lands first, the peer is
    ///   blocked, and the later `retain` leaves exactly that one block in place.
    /// * `validation_first == false`: the key is dropped first, the pending
    ///   delivery is abandoned, and nobody is blocked.
    ///
    /// In both cases nothing may reach the consumer output.
    async fn run_pending_invalid_delivery_race(
        context: &deterministic::Context,
        validation_first: bool,
    ) {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(context, &[1, 2]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(1);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 1"));

        // A single gate whose verdict is "invalid".
        let (mut gate_sender, gate_receiver) = oneshot::channel();
        let (cons1, mut cons_out1, mut started) =
            BlockingConsumer::new(context.child("consumer"), vec![(gate_receiver, false)]);

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        mailbox1.fetch(key.clone());
        let started_key = started.recv().await.expect("delivery did not start");
        assert_eq!(started_key, key);

        if validation_first {
            // Verdict first: the block must exist and survive the retain.
            gate_sender.send(()).unwrap();
            wait_for_blocked(context, &oracle, &peers[0], &peers[1]).await;
            mailbox1.retain(|_, _| false);
            let blocked = oracle.blocked().await.unwrap();
            assert_eq!(blocked.len(), 1);
            assert_eq!(blocked[0].0, peers[0]);
            assert_eq!(blocked[0].1, peers[1]);
        } else {
            // Retain first: wait until the gate's receiver is dropped (the
            // delivery task exited) and check that no block was issued.
            mailbox1.retain(|_, _| false);
            gate_sender.closed().await;
            assert!(oracle.blocked().await.unwrap().is_empty());
        }

        select! {
            _ = cons_out1.recv() => panic!("unexpected event"),
            _ = context.sleep(Duration::from_millis(100)) => {},
        };
    }
881
    /// Runs both orderings of the retain / invalid-verdict race, one after the
    /// other, inside the same deterministic context.
    #[test_traced]
    fn test_retain_pending_invalid_delivery_race() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Retain wins the race first, then the verdict wins.
            run_pending_invalid_delivery_race(&context, false).await;
            run_pending_invalid_delivery_race(&context, true).await;
        });
    }
890
    #[test_traced]
    /// A fetch that is immediately retained away must never produce a consumer
    /// event. The network contains a single peer, so nobody could serve the key
    /// anyway.
    fn test_retain_drops_fetch() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (oracle, mut schemes, _peers, mut connections) =
                setup_network_and_peers(&context, &[1]).await;

            let (cons1, mut cons_out1) = consumer();
            let prod1 = Producer::default();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            );

            // Request the key and cancel it right away.
            let key = Key(3);
            mailbox1.fetch(key.clone());
            let canceled = key.clone();
            mailbox1.retain(move |key, _| key != &canceled);

            select! {
                _ = cons_out1.recv() => panic!("unexpected event"),
                _ = context.sleep(Duration::from_millis(100)) => {},
            };
        });
    }
926
    #[test_traced]
    /// The requester is linked to two peers but only one of them (peer 3) holds
    /// the key; the fetch must still complete with peer 3's data.
    fn test_peer_no_data() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Peers 1 and 2 have empty producers; only peer 3 can answer.
            let prod1 = Producer::default();
            let prod2 = Producer::default();
            let mut prod3 = Producer::default();
            let key = Key(3);
            prod3.insert(key.clone(), Bytes::from("data for key 3"));

            let (cons1, mut cons_out1) = consumer();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod3,
            );

            mailbox1.fetch(key.clone());

            let (key_actual, value) = cons_out1.recv().await.unwrap();
            assert_eq!(key_actual, key);
            assert_eq!(value, Bytes::from("data for key 3"));
        });
    }
989
    #[test_traced]
    /// With a single peer in the network there is nobody to ask: a fetch must
    /// not produce any consumer event, even after five simulated seconds.
    fn test_no_peers_available() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (oracle, mut schemes, _peers, mut connections) =
                setup_network_and_peers(&context, &[1]).await;

            let (cons1, mut cons_out1) = consumer();
            let prod1 = Producer::default();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            );

            // Give the engine ample time to (not) resolve the fetch.
            mailbox1.fetch(Key(4));
            context.sleep(Duration::from_secs(5)).await;

            select! {
                _ = cons_out1.recv() => panic!("Fetch should have failed due to no peers"),
                _ = context.sleep(Duration::from_millis(100)) => {},
            };
        });
    }
1025
    #[test_traced]
    /// A fetch issued before any peer set has been tracked must stay pending
    /// and complete once the first peer set is announced.
    fn test_fetch_before_initial_peer_set_waits_for_update() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // The network is built by hand (instead of via the shared helper) so
            // that no peer set is tracked up front.
            let (network, mut oracle) = Network::new(
                context.child("network"),
                commonware_p2p::simulated::Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            // Keys are sorted by public key so `schemes` and `peers` line up.
            let mut schemes = [1_u64, 2]
                .into_iter()
                .map(PrivateKey::from_seed)
                .collect::<Vec<_>>();
            schemes.sort_by_key(|s| s.public_key());
            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();

            let mut connections = Vec::new();
            for peer in &peers {
                let (sender, receiver) = oracle
                    .control(peer.clone())
                    .register(0, Quota::per_second(RATE_LIMIT))
                    .await
                    .unwrap();
                connections.push((sender, receiver));
            }

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(2);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 2"));

            let (cons1, mut cons_out1) = consumer();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            mailbox1.fetch(key.clone());

            // No peer set is known yet, so nothing may be delivered.
            select! {
                event = cons_out1.recv() => {
                    panic!("fetch should wait for the initial peer set, got {event:?}");
                },
                _ = context.sleep(Duration::from_millis(200)) => {},
            };

            // Announcing the peer set must unblock the pending fetch.
            oracle
                .manager()
                .track(0, Set::try_from(peers.clone()).unwrap());

            let (key_actual, value) = cons_out1.recv().await.unwrap();
            assert_eq!(key_actual, key);
            assert_eq!(value, Bytes::from("data for key 2"));
        });
    }
1107
1108 #[test_traced]
1112 fn test_concurrent_fetch_requests() {
1113 let executor = deterministic::Runner::default();
1114 executor.start(|context| async move {
1115 let (mut oracle, mut schemes, peers, mut connections) =
1116 setup_network_and_peers(&context, &[1, 2, 3]).await;
1117
1118 let key2 = Key(2);
1119 let key3 = Key(3);
1120 let mut prod2 = Producer::default();
1121 prod2.insert(key2.clone(), Bytes::from("data for key 2"));
1122 let mut prod3 = Producer::default();
1123 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1124
1125 let (cons1, mut cons_out1) = consumer();
1126
1127 let scheme = schemes.remove(0);
1128 let mut mailbox1 = setup_and_spawn_actor(
1129 &context,
1130 oracle.manager(),
1131 oracle.control(scheme.public_key()),
1132 scheme,
1133 connections.remove(0),
1134 cons1,
1135 Producer::default(),
1136 );
1137
1138 let scheme = schemes.remove(0);
1139 let _mailbox2 = setup_and_spawn_actor(
1140 &context,
1141 oracle.manager(),
1142 oracle.control(scheme.public_key()),
1143 scheme,
1144 connections.remove(0),
1145 dummy_consumer(),
1146 prod2,
1147 );
1148
1149 let scheme = schemes.remove(0);
1150 let _mailbox3 = setup_and_spawn_actor(
1151 &context,
1152 oracle.manager(),
1153 oracle.control(scheme.public_key()),
1154 scheme,
1155 connections.remove(0),
1156 dummy_consumer(),
1157 prod3,
1158 );
1159
1160 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
1162 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
1163
1164 for _ in 0..10 {
1166 mailbox1.fetch(key2.clone());
1168 mailbox1.fetch(key3.clone());
1169
1170 let mut events = Vec::new();
1172 events.push(cons_out1.recv().await.expect("Consumer channel closed"));
1173 events.push(cons_out1.recv().await.expect("Consumer channel closed"));
1174
1175 let mut found_key2 = false;
1177 let mut found_key3 = false;
1178 for (key_actual, value) in events {
1179 if key_actual == key2 {
1180 assert_eq!(value, Bytes::from("data for key 2"));
1181 found_key2 = true;
1182 } else if key_actual == key3 {
1183 assert_eq!(value, Bytes::from("data for key 3"));
1184 found_key3 = true;
1185 } else {
1186 panic!("Unexpected key received");
1187 }
1188 }
1189 assert!(found_key2 && found_key3,);
1190 }
1191 });
1192 }
1193
    #[test_traced]
    /// `retain` must be harmless for keys that are not being fetched (before
    /// any fetch and after a completed one) and must silence a fetch that is
    /// still outstanding.
    fn test_retain_drops_key() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(6);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 6"));

            let (cons1, mut cons_out1) = consumer();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            // 1) Retain before the key was ever fetched: no event.
            let canceled = key.clone();
            mailbox1.retain(move |key, _| key != &canceled);
            select! {
                _ = cons_out1.recv() => {
                    panic!("unexpected event");
                },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // 2) A normal fetch still succeeds afterwards.
            mailbox1.fetch(key.clone());
            let (key_actual, value) = cons_out1.recv().await.unwrap();
            assert_eq!(key_actual, key);
            assert_eq!(value, Bytes::from("data for key 6"));

            // 3) Retain after the fetch completed: no event.
            let canceled = key.clone();
            mailbox1.retain(move |key, _| key != &canceled);
            select! {
                _ = cons_out1.recv() => {
                    panic!("unexpected event");
                },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // 4) Fetch a key that no producer in this test holds and cancel it
            //    immediately: no event.
            let key = Key(7);
            mailbox1.fetch(key.clone());
            let canceled = key.clone();
            mailbox1.retain(move |key, _| key != &canceled);

            select! {
                _ = cons_out1.recv() => panic!("unexpected event"),
                _ = context.sleep(Duration::from_millis(100)) => {},
            };
        });
    }
1272
1273 #[test_traced]
1276 fn test_blocking_peer() {
1277 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1278 executor.start(|context| async move {
1279 let (mut oracle, mut schemes, peers, mut connections) =
1280 setup_network_and_peers(&context, &[1, 2, 3]).await;
1281
1282 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1283 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1284 add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
1285
1286 let key_a = Key(1);
1287 let key_b = Key(2);
1288 let invalid_data_a = Bytes::from("invalid for A");
1289 let valid_data_a = Bytes::from("valid for A");
1290 let valid_data_b = Bytes::from("valid for B");
1291
1292 let mut prod2 = Producer::default();
1294 prod2.insert(key_a.clone(), invalid_data_a.clone());
1295 prod2.insert(key_b.clone(), valid_data_b.clone());
1296
1297 let mut prod3 = Producer::default();
1298 prod3.insert(key_a.clone(), valid_data_a.clone());
1299
1300 let (mut cons1, mut cons_out1) = consumer();
1302 cons1.add_expected(key_a.clone(), valid_data_a.clone());
1303 cons1.add_expected(key_b.clone(), valid_data_b.clone());
1304
1305 let scheme = schemes.remove(0);
1307 let mut mailbox1 = setup_and_spawn_actor(
1308 &context,
1309 oracle.manager(),
1310 oracle.control(scheme.public_key()),
1311 scheme,
1312 connections.remove(0),
1313 cons1,
1314 Producer::default(),
1315 );
1316
1317 let scheme = schemes.remove(0);
1318 let _mailbox2 = setup_and_spawn_actor(
1319 &context,
1320 oracle.manager(),
1321 oracle.control(scheme.public_key()),
1322 scheme,
1323 connections.remove(0),
1324 dummy_consumer(),
1325 prod2,
1326 );
1327
1328 let scheme = schemes.remove(0);
1329 let _mailbox3 = setup_and_spawn_actor(
1330 &context,
1331 oracle.manager(),
1332 oracle.control(scheme.public_key()),
1333 scheme,
1334 connections.remove(0),
1335 dummy_consumer(),
1336 prod3,
1337 );
1338
1339 for _ in 0..20 {
1341 mailbox1.fetch(key_a.clone());
1343
1344 let (key_actual, value) = cons_out1.recv().await.unwrap();
1346 assert_eq!(key_actual, key_a);
1347 assert_eq!(value, valid_data_a);
1348 }
1349
1350 mailbox1.fetch(key_b.clone());
1352
1353 context.sleep(Duration::from_secs(5)).await;
1355
1356 select! {
1358 _ = cons_out1.recv() => panic!("unexpected event"),
1359 _ = context.sleep(Duration::from_millis(100)) => {},
1360 };
1361
1362 let canceled = key_b.clone();
1364 mailbox1.retain(move |key, _| key != &canceled);
1365
1366 let blocked = oracle.blocked().await.unwrap();
1368 assert_eq!(blocked.len(), 1);
1369 assert_eq!(blocked[0].0, peers[0]);
1370 assert_eq!(blocked[0].1, peers[1]);
1371 });
1372 }
1373
1374 #[test_traced]
1378 fn test_duplicate_fetch_key() {
1379 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1380 executor.start(|context| async move {
1381 let (mut oracle, mut schemes, peers, mut connections) =
1382 setup_network_and_peers(&context, &[1, 2]).await;
1383
1384 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1385
1386 let key = Key(5);
1387 let mut prod2 = Producer::default();
1388 prod2.insert(key.clone(), Bytes::from("data for key 5"));
1389
1390 let (cons1, mut cons_out1) = consumer();
1391
1392 let scheme = schemes.remove(0);
1393 let mut mailbox1 = setup_and_spawn_actor(
1394 &context,
1395 oracle.manager(),
1396 oracle.control(scheme.public_key()),
1397 scheme,
1398 connections.remove(0),
1399 cons1,
1400 Producer::default(),
1401 );
1402
1403 let scheme = schemes.remove(0);
1404 let _mailbox2 = setup_and_spawn_actor(
1405 &context,
1406 oracle.manager(),
1407 oracle.control(scheme.public_key()),
1408 scheme,
1409 connections.remove(0),
1410 dummy_consumer(),
1411 prod2,
1412 );
1413
1414 mailbox1.fetch(key.clone());
1416 mailbox1.fetch(key.clone());
1417
1418 let (key_actual, value) = cons_out1.recv().await.unwrap();
1420 assert_eq!(key_actual, key);
1421 assert_eq!(value, Bytes::from("data for key 5"));
1422
1423 select! {
1425 _ = cons_out1.recv() => {
1426 panic!("Unexpected second event received for duplicate fetch");
1427 },
1428 _ = context.sleep(Duration::from_millis(500)) => {
1429 },
1431 };
1432 });
1433 }
1434
1435 #[test_traced]
1439 fn test_changing_peer_sets() {
1440 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1441 executor.start(|context| async move {
1442 let (mut oracle, mut schemes, peers, mut connections) =
1443 setup_network_and_peers(&context, &[1, 2, 3]).await;
1444
1445 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1446 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1447
1448 let key1 = Key(1);
1449 let key2 = Key(2);
1450
1451 let mut prod2 = Producer::default();
1452 prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
1453
1454 let mut prod3 = Producer::default();
1455 prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
1456
1457 let (cons1, mut cons_out1) = consumer();
1458
1459 let scheme = schemes.remove(0);
1460 let mut mailbox1 = setup_and_spawn_actor(
1461 &context,
1462 oracle.manager(),
1463 oracle.control(scheme.public_key()),
1464 scheme,
1465 connections.remove(0),
1466 cons1,
1467 Producer::default(),
1468 );
1469
1470 let scheme = schemes.remove(0);
1471 let _mailbox2 = setup_and_spawn_actor(
1472 &context,
1473 oracle.manager(),
1474 oracle.control(scheme.public_key()),
1475 scheme,
1476 connections.remove(0),
1477 dummy_consumer(),
1478 prod2,
1479 );
1480
1481 mailbox1.fetch(key1.clone());
1483
1484 let (key_actual, value) = cons_out1.recv().await.unwrap();
1486 assert_eq!(key_actual, key1);
1487 assert_eq!(value, Bytes::from("data from peer 2"));
1488
1489 let scheme = schemes.remove(0);
1491 let _mailbox3 = setup_and_spawn_actor(
1492 &context,
1493 oracle.manager(),
1494 oracle.control(scheme.public_key()),
1495 scheme,
1496 connections.remove(0),
1497 dummy_consumer(),
1498 prod3,
1499 );
1500
1501 context.sleep(Duration::from_millis(200)).await;
1503
1504 mailbox1.fetch(key2.clone());
1506
1507 let (key_actual, value) = cons_out1.recv().await.unwrap();
1509 assert_eq!(key_actual, key2);
1510 assert_eq!(value, Bytes::from("data from peer 3"));
1511 });
1512 }
1513
1514 #[test_traced]
1515 fn test_fetch_targeted() {
1516 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1517 executor.start(|context| async move {
1518 let (mut oracle, mut schemes, peers, mut connections) =
1519 setup_network_and_peers(&context, &[1, 2, 3]).await;
1520
1521 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1522 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1523
1524 let key = Key(1);
1525 let invalid_data = Bytes::from("invalid data");
1526 let valid_data = Bytes::from("valid data");
1527
1528 let mut prod2 = Producer::default();
1530 prod2.insert(key.clone(), invalid_data.clone());
1531
1532 let mut prod3 = Producer::default();
1533 prod3.insert(key.clone(), valid_data.clone());
1534
1535 let (mut cons1, mut cons_out1) = consumer();
1537 cons1.add_expected(key.clone(), valid_data.clone());
1538
1539 let scheme = schemes.remove(0);
1540 let mut mailbox1 = setup_and_spawn_actor(
1541 &context,
1542 oracle.manager(),
1543 oracle.control(scheme.public_key()),
1544 scheme,
1545 connections.remove(0),
1546 cons1,
1547 Producer::default(),
1548 );
1549
1550 let scheme = schemes.remove(0);
1551 let _mailbox2 = setup_and_spawn_actor(
1552 &context,
1553 oracle.manager(),
1554 oracle.control(scheme.public_key()),
1555 scheme,
1556 connections.remove(0),
1557 dummy_consumer(),
1558 prod2,
1559 );
1560
1561 let scheme = schemes.remove(0);
1562 let _mailbox3 = setup_and_spawn_actor(
1563 &context,
1564 oracle.manager(),
1565 oracle.control(scheme.public_key()),
1566 scheme,
1567 connections.remove(0),
1568 dummy_consumer(),
1569 prod3,
1570 );
1571
1572 context.sleep(Duration::from_millis(100)).await;
1574
1575 mailbox1.fetch_targeted(
1579 key.clone(),
1580 non_empty_vec![peers[1].clone(), peers[2].clone()],
1581 );
1582
1583 let (key_actual, value) = cons_out1.recv().await.unwrap();
1585 assert_eq!(key_actual, key);
1586 assert_eq!(value, valid_data);
1587
1588 let blocked = oracle.blocked().await.unwrap();
1590 assert_eq!(blocked.len(), 1);
1591 assert_eq!(blocked[0].0, peers[0]);
1592 assert_eq!(blocked[0].1, peers[1]);
1593
1594 let metrics = context.encode();
1596 assert_eq!(
1597 status_metric_total(&metrics, "actor_fetch_total", "Success"),
1598 1
1599 );
1600 });
1601 }
1602
1603 #[test_traced]
1604 fn test_fetch_targeted_no_fallback() {
1605 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1606 executor.start(|context| async move {
1607 let (mut oracle, mut schemes, peers, mut connections) =
1608 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1609
1610 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1611 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1612 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1613
1614 let key = Key(1);
1615
1616 let mut prod4 = Producer::default();
1618 prod4.insert(key.clone(), Bytes::from("data from peer 4"));
1619
1620 let (cons1, mut cons_out1) = consumer();
1621
1622 let scheme = schemes.remove(0);
1623 let mut mailbox1 = setup_and_spawn_actor(
1624 &context,
1625 oracle.manager(),
1626 oracle.control(scheme.public_key()),
1627 scheme,
1628 connections.remove(0),
1629 cons1,
1630 Producer::default(),
1631 );
1632
1633 let scheme = schemes.remove(0);
1634 let _mailbox2 = setup_and_spawn_actor(
1635 &context,
1636 oracle.manager(),
1637 oracle.control(scheme.public_key()),
1638 scheme,
1639 connections.remove(0),
1640 dummy_consumer(),
1641 Producer::default(), );
1643
1644 let scheme = schemes.remove(0);
1645 let _mailbox3 = setup_and_spawn_actor(
1646 &context,
1647 oracle.manager(),
1648 oracle.control(scheme.public_key()),
1649 scheme,
1650 connections.remove(0),
1651 dummy_consumer(),
1652 Producer::default(), );
1654
1655 let scheme = schemes.remove(0);
1656 let _mailbox4 = setup_and_spawn_actor(
1657 &context,
1658 oracle.manager(),
1659 oracle.control(scheme.public_key()),
1660 scheme,
1661 connections.remove(0),
1662 dummy_consumer(),
1663 prod4,
1664 );
1665
1666 context.sleep(Duration::from_millis(100)).await;
1668
1669 mailbox1.fetch_targeted(
1672 key.clone(),
1673 non_empty_vec![peers[1].clone(), peers[2].clone()],
1674 );
1675
1676 select! {
1679 event = cons_out1.recv() => {
1680 panic!("Fetch should not succeed, but got: {event:?}");
1681 },
1682 _ = context.sleep(Duration::from_secs(3)) => {
1683 },
1685 };
1686 });
1687 }
1688
1689 #[test_traced]
1690 fn test_fetch_all_targeted() {
1691 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1692 executor.start(|context| async move {
1693 let (mut oracle, mut schemes, peers, mut connections) =
1694 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1695
1696 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1697 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1698 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1699
1700 let key1 = Key(1);
1701 let key2 = Key(2);
1702 let key3 = Key(3);
1703
1704 let mut prod2 = Producer::default();
1706 prod2.insert(key1.clone(), Bytes::from("data for key 1"));
1707
1708 let mut prod3 = Producer::default();
1710 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1711
1712 let mut prod4 = Producer::default();
1714 prod4.insert(key2.clone(), Bytes::from("data for key 2"));
1715
1716 let (mut cons1, mut cons_out1) = consumer();
1718 cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
1719 cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
1720 cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
1721
1722 let scheme = schemes.remove(0);
1723 let mut mailbox1 = setup_and_spawn_actor(
1724 &context,
1725 oracle.manager(),
1726 oracle.control(scheme.public_key()),
1727 scheme,
1728 connections.remove(0),
1729 cons1,
1730 Producer::default(),
1731 );
1732
1733 let scheme = schemes.remove(0);
1734 let _mailbox2 = setup_and_spawn_actor(
1735 &context,
1736 oracle.manager(),
1737 oracle.control(scheme.public_key()),
1738 scheme,
1739 connections.remove(0),
1740 dummy_consumer(),
1741 prod2,
1742 );
1743
1744 let scheme = schemes.remove(0);
1745 let _mailbox3 = setup_and_spawn_actor(
1746 &context,
1747 oracle.manager(),
1748 oracle.control(scheme.public_key()),
1749 scheme,
1750 connections.remove(0),
1751 dummy_consumer(),
1752 prod3,
1753 );
1754
1755 let scheme = schemes.remove(0);
1756 let _mailbox4 = setup_and_spawn_actor(
1757 &context,
1758 oracle.manager(),
1759 oracle.control(scheme.public_key()),
1760 scheme,
1761 connections.remove(0),
1762 dummy_consumer(),
1763 prod4,
1764 );
1765
1766 context.sleep(Duration::from_millis(100)).await;
1768
1769 mailbox1.fetch_all_targeted(vec![
1774 (key1.clone(), non_empty_vec![peers[1].clone()]), (key2.clone(), non_empty_vec![peers[3].clone()]), ]);
1777 mailbox1.fetch(key3.clone()); let mut results = HashMap::new();
1781 for _ in 0..3 {
1782 let (key, value) = cons_out1.recv().await.unwrap();
1783 results.insert(key, value);
1784 }
1785
1786 assert_eq!(results.len(), 3);
1788 assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
1789 assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
1790 assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
1791
1792 let metrics = context.encode();
1794 assert_eq!(
1795 status_metric_total(&metrics, "actor_fetch_total", "Success"),
1796 3
1797 );
1798 });
1799 }
1800
1801 #[test_traced]
1804 fn test_fetch_clears_targets() {
1805 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1806 executor.start(|context| async move {
1807 let (mut oracle, mut schemes, peers, mut connections) =
1808 setup_network_and_peers(&context, &[1, 2, 3]).await;
1809
1810 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1811 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1812
1813 let key = Key(1);
1814 let valid_data = Bytes::from("valid data");
1815
1816 let mut prod3 = Producer::default();
1818 prod3.insert(key.clone(), valid_data.clone());
1819
1820 let (cons1, mut cons_out1) = consumer();
1821
1822 let scheme = schemes.remove(0);
1823 let mut mailbox1 = setup_and_spawn_actor(
1824 &context,
1825 oracle.manager(),
1826 oracle.control(scheme.public_key()),
1827 scheme,
1828 connections.remove(0),
1829 cons1,
1830 Producer::default(),
1831 );
1832
1833 let scheme = schemes.remove(0);
1834 let _mailbox2 = setup_and_spawn_actor(
1835 &context,
1836 oracle.manager(),
1837 oracle.control(scheme.public_key()),
1838 scheme,
1839 connections.remove(0),
1840 dummy_consumer(),
1841 Producer::default(), );
1843
1844 let scheme = schemes.remove(0);
1845 let _mailbox3 = setup_and_spawn_actor(
1846 &context,
1847 oracle.manager(),
1848 oracle.control(scheme.public_key()),
1849 scheme,
1850 connections.remove(0),
1851 dummy_consumer(),
1852 prod3,
1853 );
1854
1855 context.sleep(Duration::from_millis(100)).await;
1857
1858 mailbox1.fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()]);
1860
1861 context.sleep(Duration::from_millis(500)).await;
1863
1864 mailbox1.fetch(key.clone());
1866
1867 let (key_actual, value) = cons_out1.recv().await.unwrap();
1869 assert_eq!(key_actual, key);
1870 assert_eq!(value, valid_data);
1871 });
1872 }
1873
1874 #[test_traced]
1875 fn test_fetch_targeted_does_not_restrict_all() {
1876 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1877 executor.start(|context| async move {
1878 let (mut oracle, mut schemes, peers, mut connections) =
1879 setup_network_and_peers(&context, &[1, 2, 3]).await;
1880
1881 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1882 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1883
1884 let key = Key(1);
1885 let valid_data = Bytes::from("valid data");
1886
1887 let mut prod3 = Producer::default();
1889 prod3.insert(key.clone(), valid_data.clone());
1890
1891 let (cons1, mut cons_out1) = consumer();
1892
1893 let scheme = schemes.remove(0);
1894 let mut mailbox1 = setup_and_spawn_actor(
1895 &context,
1896 oracle.manager(),
1897 oracle.control(scheme.public_key()),
1898 scheme,
1899 connections.remove(0),
1900 cons1,
1901 Producer::default(),
1902 );
1903
1904 let scheme = schemes.remove(0);
1905 let _mailbox2 = setup_and_spawn_actor(
1906 &context,
1907 oracle.manager(),
1908 oracle.control(scheme.public_key()),
1909 scheme,
1910 connections.remove(0),
1911 dummy_consumer(),
1912 Producer::default(), );
1914
1915 let scheme = schemes.remove(0);
1916 let _mailbox3 = setup_and_spawn_actor(
1917 &context,
1918 oracle.manager(),
1919 oracle.control(scheme.public_key()),
1920 scheme,
1921 connections.remove(0),
1922 dummy_consumer(),
1923 prod3,
1924 );
1925
1926 context.sleep(Duration::from_millis(100)).await;
1928
1929 mailbox1.fetch(key.clone());
1931
1932 context.sleep(Duration::from_millis(50)).await;
1934
1935 mailbox1.fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()]);
1938
1939 let (key_actual, value) = cons_out1.recv().await.unwrap();
1942 assert_eq!(key_actual, key);
1943 assert_eq!(value, valid_data);
1944 });
1945 }
1946
1947 #[test_traced]
1948 fn test_retain() {
1949 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1950 executor.start(|context| async move {
1951 let (mut oracle, mut schemes, peers, mut connections) =
1952 setup_network_and_peers(&context, &[1, 2]).await;
1953
1954 let key = Key(5);
1955 let mut prod2 = Producer::default();
1956 prod2.insert(key.clone(), Bytes::from("data for key 5"));
1957
1958 let (cons1, mut cons_out1) = consumer();
1959
1960 let scheme = schemes.remove(0);
1961 let mut mailbox1 = setup_and_spawn_actor(
1962 &context,
1963 oracle.manager(),
1964 oracle.control(scheme.public_key()),
1965 scheme,
1966 connections.remove(0),
1967 cons1,
1968 Producer::default(),
1969 );
1970
1971 let scheme = schemes.remove(0);
1972 let _mailbox2 = setup_and_spawn_actor(
1973 &context,
1974 oracle.manager(),
1975 oracle.control(scheme.public_key()),
1976 scheme,
1977 connections.remove(0),
1978 dummy_consumer(),
1979 prod2,
1980 );
1981
1982 mailbox1.retain(|_, _| true);
1984 select! {
1985 _ = cons_out1.recv() => {
1986 panic!("unexpected event");
1987 },
1988 _ = context.sleep(Duration::from_millis(100)) => {},
1989 };
1990
1991 mailbox1.fetch(key.clone());
1993
1994 let key_clone = key.clone();
1997 mailbox1.retain(move |key, _| key != &key_clone);
1998
1999 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2001
2002 mailbox1.fetch(key.clone());
2005
2006 let (key_actual, value) = cons_out1.recv().await.unwrap();
2008 assert_eq!(key_actual, key);
2009 assert_eq!(value, Bytes::from("data for key 5"));
2010 });
2011 }
2012
2013 #[test_traced]
2014 fn test_retain_uses_subscribers() {
2015 let executor = deterministic::Runner::timed(Duration::from_secs(10));
2016 executor.start(|context| async move {
2017 let (mut oracle, mut schemes, peers, mut connections) =
2018 setup_network_and_peers(&context, &[1, 2]).await;
2019
2020 let key = Key(5);
2021 let mut prod2 = Producer::default();
2022 prod2.insert(key.clone(), Bytes::from("data for key 5"));
2023
2024 let (cons1, mut cons_out1): (Consumer<Key, Bytes, SubscriberTag>, _) = Consumer::new();
2025
2026 let scheme = schemes.remove(0);
2027 let mut mailbox1 = setup_and_spawn_actor(
2028 &context,
2029 oracle.manager(),
2030 oracle.control(scheme.public_key()),
2031 scheme,
2032 connections.remove(0),
2033 cons1,
2034 Producer::default(),
2035 );
2036
2037 let scheme = schemes.remove(0);
2038 let _mailbox2 = setup_and_spawn_actor(
2039 &context,
2040 oracle.manager(),
2041 oracle.control(scheme.public_key()),
2042 scheme,
2043 connections.remove(0),
2044 dummy_consumer(),
2045 prod2,
2046 );
2047
2048 let dropped_subscriber = SubscriberTag(50);
2049 let kept_subscriber = SubscriberTag(51);
2050 mailbox1.fetch(Fetch {
2051 key: key.clone(),
2052 subscriber: dropped_subscriber,
2053 });
2054 mailbox1.fetch(Fetch {
2055 key: key.clone(),
2056 subscriber: kept_subscriber.clone(),
2057 });
2058
2059 context.sleep(Duration::from_millis(100)).await;
2060 mailbox1.retain(move |_, subscriber| subscriber == &kept_subscriber);
2061 context.sleep(Duration::from_millis(100)).await;
2062
2063 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2064
2065 let (key_actual, value) = cons_out1.recv().await.unwrap();
2066 assert_eq!(key_actual, key);
2067 assert_eq!(value, Bytes::from("data for key 5"));
2068 });
2069 }
2070
2071 #[test_traced]
2072 fn test_deliver_receives_subscribers() {
2073 let executor = deterministic::Runner::timed(Duration::from_secs(10));
2074 executor.start(|context| async move {
2075 let (mut oracle, mut schemes, peers, mut connections) =
2076 setup_network_and_peers(&context, &[1, 2]).await;
2077
2078 let key = Key(5);
2079 let mut prod2 = Producer::default();
2080 prod2.insert(key.clone(), Bytes::from("data for key 5"));
2081
2082 let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
2083
2084 let scheme = schemes.remove(0);
2085 let mut mailbox1 = setup_and_spawn_actor(
2086 &context,
2087 oracle.manager(),
2088 oracle.control(scheme.public_key()),
2089 scheme,
2090 connections.remove(0),
2091 cons1,
2092 Producer::default(),
2093 );
2094
2095 let scheme = schemes.remove(0);
2096 let _mailbox2 = setup_and_spawn_actor(
2097 &context,
2098 oracle.manager(),
2099 oracle.control(scheme.public_key()),
2100 scheme,
2101 connections.remove(0),
2102 dummy_consumer(),
2103 prod2,
2104 );
2105
2106 let first_subscriber = SubscriberTag(50);
2107 let second_subscriber = SubscriberTag(51);
2108 mailbox1.fetch(Fetch {
2109 key: key.clone(),
2110 subscriber: second_subscriber.clone(),
2111 });
2112 mailbox1.fetch(Fetch {
2113 key: key.clone(),
2114 subscriber: first_subscriber.clone(),
2115 });
2116
2117 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2118
2119 let (delivery, value) = cons_out1.recv().await.unwrap();
2120 assert_eq!(
2121 delivery,
2122 Delivery {
2123 key,
2124 subscribers: non_empty_vec![first_subscriber, second_subscriber],
2125 }
2126 );
2127 assert_eq!(value, Bytes::from("data for key 5"));
2128 });
2129 }
2130
2131 #[test_traced]
2132 fn test_deliver_receives_multiple_subscribers() {
2133 let executor = deterministic::Runner::timed(Duration::from_secs(10));
2134 executor.start(|context| async move {
2135 let (mut oracle, mut schemes, peers, mut connections) =
2136 setup_network_and_peers(&context, &[1, 2]).await;
2137
2138 let key = Key(5);
2139 let mut prod2 = Producer::default();
2140 prod2.insert(key.clone(), Bytes::from("data for key 5"));
2141
2142 let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
2143
2144 let scheme = schemes.remove(0);
2145 let mut mailbox1 = setup_and_spawn_actor(
2146 &context,
2147 oracle.manager(),
2148 oracle.control(scheme.public_key()),
2149 scheme,
2150 connections.remove(0),
2151 cons1,
2152 Producer::default(),
2153 );
2154
2155 let scheme = schemes.remove(0);
2156 let _mailbox2 = setup_and_spawn_actor(
2157 &context,
2158 oracle.manager(),
2159 oracle.control(scheme.public_key()),
2160 scheme,
2161 connections.remove(0),
2162 dummy_consumer(),
2163 prod2,
2164 );
2165
2166 let first_subscriber = SubscriberTag(49);
2167 let second_subscriber = SubscriberTag(50);
2168 mailbox1.fetch(Fetch {
2169 key: key.clone(),
2170 subscriber: first_subscriber.clone(),
2171 });
2172 mailbox1.fetch(Fetch {
2173 key: key.clone(),
2174 subscriber: second_subscriber.clone(),
2175 });
2176
2177 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2178
2179 let (delivery, value) = cons_out1.recv().await.unwrap();
2180 assert_eq!(
2181 delivery,
2182 Delivery {
2183 key: key.clone(),
2184 subscribers: non_empty_vec![first_subscriber, second_subscriber],
2185 }
2186 );
2187 assert_eq!(value, Bytes::from("data for key 5"));
2188 });
2189 }
2190
#[test_traced]
// A second subscriber fetches a key while the consumer is still holding the
// first delivery for that key behind a gate. The test asserts that the late
// subscriber receives its own delivery carrying the FIRST response, and that
// the remote producer's second queued response is never consumed.
fn test_fetch_during_validation_reuses_response_after_success() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // The remote producer is seeded with two responses for the key. The
        // cloned observer handle is used below to inspect what is left.
        let key = Key(5);
        let first_response = Bytes::from("data for key 5");
        let second_response = Bytes::from("refetched data for key 5");
        let mut prod2 = SequencedProducer::default();
        prod2.insert(
            key.clone(),
            [first_response.clone(), second_response.clone()],
        );
        let prod2_observer = prod2.clone();

        // One gate per expected delivery. The consumer reports each delivery on
        // `started` and, once the matching gate is released, on `deliveries`.
        // NOTE(review): the bool paired with each gate is presumably the verdict
        // the consumer returns for that delivery; confirm against
        // `BlockingSubscriberRecordingConsumer`.
        let (first_gate_sender, first_gate_receiver) = oneshot::channel();
        let (second_gate_sender, second_gate_receiver) = oneshot::channel();
        let (cons1, mut deliveries, mut started) = BlockingSubscriberRecordingConsumer::new(
            context.child("consumer"),
            vec![(first_gate_receiver, true), (second_gate_receiver, true)],
        );

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        // The second peer serves from the sequenced producer.
        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor_with_producer(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // First subscriber fetches the key.
        let first_subscriber = SubscriberTag(49);
        let second_subscriber = SubscriberTag(50);
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: first_subscriber.clone(),
        });

        // The consumer has started handling the first delivery; its gate has
        // not been released yet.
        let delivery = started.recv().await.expect("delivery did not start");
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![first_subscriber.clone()],
            }
        );

        // Second subscriber arrives while the first gate is still closed. After
        // 100ms the producer must still hold its second response, i.e. only one
        // response has been consumed so far.
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: second_subscriber.clone(),
        });
        context.sleep(Duration::from_millis(100)).await;
        assert_eq!(
            prod2_observer.remaining(&key),
            vec![second_response.clone()]
        );

        // Release the first gate: the first subscriber alone gets the first
        // response.
        first_gate_sender.send(()).unwrap();
        let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![first_subscriber],
            }
        );
        assert_eq!(value, first_response);

        // A second delivery, for the late subscriber only, must start within
        // two seconds.
        let delivery = select! {
            delivery = started.recv() => delivery.expect("second delivery did not start"),
            _ = context.sleep(Duration::from_secs(2)) => {
                panic!("late subscriber was not delivered");
            },
        };
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![second_subscriber.clone()],
            }
        );

        // Release the second gate: the late subscriber receives the same first
        // response, and the producer's second response is still unconsumed.
        second_gate_sender.send(()).unwrap();
        let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![second_subscriber],
            }
        );
        assert_eq!(value, first_response);
        assert_eq!(prod2_observer.remaining(&key), vec![second_response]);
    });
}
2303
#[test_traced]
// Sets up an unrelated outstanding fetch (targeted at a peer that was blocked
// earlier in the test) and then checks that a subscriber arriving late for a
// different key, while that key's first delivery is gated, still gets its own
// delivery within two seconds.
fn test_late_subscriber_delivery_ignores_unrelated_waiter() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        // Three keys: one used to get peer 2 blocked, one for the unrelated
        // outstanding fetch, and the key under test.
        let blocked_key = Key(4);
        let waiting_key = Key(5);
        let main_key = Key(6);
        let data = Bytes::from("data for key 6");

        // Peer 2 holds "bad data" for the blocked key.
        let mut prod2 = Producer::default();
        prod2.insert(blocked_key.clone(), Bytes::from("bad data"));

        // Peer 3 holds the data for the main key.
        let mut prod3 = Producer::default();
        prod3.insert(main_key.clone(), data.clone());

        // One gate per gated delivery.
        // NOTE(review): the bool paired with each gate is presumably the verdict
        // the consumer returns for that delivery (`false` for the first,
        // `true` for the second); confirm against
        // `BlockingSubscriberRecordingConsumer`.
        let (first_gate_sender, first_gate_receiver) = oneshot::channel();
        let (second_gate_sender, second_gate_receiver) = oneshot::channel();
        let (cons1, mut deliveries, mut started) = BlockingSubscriberRecordingConsumer::new(
            context.child("consumer"),
            vec![(first_gate_receiver, false), (second_gate_receiver, true)],
        );

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        // Step 1: fetch the blocked key, release the first gate as soon as the
        // delivery starts, then wait via `wait_for_blocked` (given peers[0] and
        // peers[1]).
        // NOTE(review): presumably this waits until peer 1 has blocked peer 2;
        // confirm against the helper.
        mailbox1.fetch(Fetch {
            key: blocked_key.clone(),
            subscriber: SubscriberTag(1),
        });
        started
            .recv()
            .await
            .expect("blocking delivery did not start");
        first_gate_sender.send(()).unwrap();
        wait_for_blocked(&context, &oracle, &peers[0], &peers[1]).await;

        // Step 2: create the unrelated outstanding fetch by targeting
        // `waiting_key` (held by no producer in this test) at peer 2 only.
        mailbox1.fetch_targeted(
            Fetch {
                key: waiting_key,
                subscriber: SubscriberTag(2),
            },
            non_empty_vec![peers[1].clone()],
        );
        context.sleep(Duration::from_millis(100)).await;

        // Step 3: first subscriber fetches the main key.
        let first_subscriber = SubscriberTag(3);
        let second_subscriber = SubscriberTag(4);
        mailbox1.fetch(Fetch {
            key: main_key.clone(),
            subscriber: first_subscriber.clone(),
        });

        // The consumer has started handling the main key's delivery; its gate
        // (the second one) has not been released yet.
        let delivery = started.recv().await.expect("delivery did not start");
        assert_eq!(
            delivery,
            Delivery {
                key: main_key.clone(),
                subscribers: non_empty_vec![first_subscriber.clone()],
            }
        );

        // Step 4: the late subscriber arrives while that gate is still closed.
        mailbox1.fetch(Fetch {
            key: main_key.clone(),
            subscriber: second_subscriber.clone(),
        });
        context.sleep(Duration::from_millis(100)).await;

        // Release the gate: the first subscriber alone receives the data.
        second_gate_sender.send(()).unwrap();
        let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
        assert_eq!(
            delivery,
            Delivery {
                key: main_key.clone(),
                subscribers: non_empty_vec![first_subscriber],
            }
        );
        assert_eq!(value, data);

        // Step 5: a delivery for the late subscriber must start within two
        // seconds even though the fetch for `waiting_key` is still outstanding.
        let delivery = select! {
            delivery = started.recv() => delivery.expect("second delivery did not start"),
            _ = context.sleep(Duration::from_secs(2)) => {
                panic!("late subscriber was not delivered while an unrelated waiter was armed");
            },
        };
        assert_eq!(
            delivery,
            Delivery {
                key: main_key,
                subscribers: non_empty_vec![second_subscriber],
            }
        );
    });
}
2433
    #[test_traced]
    /// A fetch registered with a `SubscriberTag` must be delivered to the consumer
    /// together with that same subscriber value.
    ///
    /// The fetch is issued before `add_link` connects peers 0 and 1, a `retain`
    /// predicate matching only that subscriber is applied, and the link is added
    /// afterwards. The test then expects one delivery naming the key, the
    /// subscriber, and the data held by the second actor's producer.
    fn test_deliver_receives_distinct_subscriber_type() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            // Only the second actor's producer holds the requested key.
            let key = Key(5);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 5"));

            // Consumer whose output channel yields `(Delivery, value)` pairs.
            let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();

            // First actor: the requester (empty producer).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Second actor: the responder.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            let subscriber = SubscriberTag(50);
            let retained = subscriber.clone();
            mailbox1.fetch(Fetch {
                key: key.clone(),
                subscriber: subscriber.clone(),
            });

            // The predicate matches only the subscriber that issued the fetch, so
            // the pending fetch is expected to survive (see the assertion below).
            context.sleep(Duration::from_millis(100)).await;
            mailbox1.retain(move |_, subscriber| subscriber == &retained);
            context.sleep(Duration::from_millis(100)).await;

            // Connect the peers only now, after the retain has been applied.
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let (delivery, value) = cons_out1.recv().await.unwrap();
            assert_eq!(
                delivery,
                Delivery {
                    key,
                    subscribers: non_empty_vec![subscriber],
                }
            );
            assert_eq!(value, Bytes::from("data for key 5"));
        });
    }
2493
2494 #[test_traced]
2495 fn test_deliver_receives_single_subscriber() {
2496 let executor = deterministic::Runner::timed(Duration::from_secs(10));
2497 executor.start(|context| async move {
2498 let (mut oracle, mut schemes, peers, mut connections) =
2499 setup_network_and_peers(&context, &[1, 2]).await;
2500
2501 let key = Key(5);
2502 let mut prod2 = Producer::default();
2503 prod2.insert(key.clone(), Bytes::from("data for key 5"));
2504
2505 let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
2506
2507 let scheme = schemes.remove(0);
2508 let mut mailbox1 = setup_and_spawn_actor(
2509 &context,
2510 oracle.manager(),
2511 oracle.control(scheme.public_key()),
2512 scheme,
2513 connections.remove(0),
2514 cons1,
2515 Producer::default(),
2516 );
2517
2518 let scheme = schemes.remove(0);
2519 let _mailbox2 = setup_and_spawn_actor(
2520 &context,
2521 oracle.manager(),
2522 oracle.control(scheme.public_key()),
2523 scheme,
2524 connections.remove(0),
2525 dummy_consumer(),
2526 prod2,
2527 );
2528
2529 let subscriber = SubscriberTag(50);
2530 mailbox1.fetch(Fetch {
2531 key: key.clone(),
2532 subscriber: subscriber.clone(),
2533 });
2534 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2535
2536 let (delivery, value) = cons_out1.recv().await.unwrap();
2537 assert_eq!(
2538 delivery,
2539 Delivery {
2540 key,
2541 subscribers: non_empty_vec![subscriber],
2542 }
2543 );
2544 assert_eq!(value, Bytes::from("data for key 5"));
2545 });
2546 }
2547
    #[test_traced]
    /// Exercises `retain` with a predicate that rejects everything.
    ///
    /// Three phases: (1) rejecting everything with no fetch outstanding emits no
    /// consumer event; (2) a fetch followed by a reject-all `retain` is issued
    /// before the peers are linked; (3) after linking, a fresh fetch for the same
    /// key is delivered with the second actor's data.
    fn test_retain_drops_all() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            let key = Key(6);
            // Only the second actor's producer holds the key.
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 6"));

            let (cons1, mut cons_out1) = consumer();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            // Phase 1: reject-all with nothing outstanding must stay silent.
            mailbox1.retain(|_, _| false);
            select! {
                _ = cons_out1.recv() => {
                    panic!("unexpected event");
                },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // Phase 2: start a fetch, then reject everything (presumably cancelling
            // it) while the peers are still unlinked.
            mailbox1.fetch(key.clone());

            mailbox1.retain(|_, _| false);

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            // Phase 3: a new fetch for the same key must still be served.
            mailbox1.fetch(key.clone());

            let (key_actual, value) = cons_out1.recv().await.unwrap();
            assert_eq!(key_actual, key);
            assert_eq!(value, Bytes::from("data for key 6"));
        });
    }
2612
2613 #[test_traced]
2617 fn test_rate_limit_spillover() {
2618 let executor = deterministic::Runner::timed(Duration::from_secs(30));
2619 executor.start(|context| async move {
2620 let (mut oracle, mut schemes, peers, mut connections) =
2622 setup_network_and_peers_with_rate_limit(
2623 &context,
2624 &[1, 2, 3],
2625 Quota::per_second(NZU32!(1)),
2626 )
2627 .await;
2628
2629 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2631 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
2632
2633 let mut prod2 = Producer::default();
2635 let mut prod3 = Producer::default();
2636 prod2.insert(Key(0), Bytes::from("data for key 0"));
2637 prod2.insert(Key(1), Bytes::from("data for key 1"));
2638 prod3.insert(Key(0), Bytes::from("data for key 0"));
2639 prod3.insert(Key(1), Bytes::from("data for key 1"));
2640
2641 let (cons1, mut cons_out1) = consumer();
2642
2643 let scheme = schemes.remove(0);
2645 let mut mailbox1 = setup_and_spawn_actor(
2646 &context,
2647 oracle.manager(),
2648 oracle.control(scheme.public_key()),
2649 scheme,
2650 connections.remove(0),
2651 cons1,
2652 Producer::default(),
2653 );
2654
2655 let scheme = schemes.remove(0);
2657 let _mailbox2 = setup_and_spawn_actor(
2658 &context,
2659 oracle.manager(),
2660 oracle.control(scheme.public_key()),
2661 scheme,
2662 connections.remove(0),
2663 dummy_consumer(),
2664 prod2,
2665 );
2666
2667 let scheme = schemes.remove(0);
2669 let _mailbox3 = setup_and_spawn_actor(
2670 &context,
2671 oracle.manager(),
2672 oracle.control(scheme.public_key()),
2673 scheme,
2674 connections.remove(0),
2675 dummy_consumer(),
2676 prod3,
2677 );
2678
2679 context.sleep(Duration::from_millis(100)).await;
2681 let start = context.current();
2682
2683 mailbox1.fetch(Key(0));
2687 mailbox1.fetch(Key(1));
2688
2689 let mut results = HashMap::new();
2691 for _ in 0..2 {
2692 let (key, value) = cons_out1.recv().await.unwrap();
2693 results.insert(key.clone(), value);
2694 }
2695
2696 assert_eq!(results.len(), 2);
2698 assert_eq!(
2699 results.get(&Key(0)).unwrap(),
2700 &Bytes::from("data for key 0")
2701 );
2702 assert_eq!(
2703 results.get(&Key(1)).unwrap(),
2704 &Bytes::from("data for key 1")
2705 );
2706
2707 let elapsed = context.current().duration_since(start).unwrap();
2710 assert!(
2711 elapsed < Duration::from_millis(500),
2712 "Expected quick completion via spill-over, but took {elapsed:?}"
2713 );
2714 });
2715 }
2716
2717 #[test_traced]
2721 fn test_rate_limit_retry_after_reset() {
2722 let executor = deterministic::Runner::timed(Duration::from_secs(30));
2723 executor.start(|context| async move {
2724 let (mut oracle, mut schemes, peers, mut connections) =
2726 setup_network_and_peers_with_rate_limit(
2727 &context,
2728 &[1, 2],
2729 Quota::per_second(NZU32!(1)),
2730 )
2731 .await;
2732
2733 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2734
2735 let mut prod2 = Producer::default();
2737 prod2.insert(Key(1), Bytes::from("data for key 1"));
2738 prod2.insert(Key(2), Bytes::from("data for key 2"));
2739 prod2.insert(Key(3), Bytes::from("data for key 3"));
2740
2741 let (cons1, mut cons_out1) = consumer();
2742
2743 let scheme = schemes.remove(0);
2744 let mut mailbox1 = setup_and_spawn_actor(
2745 &context,
2746 oracle.manager(),
2747 oracle.control(scheme.public_key()),
2748 scheme,
2749 connections.remove(0),
2750 cons1,
2751 Producer::default(),
2752 );
2753
2754 let scheme = schemes.remove(0);
2755 let _mailbox2 = setup_and_spawn_actor(
2756 &context,
2757 oracle.manager(),
2758 oracle.control(scheme.public_key()),
2759 scheme,
2760 connections.remove(0),
2761 dummy_consumer(),
2762 prod2,
2763 );
2764
2765 context.sleep(Duration::from_millis(100)).await;
2767 let start = context.current();
2768
2769 mailbox1.fetch(Key(1));
2772 mailbox1.fetch(Key(2));
2773 mailbox1.fetch(Key(3));
2774
2775 let mut results = HashMap::new();
2777 for _ in 0..3 {
2778 let (key, value) = cons_out1.recv().await.unwrap();
2779 results.insert(key.clone(), value);
2780 }
2781
2782 assert_eq!(results.len(), 3);
2783 for i in 1..=3 {
2784 assert_eq!(
2785 results.get(&Key(i)).unwrap(),
2786 &Bytes::from(format!("data for key {}", i))
2787 );
2788 }
2789
2790 let elapsed = context.current().duration_since(start).unwrap();
2794 assert!(
2795 elapsed > Duration::from_secs(2),
2796 "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
2797 );
2798 });
2799 }
2800
2801 #[test_traced]
2805 fn test_self_exclusion() {
2806 let executor = deterministic::Runner::timed(Duration::from_secs(10));
2807 executor.start(|context| async move {
2808 let (mut oracle, mut schemes, peers, mut connections) =
2809 setup_network_and_peers(&context, &[1, 2]).await;
2810
2811 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2812
2813 let key = Key(1);
2814 let data = Bytes::from("shared data");
2815
2816 let mut prod1 = Producer::default();
2818 prod1.insert(key.clone(), data.clone());
2819 let mut prod2 = Producer::default();
2820 prod2.insert(key.clone(), data.clone());
2821
2822 let (cons1, mut cons_out1) = consumer();
2823
2824 let scheme = schemes.remove(0);
2826 let mut mailbox1 = setup_and_spawn_actor(
2827 &context,
2828 oracle.manager(),
2829 oracle.control(scheme.public_key()),
2830 scheme,
2831 connections.remove(0),
2832 cons1,
2833 prod1, );
2835
2836 let scheme = schemes.remove(0);
2838 let _mailbox2 = setup_and_spawn_actor(
2839 &context,
2840 oracle.manager(),
2841 oracle.control(scheme.public_key()),
2842 scheme,
2843 connections.remove(0),
2844 dummy_consumer(),
2845 prod2,
2846 );
2847
2848 context.sleep(Duration::from_millis(100)).await;
2850
2851 mailbox1.fetch(key.clone());
2853
2854 let (key_actual, value) = cons_out1.recv().await.unwrap();
2856 assert_eq!(key_actual, key);
2857 assert_eq!(value, data);
2858 });
2859 }
2860
    #[test_traced]
    /// An untargeted fetch must not be satisfied by a peer that is tracked only in
    /// the second (presumably "secondary") set of `TrackedPeers`.
    ///
    /// Peer 1 is tracked in the first set and holds no data; peer 2 is tracked in
    /// the second set and is the only holder of the key. The test passes when no
    /// consumer event arrives within two seconds of the fetch.
    fn test_fetch_uses_primary_peers_only() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Build the simulated network by hand so the peer sets can be chosen
            // explicitly below.
            let (network, oracle) = Network::new(
                context.child("network"),
                commonware_p2p::simulated::Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
                .into_iter()
                .map(PrivateKey::from_seed)
                .collect();
            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
            let mut schemes = schemes;

            // Register channel 0 for every peer.
            let mut connections = Vec::new();
            for peer in &peers {
                let (sender, receiver) = oracle
                    .control(peer.clone())
                    .register(0, Quota::per_second(RATE_LIMIT))
                    .await
                    .unwrap();
                connections.push((sender, receiver));
            }

            let mut oracle = oracle;
            // Peer 0 is linked to both other peers, so connectivity alone would
            // allow either of them to answer.
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Peer 1 goes in the first set, peer 2 in the second set.
            oracle.manager().track(
                1,
                TrackedPeers::new(
                    Set::try_from([peers[1].clone()]).unwrap(),
                    Set::try_from([peers[2].clone()]).unwrap(),
                ),
            );
            context.sleep(Duration::from_millis(100)).await;

            let key = Key(1);
            let data = Bytes::from("secondary only data");

            let (cons1, mut cons_out1) = consumer();

            // Requester (peer 0).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 1: tracked in the first set but has nothing to serve.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                Producer::default(),
            );

            // Peer 2: tracked in the second set and the only holder of the key.
            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), data);
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod3,
            );

            mailbox1.fetch(key.clone());

            // Any delivery within two seconds is a failure.
            select! {
                event = cons_out1.recv() => {
                    panic!("fetch should not succeed from a secondary peer, got: {event:?}");
                },
                _ = context.sleep(Duration::from_secs(2)) => {},
            }
        });
    }
2963
    #[test_traced]
    /// After a newer peer set is tracked, a peer that appears only in the older
    /// set must not satisfy fetches, whether untargeted or targeted.
    ///
    /// Set 0 contains peers 0 and 1; set 1 contains peers 0 and 2. Peer 1 is the
    /// only holder of both keys. With `tracked_peer_sets` set to 2 both sets are
    /// presumably still tracked by the network, yet the test expects no consumer
    /// event for either fetch.
    fn test_fetch_uses_latest_primary_set_only() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (network, oracle) = Network::new(
                context.child("network"),
                commonware_p2p::simulated::Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
                .into_iter()
                .map(PrivateKey::from_seed)
                .collect();
            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
            let mut schemes = schemes;

            // Register channel 0 for every peer.
            let mut connections = Vec::new();
            for peer in &peers {
                let (sender, receiver) = oracle
                    .control(peer.clone())
                    .register(0, Quota::per_second(RATE_LIMIT))
                    .await
                    .unwrap();
                connections.push((sender, receiver));
            }

            let mut oracle = oracle;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Older set (id 0): peers 0 and 1.
            oracle
                .manager()
                .track(
                    0,
                    Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
                );
            context.sleep(Duration::from_millis(100)).await;

            let key = Key(7);
            let targeted_key = Key(8);
            let data = Bytes::from("old primary data");

            let (cons1, mut cons_out1) = consumer();

            // Requester (peer 0).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 1: member of the older set only, holds both keys.
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), data.clone());
            prod2.insert(targeted_key.clone(), data);
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            // Peer 2: member of the newer set, holds nothing.
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                Producer::default(),
            );

            context.sleep(Duration::from_millis(100)).await;

            // Newer set (id 1): peers 0 and 2.
            oracle
                .manager()
                .track(
                    1,
                    Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
                );
            context.sleep(Duration::from_millis(100)).await;

            // Untargeted fetch: only peer 1 could answer, so nothing may arrive.
            mailbox1.fetch(key);

            select! {
                event = cons_out1.recv() => {
                    panic!(
                        "fetch should not succeed from an old primary retained only in the overlap window, got: {event:?}"
                    );
                },
                _ = context.sleep(Duration::from_secs(1)) => {},
            }

            // Targeting peer 1 explicitly must not succeed either.
            mailbox1
                .fetch_targeted(targeted_key, non_empty_vec![peers[1].clone()]);

            select! {
                event = cons_out1.recv() => {
                    panic!(
                        "targeted fetch should not bypass the latest-primary filter, got: {event:?}"
                    );
                },
                _ = context.sleep(Duration::from_secs(1)) => {},
            }
        });
    }
3093
    #[test_traced]
    /// After the tracked peer set changes, a fetch must be answered by a member of
    /// the newest set, and the peer that is only in the older set must not end up
    /// blocked.
    ///
    /// Peer 1 (older set only) would serve a value the consumer was not told to
    /// expect; peer 2 (newer set) serves the expected value. The test asserts the
    /// expected value is delivered and that the oracle reports no blocked peers.
    fn test_fetch_after_cutover_relies_on_latest_primary_history() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (network, oracle) = Network::new(
                context.child("network"),
                commonware_p2p::simulated::Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
                .into_iter()
                .map(PrivateKey::from_seed)
                .collect();
            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
            let mut schemes = schemes;

            // Register channel 0 for every peer.
            let mut connections = Vec::new();
            for peer in &peers {
                let (sender, receiver) = oracle
                    .control(peer.clone())
                    .register(0, Quota::per_second(RATE_LIMIT))
                    .await
                    .unwrap();
                connections.push((sender, receiver));
            }

            let mut oracle = oracle;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Older set (id 0): peers 0 and 1.
            oracle.manager().track(
                0,
                Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
            );
            context.sleep(Duration::from_millis(100)).await;

            let key = Key(9);
            let invalid_history = Bytes::from("stale overlap history");
            let valid_history = Bytes::from("latest primary history");

            // NOTE(review): `add_expected` presumably makes the mock consumer treat
            // any other value for this key as invalid; confirm against the mock.
            let (mut cons1, mut cons_out1) = consumer();
            cons1.add_expected(key.clone(), valid_history.clone());

            // Requester (peer 0).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 1: older set only, would serve the value that is not expected.
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), invalid_history);
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod2,
            );

            // Peer 2: newer set, serves the expected value.
            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), valid_history.clone());
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                dummy_consumer(),
                prod3,
            );

            context.sleep(Duration::from_millis(100)).await;

            // Cut over to the newer set (id 1): peers 0 and 2.
            oracle.manager().track(
                1,
                Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
            );
            context.sleep(Duration::from_millis(100)).await;

            mailbox1.fetch(key.clone());

            let (key_actual, value) = cons_out1.recv().await.unwrap();
            assert_eq!(key_actual, key);
            assert_eq!(value, valid_history);

            // An empty blocked list is taken as evidence that peer 1 was never
            // asked (per the assertion message below).
            assert!(
                oracle.blocked().await.unwrap().is_empty(),
                "overlap-only peers should not be queried for post-cutover history"
            );
        });
    }
3205
3206 #[test_traced]
3207 fn test_secondary_peer_requests_are_served() {
3208 let executor = deterministic::Runner::timed(Duration::from_secs(10));
3209 executor.start(|context| async move {
3210 let (mut oracle, mut schemes, peers, mut connections) =
3211 setup_network_and_peers(&context, &[1, 2]).await;
3212
3213 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
3218
3219 oracle.manager().track(
3220 1,
3221 TrackedPeers::new(
3222 Set::try_from([peers[0].clone()]).unwrap(),
3223 Set::try_from([peers[1].clone()]).unwrap(),
3224 ),
3225 );
3226 context.sleep(Duration::from_millis(100)).await;
3227
3228 let key = Key(9);
3229 let data = Bytes::from("served to secondary");
3230
3231 let mut prod1 = Producer::default();
3233 prod1.insert(key.clone(), data.clone());
3234
3235 let scheme = schemes.remove(0);
3236 let _mailbox1 = setup_and_spawn_actor(
3237 &context,
3238 oracle.manager(),
3239 oracle.control(scheme.public_key()),
3240 scheme,
3241 connections.remove(0),
3242 dummy_consumer(),
3243 prod1,
3244 );
3245
3246 let (mut cons2, mut cons_out2) = consumer();
3248 cons2.add_expected(key.clone(), data.clone());
3249 let scheme = schemes.remove(0);
3250 let mut mailbox2 = setup_and_spawn_actor(
3251 &context,
3252 oracle.manager(),
3253 oracle.control(scheme.public_key()),
3254 scheme,
3255 connections.remove(0),
3256 cons2,
3257 Producer::default(),
3258 );
3259
3260 mailbox2.fetch_targeted(key.clone(), non_empty_vec![peers[0].clone()]);
3261
3262 let (key_actual, value) = cons_out2.recv().await.unwrap();
3263 assert_eq!(key_actual, key);
3264 assert_eq!(value, data);
3265 });
3266 }
3267
    #[test_traced]
    /// Aborting the engines while a consumer delivery is still blocked must drop
    /// that delivery and leave no tasks running under the `"actor"` context.
    ///
    /// The requester uses a `BlockingConsumer` gated on a oneshot channel that the
    /// test never fires. Once the delivery has started, both engine handles are
    /// aborted and the test checks that the gate's receiving side goes away, that
    /// no delivery is reported, and that the running-task count drops to zero.
    fn test_shutdown_aborts_pending_delivery_without_leaked_tasks() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(1);
            let data = Bytes::from("data for key 1");
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), data);

            // The gate is never sent on, so the delivery stays pending until the
            // engine is torn down.
            // NOTE(review): the meaning of the `true` paired with the gate is
            // defined by `BlockingConsumer`; confirm there.
            let (mut gate_sender, gate_receiver) = oneshot::channel();
            let (cons1, mut cons_out1, mut started) =
                BlockingConsumer::new(context.child("consumer"), vec![(gate_receiver, true)]);

            // Both engines live under this context so their tasks can be counted.
            let actor_context = context.child("actor");

            // Requester engine, built directly to keep its task handle.
            let scheme = schemes.remove(0);
            let public_key = scheme.public_key();
            let (engine, mut mailbox1): (_, Mailbox<Key, PublicKey>) = Engine::new(
                actor_context.child("peer").with_attribute("index", 0),
                Config {
                    peer_provider: oracle.manager(),
                    blocker: oracle.control(public_key.clone()),
                    consumer: cons1,
                    producer: Producer::<Key, Bytes>::default(),
                    mailbox_size: MAILBOX_SIZE,
                    me: Some(public_key),
                    initial: INITIAL_DURATION,
                    timeout: TIMEOUT,
                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                    priority_requests: false,
                    priority_responses: false,
                },
            );
            let handle1 = engine.start(connections.remove(0));

            // Responder engine.
            let scheme = schemes.remove(0);
            let public_key = scheme.public_key();
            let (engine, _mailbox2): (_, Mailbox<Key, PublicKey>) = Engine::new(
                actor_context.child("peer").with_attribute("index", 1),
                Config {
                    peer_provider: oracle.manager(),
                    blocker: oracle.control(public_key.clone()),
                    consumer: dummy_consumer(),
                    producer: prod2,
                    mailbox_size: MAILBOX_SIZE,
                    me: Some(public_key),
                    initial: INITIAL_DURATION,
                    timeout: TIMEOUT,
                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                    priority_requests: false,
                    priority_responses: false,
                },
            );
            let handle2 = engine.start(connections.remove(0));

            // Wait until the consumer reports that delivery of the key has begun.
            mailbox1.fetch(key.clone());
            let started_key = started.recv().await.expect("delivery did not start");
            assert_eq!(started_key, key);

            assert!(count_running_tasks(&context, "actor") > 0);

            handle1.abort();
            handle2.abort();

            context.sleep(Duration::from_millis(100)).await;

            // `closed()` presumably resolves once the receiving half held by the
            // blocked delivery has been dropped.
            select! {
                _ = gate_sender.closed() => {},
                _ = context.sleep(Duration::from_secs(2)) => {
                    panic!("pending delivery was not aborted");
                },
            };

            // The output channel may close (`None`) or stay silent, but it must
            // not yield a delivery.
            select! {
                event = cons_out1.recv() => assert!(event.is_none(), "unexpected event"),
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            let running_after = count_running_tasks(&context, "actor");
            assert_eq!(
                running_after, 0,
                "all actor tasks should be stopped, but {running_after} still running"
            );
        });
    }
3358
3359 #[allow(clippy::type_complexity)]
3360 fn spawn_actors_with_handles(
3361 context: &deterministic::Context,
3362 oracle: &Oracle<PublicKey, deterministic::Context>,
3363 schemes: Vec<PrivateKey>,
3364 connections: Vec<(
3365 Sender<PublicKey, deterministic::Context>,
3366 Receiver<PublicKey>,
3367 )>,
3368 consumers: Vec<Consumer<Key, Bytes>>,
3369 producers: Vec<Producer<Key, Bytes>>,
3370 ) -> (
3371 Vec<Mailbox<Key, PublicKey>>,
3372 Vec<commonware_runtime::Handle<()>>,
3373 ) {
3374 let actor_context = context.child("actor");
3375 let mut mailboxes = Vec::new();
3376 let mut handles = Vec::new();
3377
3378 for (idx, ((scheme, conn), (consumer, producer))) in schemes
3379 .into_iter()
3380 .zip(connections)
3381 .zip(consumers.into_iter().zip(producers))
3382 .enumerate()
3383 {
3384 let ctx = actor_context.child("peer").with_attribute("index", idx);
3385 let public_key = scheme.public_key();
3386 let (engine, mailbox) = Engine::new(
3387 ctx,
3388 Config {
3389 peer_provider: oracle.manager(),
3390 blocker: oracle.control(public_key.clone()),
3391 consumer,
3392 producer,
3393 mailbox_size: MAILBOX_SIZE,
3394 me: Some(public_key),
3395 initial: INITIAL_DURATION,
3396 timeout: TIMEOUT,
3397 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
3398 priority_requests: false,
3399 priority_responses: false,
3400 },
3401 );
3402 handles.push(engine.start(conn));
3403 mailboxes.push(mailbox);
3404 }
3405
3406 (mailboxes, handles)
3407 }
3408
3409 #[test_traced]
3410 fn test_operations_after_shutdown_do_not_panic() {
3411 let executor = deterministic::Runner::timed(Duration::from_secs(10));
3412 executor.start(|context| async move {
3413 let (mut oracle, schemes, peers, connections) =
3414 setup_network_and_peers(&context, &[1, 2]).await;
3415
3416 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
3417
3418 let key = Key(1);
3419 let mut prod2 = Producer::default();
3420 prod2.insert(key.clone(), Bytes::from("data for key 1"));
3421
3422 let (cons1, mut cons_out1) = consumer();
3423
3424 let (mut mailboxes, handles) = spawn_actors_with_handles(
3425 &context,
3426 &oracle,
3427 schemes,
3428 connections,
3429 vec![cons1, dummy_consumer()],
3430 vec![Producer::default(), prod2],
3431 );
3432
3433 mailboxes[0].fetch(key.clone());
3435 let (_, value) = cons_out1.recv().await.unwrap();
3436 assert_eq!(value, Bytes::from("data for key 1"));
3437
3438 for handle in handles {
3440 handle.abort();
3441 }
3442 context.sleep(Duration::from_millis(100)).await;
3443
3444 let key2 = Key(2);
3448 mailboxes[0].fetch(key2.clone());
3449
3450 let canceled = key2;
3452 mailboxes[0].retain(move |key, _| key != &canceled);
3453
3454 mailboxes[0].retain(|_, _| true);
3456
3457 mailboxes[0].fetch_targeted(Key(3), non_empty_vec![peers[1].clone()]);
3459 });
3460 }
3461
3462 fn clean_shutdown(seed: u64) {
3463 let cfg = deterministic::Config::default()
3464 .with_seed(seed)
3465 .with_timeout(Some(Duration::from_secs(30)));
3466 let executor = deterministic::Runner::new(cfg);
3467 executor.start(|context| async move {
3468 let (mut oracle, schemes, peers, connections) =
3469 setup_network_and_peers(&context, &[1, 2]).await;
3470
3471 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
3472
3473 let key = Key(1);
3474 let mut prod2 = Producer::default();
3475 prod2.insert(key.clone(), Bytes::from("data for key 1"));
3476
3477 let (cons1, mut cons_out1) = consumer();
3478
3479 let (mut mailboxes, handles) = spawn_actors_with_handles(
3480 &context,
3481 &oracle,
3482 schemes,
3483 connections,
3484 vec![cons1, dummy_consumer()],
3485 vec![Producer::default(), prod2],
3486 );
3487
3488 context.sleep(Duration::from_millis(100)).await;
3490
3491 let running_before = count_running_tasks(&context, "actor");
3493 assert!(
3494 running_before > 0,
3495 "at least one actor task should be running"
3496 );
3497
3498 mailboxes[0].fetch(key.clone());
3500 let (_, value) = cons_out1.recv().await.unwrap();
3501 assert_eq!(value, Bytes::from("data for key 1"));
3502
3503 for handle in handles {
3505 handle.abort();
3506 }
3507 context.sleep(Duration::from_millis(100)).await;
3508
3509 let running_after = count_running_tasks(&context, "actor");
3511 assert_eq!(
3512 running_after, 0,
3513 "all actor tasks should be stopped, but {running_after} still running"
3514 );
3515 });
3516 }
3517
3518 #[test]
3519 fn test_clean_shutdown() {
3520 for seed in 0..25 {
3521 clean_shutdown(seed);
3522 }
3523 }
3524}