1use super::{
4 ingress::{self, Oracle},
5 metrics,
6 transmitter::{self, Completion},
7 Error,
8};
9use crate::{
10 authenticated::UnboundedMailbox,
11 utils::limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
12 Channel, Message, Recipients, UnlimitedSender as _,
13};
14use commonware_codec::{DecodeExt, FixedSize};
15use commonware_cryptography::PublicKey;
16use commonware_macros::{select, select_loop};
17use commonware_runtime::{
18 spawn_cell, Clock, ContextCell, Handle, IoBuf, IoBufMut, Listener as _, Metrics,
19 Network as RNetwork, Quota, Spawner,
20};
21use commonware_stream::utils::codec::{recv_frame, send_frame};
22use commonware_utils::{
23 channel::{fallible::FallibleExt, mpsc, oneshot, ring},
24 ordered::Set,
25 NZUsize, TryCollect,
26};
27use either::Either;
28use futures::{future, SinkExt};
29use prometheus_client::metrics::{counter::Counter, family::Family};
30use rand::Rng;
31use rand_distr::{Distribution, Normal};
32use std::{
33 collections::{BTreeMap, BTreeSet, HashMap},
34 fmt::Debug,
35 net::{IpAddr, Ipv4Addr, SocketAddr},
36 time::{Duration, SystemTime},
37};
38use tracing::{debug, error, trace, warn};
39
40type Task<P> = (Channel, P, Recipients<P>, IoBuf, oneshot::Sender<Vec<P>>);
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq)]
45#[must_use]
46pub enum SplitTarget {
47 None,
48 Primary,
49 Secondary,
50 Both,
51}
52
53#[derive(Clone, Copy, Debug, PartialEq, Eq)]
55#[must_use]
56pub enum SplitOrigin {
57 Primary,
58 Secondary,
59}
60
61pub trait SplitForwarder<P: PublicKey>:
63 Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
64{
65}
66
67impl<P: PublicKey, F> SplitForwarder<P> for F where
68 F: Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>>
69 + Send
70 + Sync
71 + Clone
72 + 'static
73{
74}
75
76pub trait SplitRouter<P: PublicKey>:
78 Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
79{
80}
81
82impl<P: PublicKey, F> SplitRouter<P> for F where
83 F: Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
84{
85}
86
87pub struct Config {
89 pub max_size: u32,
91
92 pub disconnect_on_block: bool,
96
97 pub tracked_peer_sets: Option<usize>,
103}
104
105pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
107 context: ContextCell<E>,
108
109 max_size: u32,
111
112 disconnect_on_block: bool,
116
117 next_addr: SocketAddr,
120
121 ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,
123
124 oracle_mailbox: UnboundedMailbox<ingress::Message<P, E>>,
126
127 sender: mpsc::UnboundedSender<Task<P>>,
131 receiver: mpsc::UnboundedReceiver<Task<P>>,
132
133 links: HashMap<(P, P), Link>,
135
136 peers: BTreeMap<P, Peer<P>>,
138
139 peer_sets: BTreeMap<u64, Set<P>>,
141
142 peer_refs: BTreeMap<P, usize>,
144
145 tracked_peer_sets: Option<usize>,
147
148 blocks: BTreeSet<(P, P)>,
150
151 transmitter: transmitter::State<P>,
153
154 #[allow(clippy::type_complexity)]
156 subscribers: Vec<mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>>,
157
158 peer_subscribers: Vec<ring::Sender<Vec<P>>>,
160
161 received_messages: Family<metrics::Message, Counter>,
163 sent_messages: Family<metrics::Message, Counter>,
164}
165
166impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
167 pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
172 let (sender, receiver) = mpsc::unbounded_channel();
173 let (oracle_mailbox, oracle_receiver) = UnboundedMailbox::new();
174 let sent_messages = Family::<metrics::Message, Counter>::default();
175 let received_messages = Family::<metrics::Message, Counter>::default();
176 context.register("messages_sent", "messages sent", sent_messages.clone());
177 context.register(
178 "messages_received",
179 "messages received",
180 received_messages.clone(),
181 );
182
183 let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);
185
186 (
187 Self {
188 context: ContextCell::new(context),
189 max_size: cfg.max_size,
190 disconnect_on_block: cfg.disconnect_on_block,
191 tracked_peer_sets: cfg.tracked_peer_sets,
192 next_addr,
193 ingress: oracle_receiver,
194 oracle_mailbox: oracle_mailbox.clone(),
195 sender,
196 receiver,
197 links: HashMap::new(),
198 peers: BTreeMap::new(),
199 peer_sets: BTreeMap::new(),
200 peer_refs: BTreeMap::new(),
201 blocks: BTreeSet::new(),
202 transmitter: transmitter::State::new(),
203 subscribers: Vec::new(),
204 peer_subscribers: Vec::new(),
205 received_messages,
206 sent_messages,
207 },
208 Oracle::new(oracle_mailbox),
209 )
210 }
211
212 fn get_next_socket(&mut self) -> SocketAddr {
217 let result = self.next_addr;
218
219 match self.next_addr.port().checked_add(1) {
222 Some(port) => {
223 self.next_addr.set_port(port);
224 }
225 None => {
226 let ip = match self.next_addr.ip() {
227 IpAddr::V4(ipv4) => ipv4,
228 _ => unreachable!(),
229 };
230 let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
231 self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
232 }
233 }
234
235 result
236 }
237
238 async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
242 fn send_result<T: std::fmt::Debug>(
246 result: oneshot::Sender<Result<T, Error>>,
247 value: Result<T, Error>,
248 ) {
249 let success = value.is_ok();
250 if let Err(e) = result.send(value) {
251 error!(?e, "failed to send result to oracle (ok = {})", success);
252 }
253 }
254
255 match message {
256 ingress::Message::Track { id, peers } => {
257 let Some(tracked_peer_sets) = self.tracked_peer_sets else {
258 warn!("attempted to register peer set when tracking is disabled");
259 return;
260 };
261
262 if self.peer_sets.contains_key(&id) {
264 warn!(id, "peer set already exists");
265 return;
266 }
267
268 if let Some((last, _)) = self.peer_sets.last_key_value() {
270 if id <= *last {
271 warn!(
272 new_id = id,
273 old_id = last,
274 "attempted to register peer set with non-monotonically increasing ID"
275 );
276 return;
277 }
278 }
279
280 for public_key in peers.iter() {
282 self.ensure_peer_exists(public_key).await;
284
285 *self.peer_refs.entry(public_key.clone()).or_insert(0) += 1;
287 }
288 self.peer_sets.insert(id, peers.clone());
289
290 while self.peer_sets.len() > tracked_peer_sets {
292 let (id, set) = self.peer_sets.pop_first().unwrap();
293 debug!(id, "removed oldest peer set");
294
295 for public_key in set.iter() {
297 let refs = self.peer_refs.get_mut(public_key).unwrap();
298 *refs = refs.checked_sub(1).expect("reference count underflow");
299
300 if *refs == 0 {
303 self.peer_refs.remove(public_key);
304 debug!(?public_key, "removed peer no longer in any tracked set");
305 }
306 }
307 }
308
309 let all = self.all_tracked_peers();
311 let notification = (id, peers, all);
312 self.subscribers
313 .retain(|subscriber| subscriber.send_lossy(notification.clone()));
314
315 self.broadcast_peer_list().await;
317 }
318 ingress::Message::Register {
319 channel,
320 public_key,
321 quota,
322 result,
323 } => {
324 let (_, is_new) = self.ensure_peer_exists(&public_key).await;
326
327 if is_new && self.peer_sets.is_empty() {
329 self.broadcast_peer_list().await;
330 }
331
332 let clock = self
334 .context
335 .with_label(&format!("rate_limiter_{channel}_{public_key}"))
336 .take();
337
338 let (sender, handle) = Sender::new(
340 self.context.with_label("sender"),
341 public_key.clone(),
342 channel,
343 self.max_size,
344 self.sender.clone(),
345 self.oracle_mailbox.clone(),
346 clock,
347 quota,
348 );
349
350 let peer = self.peers.get_mut(&public_key).unwrap();
352 let receiver = match peer.register(channel, handle).await {
353 Ok(receiver) => Receiver { receiver },
354 Err(err) => return send_result(result, Err(err)),
355 };
356
357 send_result(result, Ok((sender, receiver)))
358 }
359 ingress::Message::PeerSet { id, response } => {
360 if self.peer_sets.is_empty() {
361 let _ = response.send(Some(
363 self.peers
364 .keys()
365 .cloned()
366 .try_collect()
367 .expect("BTreeMap keys are unique"),
368 ));
369 } else {
370 let _ = response.send(self.peer_sets.get(&id).cloned());
372 }
373 }
374 ingress::Message::Subscribe { sender } => {
375 if let Some((index, peers)) = self.peer_sets.last_key_value() {
377 let all = self.all_tracked_peers();
378 let notification = (*index, peers.clone(), all);
379 sender.send_lossy(notification);
380 }
381 self.subscribers.push(sender);
382 }
383 ingress::Message::SubscribeConnected { response } => {
384 let (mut sender, receiver) = ring::channel(NZUsize!(1));
386
387 let peer_list: Vec<P> = self.all_tracked_peers().into_iter().collect();
389 let _ = sender.send(peer_list).await;
390
391 self.peer_subscribers.push(sender);
393
394 let _ = response.send(receiver);
396 }
397 ingress::Message::LimitBandwidth {
398 public_key,
399 egress_cap,
400 ingress_cap,
401 result,
402 } => {
403 let (_, is_new) = self.ensure_peer_exists(&public_key).await;
405
406 if is_new && self.peer_sets.is_empty() {
408 self.broadcast_peer_list().await;
409 }
410
411 let now = self.context.current();
413 let completions = self
414 .transmitter
415 .limit(now, &public_key, egress_cap, ingress_cap);
416 self.process_completions(completions);
417
418 let _ = result.send(());
420 }
421 ingress::Message::AddLink {
422 sender,
423 receiver,
424 sampler,
425 success_rate,
426 result,
427 } => {
428 let (_, sender_is_new) = self.ensure_peer_exists(&sender).await;
430 let (receiver_socket, receiver_is_new) = self.ensure_peer_exists(&receiver).await;
431
432 if (sender_is_new || receiver_is_new) && self.peer_sets.is_empty() {
434 self.broadcast_peer_list().await;
435 }
436
437 let key = (sender.clone(), receiver.clone());
439 if self.links.contains_key(&key) {
440 return send_result(result, Err(Error::LinkExists));
441 }
442
443 let link = Link::new(
444 &mut self.context,
445 sender,
446 receiver,
447 receiver_socket,
448 sampler,
449 success_rate,
450 self.max_size,
451 self.received_messages.clone(),
452 );
453 self.links.insert(key, link);
454 send_result(result, Ok(()))
455 }
456 ingress::Message::RemoveLink {
457 sender,
458 receiver,
459 result,
460 } => {
461 match self.links.remove(&(sender, receiver)) {
462 Some(_) => (),
463 None => return send_result(result, Err(Error::LinkMissing)),
464 }
465 send_result(result, Ok(()))
466 }
467 ingress::Message::Block { from, to } => {
468 self.blocks.insert((from, to));
469 }
470 ingress::Message::Blocked { result } => {
471 send_result(result, Ok(self.blocks.iter().cloned().collect()))
472 }
473 }
474 }
475
476 async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
480 if !self.peers.contains_key(public_key) {
481 let socket = self.get_next_socket();
483 let peer = Peer::new(
484 self.context.with_label("peer"),
485 public_key.clone(),
486 socket,
487 self.max_size,
488 )
489 .await;
490
491 self.peers.insert(public_key.clone(), peer);
493
494 (socket, true)
495 } else {
496 (self.peers.get(public_key).unwrap().socket, false)
497 }
498 }
499
500 async fn broadcast_peer_list(&mut self) {
508 let peer_list = self.all_tracked_peers().into_iter().collect::<Vec<_>>();
509 let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
510 for mut subscriber in self.peer_subscribers.drain(..) {
511 if subscriber.send(peer_list.clone()).await.is_ok() {
512 live_subscribers.push(subscriber);
513 }
514 }
515 self.peer_subscribers = live_subscribers;
516 }
517
518 fn all_tracked_peers(&self) -> Set<P> {
524 if self.peer_sets.is_empty() && self.tracked_peer_sets.is_none() {
525 self.peers
526 .keys()
527 .cloned()
528 .try_collect()
529 .expect("BTreeMap keys are unique")
530 } else {
531 self.peer_refs
532 .keys()
533 .cloned()
534 .try_collect()
535 .expect("BTreeMap keys are unique")
536 }
537 }
538}
539
540impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
541 fn process_completions(&mut self, completions: Vec<Completion<P>>) {
543 for completion in completions {
544 let Some(deliver_at) = completion.deliver_at else {
546 trace!(
547 origin = ?completion.origin,
548 recipient = ?completion.recipient,
549 "message dropped before delivery",
550 );
551 continue;
552 };
553
554 let key = (completion.origin.clone(), completion.recipient.clone());
556 let Some(link) = self.links.get_mut(&key) else {
557 trace!(
559 origin = ?completion.origin,
560 recipient = ?completion.recipient,
561 "missing link for completion",
562 );
563 continue;
564 };
565 if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
566 error!(?err, "failed to send");
567 }
568 }
569 }
570
571 fn handle_task(&mut self, task: Task<P>) {
576 let (channel, origin, recipients, message, reply) = task;
578 if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&origin) {
579 warn!(
580 ?origin,
581 reason = "not in tracked peer set",
582 "dropping message"
583 );
584 if let Err(err) = reply.send(Vec::new()) {
585 error!(?err, "failed to send ack");
586 }
587 return;
588 }
589
590 let recipients = match recipients {
592 Recipients::All => {
593 if self.peer_sets.is_empty() {
597 self.peers.keys().cloned().collect()
598 } else {
599 self.peer_refs.keys().cloned().collect()
600 }
601 }
602 Recipients::Some(keys) => keys,
603 Recipients::One(key) => vec![key],
604 };
605
606 let now = self.context.current();
608 let mut sent = Vec::new();
609 for recipient in recipients {
610 if recipient == origin {
612 trace!(?recipient, reason = "self", "dropping message");
613 continue;
614 }
615
616 if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&recipient) {
618 trace!(
619 ?origin,
620 ?recipient,
621 reason = "not in tracked peer set",
622 "dropping message"
623 );
624 continue;
625 }
626
627 let o_r = (origin.clone(), recipient.clone());
629 let r_o = (recipient.clone(), origin.clone());
630 if self.disconnect_on_block
631 && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
632 {
633 trace!(?origin, ?recipient, reason = "blocked", "dropping message");
634 continue;
635 }
636
637 let Some(link) = self.links.get_mut(&o_r) else {
639 trace!(?origin, ?recipient, reason = "no link", "dropping message");
640 continue;
641 };
642
643 self.sent_messages
649 .get_or_create(&metrics::Message::new(&origin, &recipient, channel))
650 .inc();
651
652 let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64);
654
655 let should_deliver = self.context.gen_bool(link.success_rate);
657
658 let completions = self.transmitter.enqueue(
660 now,
661 origin.clone(),
662 recipient.clone(),
663 channel,
664 message.clone(),
665 latency,
666 should_deliver,
667 );
668 self.process_completions(completions);
669
670 sent.push(recipient);
671 }
672
673 if let Err(err) = reply.send(sent) {
675 error!(?err, "failed to send ack");
676 }
677 }
678
679 pub fn start(mut self) -> Handle<()> {
684 spawn_cell!(self.context, self.run().await)
685 }
686
687 async fn run(mut self) {
688 select_loop! {
689 self.context,
690 on_start => {
691 let tick = match self.transmitter.next() {
692 Some(when) => Either::Left(self.context.sleep_until(when)),
693 None => Either::Right(future::pending()),
694 };
695 },
696 on_stopped => {},
697 _ = tick => {
698 let now = self.context.current();
699 let completions = self.transmitter.advance(now);
700 self.process_completions(completions);
701 },
702 Some(message) = self.ingress.recv() else break => {
703 self.handle_ingress(message).await;
704 },
705 Some(task) = self.receiver.recv() else break => {
706 self.handle_task(task);
707 },
708 }
709 }
710}
711
712pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
717 mailbox: UnboundedMailbox<ingress::Message<P, E>>,
718}
719
720impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
721 fn clone(&self) -> Self {
722 Self {
723 mailbox: self.mailbox.clone(),
724 }
725 }
726}
727
728impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
729 const fn new(mailbox: UnboundedMailbox<ingress::Message<P, E>>) -> Self {
730 Self { mailbox }
731 }
732}
733
734impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
735 type PublicKey = P;
736
737 async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
738 self.mailbox
739 .0
740 .request(|response| ingress::Message::SubscribeConnected { response })
741 .await
742 .unwrap_or_else(|| {
743 let (_sender, receiver) = ring::channel(NZUsize!(1));
744 receiver
745 })
746 }
747}
748
749#[derive(Clone)]
753pub struct UnlimitedSender<P: PublicKey> {
754 me: P,
755 channel: Channel,
756 max_size: u32,
757 high: mpsc::UnboundedSender<Task<P>>,
758 low: mpsc::UnboundedSender<Task<P>>,
759}
760
761impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
762 type Error = Error;
763 type PublicKey = P;
764
765 async fn send(
766 &mut self,
767 recipients: Recipients<P>,
768 message: impl Into<IoBufMut> + Send,
769 priority: bool,
770 ) -> Result<Vec<P>, Error> {
771 let message = message.into().freeze();
772
773 if message.len() > self.max_size as usize {
775 return Err(Error::MessageTooLarge(message.len()));
776 }
777
778 let (sender, receiver) = oneshot::channel();
780 let channel = if priority { &self.high } else { &self.low };
781 if channel
782 .send((self.channel, self.me.clone(), recipients, message, sender))
783 .is_err()
784 {
785 return Ok(Vec::new());
786 }
787 Ok(receiver.await.unwrap_or_default())
788 }
789}
790
791pub struct Sender<P: PublicKey, E: Clock> {
796 limited_sender: LimitedSender<E, UnlimitedSender<P>, ConnectedPeerProvider<P, E>>,
797}
798
799impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
800 fn clone(&self) -> Self {
801 Self {
802 limited_sender: self.limited_sender.clone(),
803 }
804 }
805}
806
807impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
808 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
809 f.debug_struct("Sender").finish_non_exhaustive()
810 }
811}
812
813impl<P: PublicKey, E: Clock> Sender<P, E> {
814 #[allow(clippy::too_many_arguments)]
815 fn new(
816 context: impl Spawner + Metrics,
817 me: P,
818 channel: Channel,
819 max_size: u32,
820 sender: mpsc::UnboundedSender<Task<P>>,
821 oracle_mailbox: UnboundedMailbox<ingress::Message<P, E>>,
822 clock: E,
823 quota: Quota,
824 ) -> (Self, Handle<()>) {
825 let (high, mut high_receiver) = mpsc::unbounded_channel();
827 let (low, mut low_receiver) = mpsc::unbounded_channel();
828 let processor = context.with_label("processor").spawn(move |_| async move {
829 loop {
830 let task;
832 select! {
833 high_task = high_receiver.recv() => {
834 task = match high_task {
835 Some(task) => task,
836 None => break,
837 };
838 },
839 low_task = low_receiver.recv() => {
840 task = match low_task {
841 Some(task) => task,
842 None => break,
843 };
844 },
845 }
846
847 if let Err(err) = sender.send(task) {
849 error!(?err, channel, "failed to send task");
850 }
851 }
852 });
853
854 let unlimited_sender = UnlimitedSender {
855 me,
856 channel,
857 max_size,
858 high,
859 low,
860 };
861 let peer_source = ConnectedPeerProvider::new(oracle_mailbox);
862 let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);
863
864 (Self { limited_sender }, processor)
865 }
866
867 pub fn split_with<F: SplitForwarder<P>>(
869 self,
870 forwarder: F,
871 ) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
872 (
873 SplitSender {
874 replica: SplitOrigin::Primary,
875 inner: self.clone(),
876 forwarder: forwarder.clone(),
877 },
878 SplitSender {
879 replica: SplitOrigin::Secondary,
880 inner: self,
881 forwarder,
882 },
883 )
884 }
885}
886
887impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
888 type PublicKey = P;
889 type Checked<'a>
890 = crate::utils::limited::CheckedSender<'a, UnlimitedSender<P>>
891 where
892 Self: 'a;
893
894 async fn check(
895 &mut self,
896 recipients: Recipients<Self::PublicKey>,
897 ) -> Result<Self::Checked<'_>, SystemTime> {
898 self.limited_sender.check(recipients).await
899 }
900}
901
902pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
904 replica: SplitOrigin,
905 inner: Sender<P, E>,
906 forwarder: F,
907}
908
909impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
910 fn clone(&self) -> Self {
911 Self {
912 replica: self.replica,
913 inner: self.inner.clone(),
914 forwarder: self.forwarder.clone(),
915 }
916 }
917}
918
919impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
920 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
921 f.debug_struct("SplitSender")
922 .field("replica", &self.replica)
923 .field("inner", &self.inner)
924 .finish()
925 }
926}
927
928impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
929 type PublicKey = P;
930 type Checked<'a> = SplitCheckedSender<'a, P, E, F>;
931
932 async fn check(
933 &mut self,
934 recipients: Recipients<Self::PublicKey>,
935 ) -> Result<Self::Checked<'_>, SystemTime> {
936 Ok(SplitCheckedSender {
937 checked: self.inner.limited_sender.check(recipients.clone()).await?,
940 replica: self.replica,
941 forwarder: self.forwarder.clone(),
942 recipients,
943
944 _phantom: std::marker::PhantomData,
945 })
946 }
947}
948
949pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
954 checked: LimitedCheckedSender<'a, UnlimitedSender<P>>,
955 replica: SplitOrigin,
956 forwarder: F,
957 recipients: Recipients<P>,
958
959 _phantom: std::marker::PhantomData<E>,
960}
961
962impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
963 for SplitCheckedSender<'a, P, E, F>
964{
965 type PublicKey = P;
966 type Error = Error;
967
968 async fn send(
969 self,
970 message: impl Into<IoBufMut> + Send,
971 priority: bool,
972 ) -> Result<Vec<Self::PublicKey>, Self::Error> {
973 let message = message.into().freeze();
975
976 let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
978 return Ok(Vec::new());
979 };
980
981 self.checked
988 .into_inner()
989 .send(recipients, message, priority)
990 .await
991 }
992}
993
994type MessageReceiver<P> = mpsc::UnboundedReceiver<Message<P>>;
995
996#[derive(Debug)]
998pub struct Receiver<P: PublicKey> {
999 receiver: MessageReceiver<P>,
1000}
1001
1002impl<P: PublicKey> crate::Receiver for Receiver<P> {
1003 type Error = Error;
1004 type PublicKey = P;
1005
1006 async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
1007 self.receiver.recv().await.ok_or(Error::NetworkClosed)
1008 }
1009}
1010
1011impl<P: PublicKey> Receiver<P> {
1012 pub fn split_with<E: Spawner, R: SplitRouter<P>>(
1014 mut self,
1015 context: E,
1016 router: R,
1017 ) -> (Self, Self) {
1018 let (primary_tx, primary_rx) = mpsc::unbounded_channel();
1019 let (secondary_tx, secondary_rx) = mpsc::unbounded_channel();
1020 context.spawn(move |_| async move {
1021 while let Some(message) = self.receiver.recv().await {
1022 let direction = router(&message);
1024 match direction {
1025 SplitTarget::None => {}
1026 SplitTarget::Primary => {
1027 if let Err(err) = primary_tx.send(message) {
1028 error!(?err, "failed to send message to primary");
1029 }
1030 }
1031 SplitTarget::Secondary => {
1032 if let Err(err) = secondary_tx.send(message) {
1033 error!(?err, "failed to send message to secondary");
1034 }
1035 }
1036 SplitTarget::Both => {
1037 if let Err(err) = primary_tx.send(message.clone()) {
1038 error!(?err, "failed to send message to primary");
1039 }
1040 if let Err(err) = secondary_tx.send(message) {
1041 error!(?err, "failed to send message to secondary");
1042 }
1043 }
1044 }
1045
1046 if primary_tx.is_closed() && secondary_tx.is_closed() {
1048 break;
1049 }
1050 }
1051 });
1052
1053 (
1054 Self {
1055 receiver: primary_rx,
1056 },
1057 Self {
1058 receiver: secondary_rx,
1059 },
1060 )
1061 }
1062}
1063
1064struct Peer<P: PublicKey> {
1068 socket: SocketAddr,
1070
1071 control: mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
1073}
1074
1075impl<P: PublicKey> Peer<P> {
1076 async fn new<E: Spawner + RNetwork + Metrics + Clock>(
1081 context: E,
1082 public_key: P,
1083 socket: SocketAddr,
1084 max_size: u32,
1085 ) -> Self {
1086 #[allow(clippy::type_complexity)]
1089 let (control_sender, mut control_receiver): (
1090 mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
1091 _,
1092 ) = mpsc::unbounded_channel();
1093
1094 let (inbox_sender, mut inbox_receiver) = mpsc::unbounded_channel();
1097
1098 context.with_label("router").spawn(|context| async move {
1100 let mut mailboxes = HashMap::new();
1102
1103 select_loop! {
1105 context,
1106 on_stopped => {},
1107 Some((channel, sender, result_tx)) = control_receiver.recv() else break => {
1109 let (receiver_tx, receiver_rx) = mpsc::unbounded_channel();
1111 if let Some((_, existing_sender)) =
1112 mailboxes.insert(channel, (receiver_tx, sender))
1113 {
1114 warn!(?public_key, ?channel, "overwriting existing channel");
1115 existing_sender.abort();
1116 }
1117 result_tx.send(receiver_rx).unwrap();
1118 },
1119
1120 Some((channel, message)) = inbox_receiver.recv() else break => {
1122 match mailboxes.get_mut(&channel) {
1124 Some((receiver_tx, _)) => {
1125 if let Err(err) = receiver_tx.send(message) {
1126 debug!(?err, "failed to send message to mailbox");
1127 }
1128 }
1129 None => {
1130 trace!(
1131 recipient = ?public_key,
1132 channel,
1133 reason = "missing channel",
1134 "dropping message",
1135 );
1136 }
1137 }
1138 },
1139 }
1140 });
1141
1142 let (ready_tx, ready_rx) = oneshot::channel();
1144 context
1145 .with_label("listener")
1146 .spawn(move |context| async move {
1147 let mut listener = context.bind(socket).await.unwrap();
1149 let _ = ready_tx.send(());
1150
1151 while let Ok((_, _, mut stream)) = listener.accept().await {
1153 context.with_label("receiver").spawn({
1155 let inbox_sender = inbox_sender.clone();
1156 move |_| async move {
1157 let dialer = match recv_frame(&mut stream, max_size).await {
1159 Ok(data) => data,
1160 Err(_) => {
1161 error!("failed to receive public key from dialer");
1162 return;
1163 }
1164 };
1165 let Ok(dialer) = P::decode(dialer.coalesce()) else {
1166 error!("received public key is invalid");
1167 return;
1168 };
1169
1170 while let Ok(data) = recv_frame(&mut stream, max_size).await {
1172 let data = data.coalesce();
1173 let channel = Channel::from_be_bytes(
1174 data.as_ref()[..Channel::SIZE].try_into().unwrap(),
1175 );
1176 let message = data.slice(Channel::SIZE..);
1177 if let Err(err) =
1178 inbox_sender.send((channel, (dialer.clone(), message)))
1179 {
1180 debug!(?err, "failed to send message to mailbox");
1181 break;
1182 }
1183 }
1184 }
1185 });
1186 }
1187 });
1188
1189 let _ = ready_rx.await;
1191
1192 Self {
1194 socket,
1195 control: control_sender,
1196 }
1197 }
1198
1199 async fn register(
1204 &mut self,
1205 channel: Channel,
1206 sender: Handle<()>,
1207 ) -> Result<MessageReceiver<P>, Error> {
1208 let (result_tx, result_rx) = oneshot::channel();
1209 self.control
1210 .send((channel, sender, result_tx))
1211 .map_err(|_| Error::NetworkClosed)?;
1212 result_rx.await.map_err(|_| Error::NetworkClosed)
1213 }
1214}
1215
1216struct Link {
1219 sampler: Normal<f64>,
1220 success_rate: f64,
1221 inbox: mpsc::UnboundedSender<(Channel, IoBuf, SystemTime)>,
1223}
1224
1225impl Link {
1227 #[allow(clippy::too_many_arguments)]
1228 fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
1229 context: &mut E,
1230 dialer: P,
1231 receiver: P,
1232 socket: SocketAddr,
1233 sampler: Normal<f64>,
1234 success_rate: f64,
1235 max_size: u32,
1236 received_messages: Family<metrics::Message, Counter>,
1237 ) -> Self {
1238 let (inbox, mut outbox) = mpsc::unbounded_channel::<(Channel, IoBuf, SystemTime)>();
1241 context.with_label("link").spawn(move |context| async move {
1242 let (mut sink, _) = context.dial(socket).await.unwrap();
1244 if let Err(err) = send_frame(&mut sink, dialer.as_ref().to_vec(), max_size).await {
1245 error!(?err, "failed to send public key to listener");
1246 return;
1247 }
1248
1249 while let Some((channel, message, receive_complete_at)) = outbox.recv().await {
1251 context.sleep_until(receive_complete_at).await;
1253
1254 let channel_bytes = channel.to_be_bytes();
1256 let mut data = Vec::with_capacity(channel_bytes.len() + message.len());
1257 data.extend_from_slice(&channel_bytes);
1258 data.extend_from_slice(message.as_ref());
1259 let _ = send_frame(&mut sink, data, max_size).await;
1260
1261 received_messages
1263 .get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
1264 .inc();
1265 }
1266 });
1267
1268 Self {
1269 sampler,
1270 success_rate,
1271 inbox,
1272 }
1273 }
1274
1275 fn send(
1277 &mut self,
1278 channel: Channel,
1279 message: IoBuf,
1280 receive_complete_at: SystemTime,
1281 ) -> Result<(), Error> {
1282 self.inbox
1283 .send((channel, message, receive_complete_at))
1284 .map_err(|_| Error::NetworkClosed)?;
1285 Ok(())
1286 }
1287}
1288
1289#[cfg(test)]
1290mod tests {
1291 use super::*;
1292 use crate::{Manager, Provider, Receiver as _, Recipients, Sender as _};
1293 use commonware_cryptography::{ed25519, Signer as _};
1294 use commonware_runtime::{deterministic, Quota, Runner as _};
1295 use futures::FutureExt;
1296 use std::num::NonZeroU32;
1297
1298 const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;
1299
1300 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1302
1303 #[test]
1304 fn test_register_and_link() {
1305 let executor = deterministic::Runner::default();
1306 executor.start(|context| async move {
1307 let cfg = Config {
1308 max_size: MAX_MESSAGE_SIZE,
1309 disconnect_on_block: true,
1310 tracked_peer_sets: Some(3),
1311 };
1312 let network_context = context.with_label("network");
1313 let (network, oracle) = Network::new(network_context.clone(), cfg);
1314 network_context.spawn(|_| network.run());
1315
1316 let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1318 let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1319
1320 let mut manager = oracle.manager();
1322 manager
1323 .track(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
1324 .await;
1325 let control = oracle.control(pk1.clone());
1326 control.register(0, TEST_QUOTA).await.unwrap();
1327 control.register(1, TEST_QUOTA).await.unwrap();
1328 let control = oracle.control(pk2.clone());
1329 control.register(0, TEST_QUOTA).await.unwrap();
1330 control.register(1, TEST_QUOTA).await.unwrap();
1331
1332 control.register(1, TEST_QUOTA).await.unwrap();
1334
1335 let link = ingress::Link {
1337 latency: Duration::from_millis(2),
1338 jitter: Duration::from_millis(1),
1339 success_rate: 0.9,
1340 };
1341 oracle
1342 .add_link(pk1.clone(), pk2.clone(), link.clone())
1343 .await
1344 .unwrap();
1345
1346 assert!(matches!(
1348 oracle.add_link(pk1, pk2, link).await,
1349 Err(Error::LinkExists)
1350 ));
1351 });
1352 }
1353
1354 #[test]
1355 fn test_split_channel_single() {
1356 let executor = deterministic::Runner::default();
1357 executor.start(|context| async move {
1358 let cfg = Config {
1359 max_size: MAX_MESSAGE_SIZE,
1360 disconnect_on_block: true,
1361 tracked_peer_sets: Some(3),
1362 };
1363 let network_context = context.with_label("network");
1364 let (network, oracle) = Network::new(network_context.clone(), cfg);
1365 network_context.spawn(|_| network.run());
1366
1367 let twin = ed25519::PrivateKey::from_seed(20).public_key();
1369 let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
1370 let peer_b = ed25519::PrivateKey::from_seed(22).public_key();
1371
1372 let mut manager = oracle.manager();
1374 manager
1375 .track(
1376 0,
1377 [twin.clone(), peer_a.clone(), peer_b.clone()]
1378 .try_into()
1379 .unwrap(),
1380 )
1381 .await;
1382
1383 let (mut peer_a_sender, mut peer_a_recv) = oracle
1385 .control(peer_a.clone())
1386 .register(0, TEST_QUOTA)
1387 .await
1388 .unwrap();
1389 let (mut peer_b_sender, mut peer_b_recv) = oracle
1390 .control(peer_b.clone())
1391 .register(0, TEST_QUOTA)
1392 .await
1393 .unwrap();
1394
1395 let (twin_sender, twin_receiver) = oracle
1401 .control(twin.clone())
1402 .register(0, TEST_QUOTA)
1403 .await
1404 .unwrap();
1405 let peer_a_for_router = peer_a.clone();
1406 let peer_b_for_router = peer_b.clone();
1407 let (mut twin_primary_sender, mut twin_secondary_sender) =
1408 twin_sender.split_with(move |origin, _, _| match origin {
1409 SplitOrigin::Primary => Some(Recipients::One(peer_a_for_router.clone())),
1410 SplitOrigin::Secondary => Some(Recipients::One(peer_b_for_router.clone())),
1411 });
1412 let peer_a_for_recv = peer_a.clone();
1413 let peer_b_for_recv = peer_b.clone();
1414 let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver.split_with(
1415 context.with_label("split_receiver"),
1416 move |(sender, _)| {
1417 if sender == &peer_a_for_recv {
1418 SplitTarget::Primary
1419 } else if sender == &peer_b_for_recv {
1420 SplitTarget::Secondary
1421 } else {
1422 panic!("unexpected sender");
1423 }
1424 },
1425 );
1426
1427 let link = ingress::Link {
1429 latency: Duration::from_millis(0),
1430 jitter: Duration::from_millis(0),
1431 success_rate: 1.0,
1432 };
1433 oracle
1434 .add_link(peer_a.clone(), twin.clone(), link.clone())
1435 .await
1436 .unwrap();
1437 oracle
1438 .add_link(twin.clone(), peer_a.clone(), link.clone())
1439 .await
1440 .unwrap();
1441 oracle
1442 .add_link(peer_b.clone(), twin.clone(), link.clone())
1443 .await
1444 .unwrap();
1445 oracle
1446 .add_link(twin.clone(), peer_b.clone(), link.clone())
1447 .await
1448 .unwrap();
1449
1450 peer_a_sender
1452 .send(Recipients::One(twin.clone()), b"from_a", false)
1453 .await
1454 .unwrap();
1455 peer_b_sender
1456 .send(Recipients::One(twin.clone()), b"from_b", false)
1457 .await
1458 .unwrap();
1459 twin_primary_sender
1460 .send(Recipients::All, b"primary_out", false)
1461 .await
1462 .unwrap();
1463 twin_secondary_sender
1464 .send(Recipients::All, b"secondary_out", false)
1465 .await
1466 .unwrap();
1467
1468 let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1470 assert_eq!(sender, peer_a);
1471 assert_eq!(payload, b"from_a");
1472 let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1473 assert_eq!(sender, peer_b);
1474 assert_eq!(payload, b"from_b");
1475
1476 let (sender, payload) = peer_a_recv.recv().await.unwrap();
1478 assert_eq!(sender, twin);
1479 assert_eq!(payload, b"primary_out");
1480 let (sender, payload) = peer_b_recv.recv().await.unwrap();
1481 assert_eq!(sender, twin);
1482 assert_eq!(payload, b"secondary_out");
1483 });
1484 }
1485
1486 #[test]
1487 fn test_split_channel_both() {
1488 let executor = deterministic::Runner::default();
1489 executor.start(|context| async move {
1490 let cfg = Config {
1491 max_size: MAX_MESSAGE_SIZE,
1492 disconnect_on_block: true,
1493 tracked_peer_sets: Some(3),
1494 };
1495 let network_context = context.with_label("network");
1496 let (network, oracle) = Network::new(network_context.clone(), cfg);
1497 network_context.spawn(|_| network.run());
1498
1499 let twin = ed25519::PrivateKey::from_seed(30).public_key();
1501 let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1502
1503 let mut manager = oracle.manager();
1505 manager
1506 .track(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
1507 .await;
1508
1509 let (mut peer_c_sender, _peer_c_recv) = oracle
1511 .control(peer_c.clone())
1512 .register(0, TEST_QUOTA)
1513 .await
1514 .unwrap();
1515
1516 let (twin_sender, twin_receiver) = oracle
1518 .control(twin.clone())
1519 .register(0, TEST_QUOTA)
1520 .await
1521 .unwrap();
1522 let (_twin_primary_sender, _twin_secondary_sender) =
1523 twin_sender.split_with(|_origin, recipients, _| Some(recipients.clone()));
1524 let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1525 .split_with(context.with_label("split_receiver_both"), |_| {
1526 SplitTarget::Both
1527 });
1528
1529 let link = ingress::Link {
1531 latency: Duration::from_millis(0),
1532 jitter: Duration::from_millis(0),
1533 success_rate: 1.0,
1534 };
1535 oracle
1536 .add_link(peer_c.clone(), twin.clone(), link.clone())
1537 .await
1538 .unwrap();
1539 oracle
1540 .add_link(twin.clone(), peer_c.clone(), link)
1541 .await
1542 .unwrap();
1543
1544 peer_c_sender
1546 .send(Recipients::One(twin.clone()), b"to_both", false)
1547 .await
1548 .unwrap();
1549
1550 let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1552 assert_eq!(sender, peer_c);
1553 assert_eq!(payload, b"to_both");
1554 let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1555 assert_eq!(sender, peer_c);
1556 assert_eq!(payload, b"to_both");
1557 });
1558 }
1559
1560 #[test]
1561 fn test_split_channel_none() {
1562 let executor = deterministic::Runner::default();
1563 executor.start(|context| async move {
1564 let cfg = Config {
1565 max_size: MAX_MESSAGE_SIZE,
1566 disconnect_on_block: true,
1567 tracked_peer_sets: Some(3),
1568 };
1569 let network_context = context.with_label("network");
1570 let (network, oracle) = Network::new(network_context.clone(), cfg);
1571 network_context.spawn(|_| network.run());
1572
1573 let twin = ed25519::PrivateKey::from_seed(30).public_key();
1575 let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1576
1577 let mut manager = oracle.manager();
1579 manager
1580 .track(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
1581 .await;
1582
1583 let (mut peer_c_sender, _peer_c_recv) = oracle
1585 .control(peer_c.clone())
1586 .register(0, TEST_QUOTA)
1587 .await
1588 .unwrap();
1589
1590 let (twin_sender, twin_receiver) = oracle
1592 .control(twin.clone())
1593 .register(0, TEST_QUOTA)
1594 .await
1595 .unwrap();
1596 let (mut twin_primary_sender, mut twin_secondary_sender) =
1597 twin_sender.split_with(|_origin, _, _| None);
1598 let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1599 .split_with(context.with_label("split_receiver_both"), |_| {
1600 SplitTarget::None
1601 });
1602
1603 let link = ingress::Link {
1605 latency: Duration::from_millis(0),
1606 jitter: Duration::from_millis(0),
1607 success_rate: 1.0,
1608 };
1609 oracle
1610 .add_link(peer_c.clone(), twin.clone(), link.clone())
1611 .await
1612 .unwrap();
1613 oracle
1614 .add_link(twin.clone(), peer_c.clone(), link)
1615 .await
1616 .unwrap();
1617
1618 let sent = peer_c_sender
1620 .send(Recipients::One(twin.clone()), b"to_both", false)
1621 .await
1622 .unwrap();
1623 assert_eq!(sent.len(), 1);
1624 assert_eq!(sent[0], twin);
1625
1626 context.sleep(Duration::from_millis(100)).await;
1628 assert!(twin_primary_recv.recv().now_or_never().is_none());
1629 assert!(twin_secondary_recv.recv().now_or_never().is_none());
1630
1631 let sent = twin_primary_sender
1633 .send(Recipients::One(peer_c.clone()), b"to_both", false)
1634 .await
1635 .unwrap();
1636 assert_eq!(sent.len(), 0);
1637
1638 let sent = twin_secondary_sender
1640 .send(Recipients::One(peer_c.clone()), b"to_both", false)
1641 .await
1642 .unwrap();
1643 assert_eq!(sent.len(), 0);
1644 });
1645 }
1646
1647 #[test]
1648 fn test_unordered_peer_sets() {
1649 let executor = deterministic::Runner::default();
1650 executor.start(|context| async move {
1651 let cfg = Config {
1652 max_size: MAX_MESSAGE_SIZE,
1653 disconnect_on_block: true,
1654 tracked_peer_sets: Some(3),
1655 };
1656 let network_context = context.with_label("network");
1657 let (network, oracle) = Network::new(network_context.clone(), cfg);
1658 network_context.spawn(|_| network.run());
1659
1660 let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1662 let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1663
1664 let mut manager = oracle.manager();
1666 let mut subscription = manager.subscribe().await;
1667
1668 manager
1670 .track(10, [pk1.clone(), pk2.clone()].try_into().unwrap())
1671 .await;
1672 let (id, new, all) = subscription.recv().await.unwrap();
1673 assert_eq!(id, 10);
1674 assert_eq!(new.len(), 2);
1675 assert_eq!(all.len(), 2);
1676
1677 let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
1679 manager.track(9, [pk3.clone()].try_into().unwrap()).await;
1680
1681 let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
1683 manager.track(11, [pk4.clone()].try_into().unwrap()).await;
1684 let (id, new, all) = subscription.recv().await.unwrap();
1685 assert_eq!(id, 11);
1686 assert_eq!(new, [pk4.clone()].try_into().unwrap());
1687 assert_eq!(all, [pk1, pk2, pk4].try_into().unwrap());
1688 });
1689 }
1690
1691 #[test]
1692 fn test_get_next_socket() {
1693 let cfg = Config {
1694 max_size: MAX_MESSAGE_SIZE,
1695 disconnect_on_block: true,
1696 tracked_peer_sets: None,
1697 };
1698 let runner = deterministic::Runner::default();
1699
1700 runner.start(|context| async move {
1701 type PublicKey = ed25519::PublicKey;
1702 let (mut network, _) =
1703 Network::<deterministic::Context, PublicKey>::new(context.clone(), cfg);
1704
1705 let mut original = network.next_addr;
1707 let next = network.get_next_socket();
1708 assert_eq!(next, original);
1709 let next = network.get_next_socket();
1710 original.set_port(1);
1711 assert_eq!(next, original);
1712
1713 let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
1715 network.next_addr = max_addr;
1716 let next = network.get_next_socket();
1717 assert_eq!(next, max_addr);
1718 let next = network.get_next_socket();
1719 assert_eq!(
1720 next,
1721 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
1722 );
1723 });
1724 }
1725
1726 #[test]
1727 fn test_fifo_burst_same_recipient() {
1728 let cfg = Config {
1729 max_size: MAX_MESSAGE_SIZE,
1730 disconnect_on_block: true,
1731 tracked_peer_sets: Some(3),
1732 };
1733 let runner = deterministic::Runner::default();
1734
1735 runner.start(|context| async move {
1736 let (network, oracle) = Network::new(context.with_label("network"), cfg);
1737 let network_handle = network.start();
1738
1739 let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
1740 let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();
1741
1742 let mut manager = oracle.manager();
1743 manager
1744 .track(
1745 0,
1746 [sender_pk.clone(), recipient_pk.clone()]
1747 .try_into()
1748 .unwrap(),
1749 )
1750 .await;
1751 let (mut sender, _sender_recv) = oracle
1752 .control(sender_pk.clone())
1753 .register(0, TEST_QUOTA)
1754 .await
1755 .unwrap();
1756 let (_sender2, mut receiver) = oracle
1757 .control(recipient_pk.clone())
1758 .register(0, TEST_QUOTA)
1759 .await
1760 .unwrap();
1761
1762 oracle
1763 .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
1764 .await
1765 .unwrap();
1766 oracle
1767 .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
1768 .await
1769 .unwrap();
1770
1771 oracle
1772 .add_link(
1773 sender_pk.clone(),
1774 recipient_pk.clone(),
1775 ingress::Link {
1776 latency: Duration::from_millis(0),
1777 jitter: Duration::from_millis(0),
1778 success_rate: 1.0,
1779 },
1780 )
1781 .await
1782 .unwrap();
1783
1784 const COUNT: usize = 50;
1785 let mut expected = Vec::with_capacity(COUNT);
1786 for i in 0..COUNT {
1787 let msg = vec![i as u8; 64];
1788 sender
1789 .send(Recipients::One(recipient_pk.clone()), msg.clone(), false)
1790 .await
1791 .unwrap();
1792 expected.push(msg);
1793 }
1794
1795 for expected_msg in expected {
1796 let (_pk, bytes) = receiver.recv().await.unwrap();
1797 assert_eq!(bytes, expected_msg.as_slice());
1798 }
1799
1800 drop(oracle);
1801 drop(sender);
1802 network_handle.abort();
1803 });
1804 }
1805
1806 #[test]
1807 fn test_broadcast_respects_transmit_latency() {
1808 let cfg = Config {
1809 max_size: MAX_MESSAGE_SIZE,
1810 disconnect_on_block: true,
1811 tracked_peer_sets: Some(3),
1812 };
1813 let runner = deterministic::Runner::default();
1814
1815 runner.start(|context| async move {
1816 let (network, oracle) = Network::new(context.with_label("network"), cfg);
1817 let network_handle = network.start();
1818
1819 let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
1820 let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
1821 let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();
1822
1823 let mut manager = oracle.manager();
1824 manager
1825 .track(
1826 0,
1827 [sender_pk.clone(), recipient_a.clone(), recipient_b.clone()]
1828 .try_into()
1829 .unwrap(),
1830 )
1831 .await;
1832 let (mut sender, _recv_sender) = oracle
1833 .control(sender_pk.clone())
1834 .register(0, TEST_QUOTA)
1835 .await
1836 .unwrap();
1837 let (_sender2, mut recv_a) = oracle
1838 .control(recipient_a.clone())
1839 .register(0, TEST_QUOTA)
1840 .await
1841 .unwrap();
1842 let (_sender3, mut recv_b) = oracle
1843 .control(recipient_b.clone())
1844 .register(0, TEST_QUOTA)
1845 .await
1846 .unwrap();
1847
1848 oracle
1849 .limit_bandwidth(sender_pk.clone(), Some(1_000), None)
1850 .await
1851 .unwrap();
1852 oracle
1853 .limit_bandwidth(recipient_a.clone(), None, Some(1_000))
1854 .await
1855 .unwrap();
1856 oracle
1857 .limit_bandwidth(recipient_b.clone(), None, Some(1_000))
1858 .await
1859 .unwrap();
1860
1861 let link = ingress::Link {
1862 latency: Duration::from_millis(0),
1863 jitter: Duration::from_millis(0),
1864 success_rate: 1.0,
1865 };
1866 oracle
1867 .add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
1868 .await
1869 .unwrap();
1870 oracle
1871 .add_link(sender_pk.clone(), recipient_b.clone(), link)
1872 .await
1873 .unwrap();
1874
1875 let big_msg = vec![7u8; 10_000];
1876 let start = context.current();
1877 sender
1878 .send(Recipients::All, big_msg.clone(), false)
1879 .await
1880 .unwrap();
1881
1882 let (_pk, received_a) = recv_a.recv().await.unwrap();
1883 assert_eq!(received_a, big_msg.as_slice());
1884 let elapsed_a = context.current().duration_since(start).unwrap();
1885 assert!(elapsed_a >= Duration::from_secs(20));
1886
1887 let (_pk, received_b) = recv_b.recv().await.unwrap();
1888 assert_eq!(received_b, big_msg.as_slice());
1889 let elapsed_b = context.current().duration_since(start).unwrap();
1890 assert!(elapsed_b >= Duration::from_secs(20));
1891
1892 assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));
1894
1895 drop(oracle);
1896 drop(sender);
1897 network_handle.abort();
1898 });
1899 }
1900}