1use super::{
4 ingress::{self, Oracle},
5 metrics,
6 transmitter::{self, Completion},
7 Error,
8};
9use crate::{
10 authenticated::UnboundedMailbox,
11 utils::limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
12 Channel, Message, Recipients, UnlimitedSender as _,
13};
14use bytes::{Buf, Bytes};
15use commonware_codec::{DecodeExt, FixedSize};
16use commonware_cryptography::PublicKey;
17use commonware_macros::{select, select_loop};
18use commonware_runtime::{
19 spawn_cell, Clock, ContextCell, Handle, Listener as _, Metrics, Network as RNetwork, Quota,
20 Spawner,
21};
22use commonware_stream::utils::codec::{recv_frame, send_frame};
23use commonware_utils::{
24 channels::{fallible::FallibleExt, ring},
25 ordered::Set,
26 NZUsize, TryCollect,
27};
28use either::Either;
29use futures::{
30 channel::{mpsc, oneshot},
31 future, SinkExt, StreamExt,
32};
33use prometheus_client::metrics::{counter::Counter, family::Family};
34use rand::Rng;
35use rand_distr::{Distribution, Normal};
36use std::{
37 collections::{BTreeMap, BTreeSet, HashMap},
38 fmt::Debug,
39 net::{IpAddr, Ipv4Addr, SocketAddr},
40 time::{Duration, SystemTime},
41};
42use tracing::{debug, error, trace, warn};
43
/// A unit of work queued for the network: `(channel, origin, recipients, message, reply)`.
///
/// `reply` is answered with the peers the message was accepted for (possibly empty).
type Task<P> = (Channel, P, Recipients<P>, Bytes, oneshot::Sender<Vec<P>>);
46
/// Destination selected by a [`SplitRouter`] for a message arriving at a split [`Receiver`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitTarget {
    /// Forward to neither half; the message is discarded.
    None,
    /// Forward only to the primary half.
    Primary,
    /// Forward only to the secondary half.
    Secondary,
    /// Forward to both halves (the primary receives a clone).
    Both,
}
56
/// Identifies which half of a split [`Sender`] is sending; handed to the [`SplitForwarder`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitOrigin {
    /// The first sender returned by [`Sender::split_with`].
    Primary,
    /// The second sender returned by [`Sender::split_with`].
    Secondary,
}
64
/// Decides, for a message sent through a [`SplitSender`], who actually receives it.
///
/// The closure is given the sending half, the recipients originally requested and the message
/// bytes. Returning `None` suppresses the send (the caller sees an empty list of recipients);
/// returning `Some(recipients)` sends to exactly those recipients.
pub trait SplitForwarder<P: PublicKey>:
    Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
{
}

/// Blanket implementation: any closure with the right signature and bounds is a forwarder.
impl<P: PublicKey, F> SplitForwarder<P> for F where
    F: Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>>
        + Send
        + Sync
        + Clone
        + 'static
{
}
79
/// Decides which half of a split [`Receiver`] gets each incoming message.
pub trait SplitRouter<P: PublicKey>:
    Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}

/// Blanket implementation: any closure with the right signature and bounds is a router.
impl<P: PublicKey, F> SplitRouter<P> for F where
    F: Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}
90
/// Configuration for the simulated [`Network`].
pub struct Config {
    /// Maximum size of a message; larger messages are rejected by the sender and the value is
    /// also passed as the frame limit for simulated connections.
    pub max_size: u32,

    /// When `true`, messages between two peers are dropped if either has blocked the other.
    /// When `false`, blocks are recorded but do not affect delivery.
    pub disconnect_on_block: bool,

    /// Maximum number of peer sets retained at once (the oldest are evicted first).
    ///
    /// `None` disables peer-set tracking: peer-set updates are ignored and senders/recipients
    /// are not filtered by peer-set membership.
    pub tracked_peer_sets: Option<usize>,
}
108
/// Simulated network actor: owns all peers, links and peer sets, and routes every message.
pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
    /// Runtime context (clock, RNG, spawner, metrics, simulated sockets).
    context: ContextCell<E>,

    /// Maximum message/frame size handed to senders, peers and links.
    max_size: u32,

    /// Whether a block between two peers causes their messages to be dropped.
    disconnect_on_block: bool,

    /// Next socket address to assign to a newly created peer.
    next_addr: SocketAddr,

    /// Control messages arriving from the oracle.
    ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,

    /// Mailbox onto `ingress`, cloned into every [`Sender`] created on registration.
    oracle_mailbox: UnboundedMailbox<ingress::Message<P, E>>,

    /// Sending half of the task queue, cloned into every [`Sender`] created on registration.
    sender: mpsc::UnboundedSender<Task<P>>,
    /// Receiving half of the task queue, drained by the run loop.
    receiver: mpsc::UnboundedReceiver<Task<P>>,

    /// Directed links keyed by `(sender, receiver)`.
    links: HashMap<(P, P), Link>,

    /// Every peer created so far, keyed by public key.
    peers: BTreeMap<P, Peer<P>>,

    /// Tracked peer sets keyed by their id.
    peer_sets: BTreeMap<u64, Set<P>>,

    /// Number of tracked peer sets each peer currently belongs to (entries are removed at zero).
    peer_refs: BTreeMap<P, usize>,

    /// Maximum number of peer sets to retain; `None` disables tracking.
    tracked_peer_sets: Option<usize>,

    /// Recorded blocks as `(from, to)` pairs.
    blocks: BTreeSet<(P, P)>,

    /// Transmission scheduler: messages are enqueued here and come back as completions.
    transmitter: transmitter::State<P>,

    /// Subscribers notified with `(id, new peer set, all tracked peers)` on each peer-set update.
    #[allow(clippy::type_complexity)]
    subscribers: Vec<mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>>,

    /// Subscribers that receive the current peer list whenever it is broadcast.
    peer_subscribers: Vec<ring::Sender<Vec<P>>>,

    /// Counter family bumped after a link writes a message to the recipient's connection.
    received_messages: Family<metrics::Message, Counter>,
    /// Counter family bumped when a message is accepted for sending over an existing link.
    sent_messages: Family<metrics::Message, Counter>,
}
169
170impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
171 pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
176 let (sender, receiver) = mpsc::unbounded();
177 let (oracle_mailbox, oracle_receiver) = UnboundedMailbox::new();
178 let sent_messages = Family::<metrics::Message, Counter>::default();
179 let received_messages = Family::<metrics::Message, Counter>::default();
180 context.register("messages_sent", "messages sent", sent_messages.clone());
181 context.register(
182 "messages_received",
183 "messages received",
184 received_messages.clone(),
185 );
186
187 let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);
189
190 (
191 Self {
192 context: ContextCell::new(context),
193 max_size: cfg.max_size,
194 disconnect_on_block: cfg.disconnect_on_block,
195 tracked_peer_sets: cfg.tracked_peer_sets,
196 next_addr,
197 ingress: oracle_receiver,
198 oracle_mailbox: oracle_mailbox.clone(),
199 sender,
200 receiver,
201 links: HashMap::new(),
202 peers: BTreeMap::new(),
203 peer_sets: BTreeMap::new(),
204 peer_refs: BTreeMap::new(),
205 blocks: BTreeSet::new(),
206 transmitter: transmitter::State::new(),
207 subscribers: Vec::new(),
208 peer_subscribers: Vec::new(),
209 received_messages,
210 sent_messages,
211 },
212 Oracle::new(oracle_mailbox),
213 )
214 }
215
216 fn get_next_socket(&mut self) -> SocketAddr {
221 let result = self.next_addr;
222
223 match self.next_addr.port().checked_add(1) {
226 Some(port) => {
227 self.next_addr.set_port(port);
228 }
229 None => {
230 let ip = match self.next_addr.ip() {
231 IpAddr::V4(ipv4) => ipv4,
232 _ => unreachable!(),
233 };
234 let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
235 self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
236 }
237 }
238
239 result
240 }
241
    /// Process one control message received from the oracle.
    ///
    /// Depending on the variant this updates the simulated topology (peers, peer sets, links,
    /// blocks, bandwidth limits) and/or answers the request over the channel carried by the
    /// message. Failures to answer are logged or ignored; they never stop the network.
    async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
        // Deliver `value` to the requester and log (rather than panic) if the send fails,
        // e.g. because the requester is no longer waiting for the answer.
        fn send_result<T: std::fmt::Debug>(
            result: oneshot::Sender<Result<T, Error>>,
            value: Result<T, Error>,
        ) {
            let success = value.is_ok();
            if let Err(e) = result.send(value) {
                error!(?e, "failed to send result to oracle (ok = {})", success);
            }
        }

        match message {
            ingress::Message::Update { id, peers } => {
                // Peer sets can only be registered when tracking is enabled.
                let Some(tracked_peer_sets) = self.tracked_peer_sets else {
                    warn!("attempted to register peer set when tracking is disabled");
                    return;
                };

                // Reject an id that is already registered.
                if self.peer_sets.contains_key(&id) {
                    warn!(id, "peer set already exists");
                    return;
                }

                // Ids must be strictly greater than the largest id currently tracked.
                if let Some((last, _)) = self.peer_sets.last_key_value() {
                    if id <= *last {
                        warn!(
                            new_id = id,
                            old_id = last,
                            "attempted to register peer set with non-monotonically increasing ID"
                        );
                        return;
                    }
                }

                // Store the new peer set.
                for public_key in peers.iter() {
                    // Create the peer if it does not exist yet.
                    self.ensure_peer_exists(public_key).await;

                    // One more tracked set references this peer.
                    *self.peer_refs.entry(public_key.clone()).or_insert(0) += 1;
                }
                self.peer_sets.insert(id, peers.clone());

                // Evict the oldest peer sets while over the limit.
                while self.peer_sets.len() > tracked_peer_sets {
                    let (id, set) = self.peer_sets.pop_first().unwrap();
                    debug!(id, "removed oldest peer set");

                    // Release the evicted set's references.
                    for public_key in set.iter() {
                        let refs = self.peer_refs.get_mut(public_key).unwrap();
                        *refs = refs.checked_sub(1).expect("reference count underflow");

                        // Only the reference entry is dropped; the peer itself stays in
                        // `self.peers`.
                        if *refs == 0 {
                            self.peer_refs.remove(public_key);
                            debug!(?public_key, "removed peer no longer in any tracked set");
                        }
                    }
                }

                // Notify peer-set subscribers, dropping those for which `send_lossy` reports
                // failure.
                let all = self.all_tracked_peers();
                let notification = (id, peers, all);
                self.subscribers
                    .retain(|subscriber| subscriber.send_lossy(notification.clone()));

                // Push the updated peer list to connected-peer subscribers.
                self.broadcast_peer_list().await;
            }
            ingress::Message::Register {
                channel,
                public_key,
                quota,
                result,
            } => {
                // Create the peer if it does not exist yet.
                let (_, is_new) = self.ensure_peer_exists(&public_key).await;

                // Without registered peer sets the peer list is derived from `self.peers`,
                // so a new peer changes it.
                if is_new && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // Clock handed to the sender's rate limiter.
                let clock = self
                    .context
                    .with_label(&format!("rate_limiter_{channel}_{public_key}"))
                    .take();

                // Create the sender; `handle` refers to its spawned processor task.
                let (sender, handle) = Sender::new(
                    self.context.with_label("sender"),
                    public_key.clone(),
                    channel,
                    self.max_size,
                    self.sender.clone(),
                    self.oracle_mailbox.clone(),
                    clock,
                    quota,
                );

                // Register the channel with the peer to obtain the matching receiver.
                let peer = self.peers.get_mut(&public_key).unwrap();
                let receiver = match peer.register(channel, handle).await {
                    Ok(receiver) => Receiver { receiver },
                    Err(err) => return send_result(result, Err(err)),
                };

                send_result(result, Ok((sender, receiver)))
            }
            ingress::Message::PeerSet { id, response } => {
                if self.peer_sets.is_empty() {
                    // No peer sets registered: answer with every known peer.
                    let _ = response.send(Some(
                        self.peers
                            .keys()
                            .cloned()
                            .try_collect()
                            .expect("BTreeMap keys are unique"),
                    ));
                } else {
                    // Answer with the requested peer set (`None` if it is not tracked).
                    let _ = response.send(self.peer_sets.get(&id).cloned());
                }
            }
            ingress::Message::Subscribe { sender } => {
                // Immediately send the latest peer set, if any, to the new subscriber.
                if let Some((index, peers)) = self.peer_sets.last_key_value() {
                    let all = self.all_tracked_peers();
                    let notification = (*index, peers.clone(), all);
                    sender.send_lossy(notification);
                }
                self.subscribers.push(sender);
            }
            ingress::Message::SubscribeConnected { response } => {
                // Channel created with a capacity argument of 1.
                let (mut sender, receiver) = ring::channel(NZUsize!(1));

                // Seed the subscriber with the current peer list.
                let peer_list: Vec<P> = self.all_tracked_peers().into_iter().collect();
                let _ = sender.send(peer_list).await;

                // Keep the sender so future broadcasts reach this subscriber.
                self.peer_subscribers.push(sender);

                // Hand the receiving end back to the requester.
                let _ = response.send(receiver);
            }
            ingress::Message::LimitBandwidth {
                public_key,
                egress_cap,
                ingress_cap,
                result,
            } => {
                // Create the peer if it does not exist yet.
                let (_, is_new) = self.ensure_peer_exists(&public_key).await;

                // Without registered peer sets a new peer changes the broadcast peer list.
                if is_new && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // Apply the new caps and process any completions the transmitter returns.
                let now = self.context.current();
                let completions = self
                    .transmitter
                    .limit(now, &public_key, egress_cap, ingress_cap);
                self.process_completions(completions);

                // Acknowledge (ignored if the requester is gone).
                let _ = result.send(());
            }
            ingress::Message::AddLink {
                sender,
                receiver,
                sampler,
                success_rate,
                result,
            } => {
                // Create both endpoints if needed; the link dials the receiver's socket.
                let (_, sender_is_new) = self.ensure_peer_exists(&sender).await;
                let (receiver_socket, receiver_is_new) = self.ensure_peer_exists(&receiver).await;

                // Without registered peer sets new peers change the broadcast peer list.
                if (sender_is_new || receiver_is_new) && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // A directed link may only exist once.
                let key = (sender.clone(), receiver.clone());
                if self.links.contains_key(&key) {
                    return send_result(result, Err(Error::LinkExists));
                }

                let link = Link::new(
                    &mut self.context,
                    sender,
                    receiver,
                    receiver_socket,
                    sampler,
                    success_rate,
                    self.max_size,
                    self.received_messages.clone(),
                );
                self.links.insert(key, link);
                send_result(result, Ok(()))
            }
            ingress::Message::RemoveLink {
                sender,
                receiver,
                result,
            } => {
                // Removing a link that does not exist is reported as an error.
                match self.links.remove(&(sender, receiver)) {
                    Some(_) => (),
                    None => return send_result(result, Err(Error::LinkMissing)),
                }
                send_result(result, Ok(()))
            }
            ingress::Message::Block { from, to } => {
                // Record the block; it is consulted in `handle_task`.
                self.blocks.insert((from, to));
            }
            ingress::Message::Blocked { result } => {
                // Answer with a snapshot of all recorded blocks.
                send_result(result, Ok(self.blocks.iter().cloned().collect()))
            }
        }
    }
479
480 async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
484 if !self.peers.contains_key(public_key) {
485 let socket = self.get_next_socket();
487 let peer = Peer::new(
488 self.context.with_label("peer"),
489 public_key.clone(),
490 socket,
491 self.max_size,
492 )
493 .await;
494
495 self.peers.insert(public_key.clone(), peer);
497
498 (socket, true)
499 } else {
500 (self.peers.get(public_key).unwrap().socket, false)
501 }
502 }
503
504 async fn broadcast_peer_list(&mut self) {
512 let peer_list = self.all_tracked_peers().into_iter().collect::<Vec<_>>();
513 let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
514 for mut subscriber in self.peer_subscribers.drain(..) {
515 if subscriber.send(peer_list.clone()).await.is_ok() {
516 live_subscribers.push(subscriber);
517 }
518 }
519 self.peer_subscribers = live_subscribers;
520 }
521
522 fn all_tracked_peers(&self) -> Set<P> {
528 if self.peer_sets.is_empty() && self.tracked_peer_sets.is_none() {
529 self.peers
530 .keys()
531 .cloned()
532 .try_collect()
533 .expect("BTreeMap keys are unique")
534 } else {
535 self.peer_refs
536 .keys()
537 .cloned()
538 .try_collect()
539 .expect("BTreeMap keys are unique")
540 }
541 }
542}
543
544impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
545 fn process_completions(&mut self, completions: Vec<Completion<P>>) {
547 for completion in completions {
548 let Some(deliver_at) = completion.deliver_at else {
550 trace!(
551 origin = ?completion.origin,
552 recipient = ?completion.recipient,
553 "message dropped before delivery",
554 );
555 continue;
556 };
557
558 let key = (completion.origin.clone(), completion.recipient.clone());
560 let Some(link) = self.links.get_mut(&key) else {
561 trace!(
563 origin = ?completion.origin,
564 recipient = ?completion.recipient,
565 "missing link for completion",
566 );
567 continue;
568 };
569 if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
570 error!(?err, "failed to send");
571 }
572 }
573 }
574
    /// Route one send request: expand its recipients, filter them, and enqueue the message
    /// for transmission to each remaining recipient.
    ///
    /// The task's reply channel always receives the list of recipients the message was
    /// enqueued for (empty when the origin is not in a tracked peer set).
    fn handle_task(&mut self, task: Task<P>) {
        // With tracking enabled, an origin outside every tracked peer set cannot send at all.
        let (channel, origin, recipients, message, reply) = task;
        if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&origin) {
            warn!(
                ?origin,
                reason = "not in tracked peer set",
                "dropping message"
            );
            if let Err(err) = reply.send(Vec::new()) {
                error!(?err, "failed to send ack");
            }
            return;
        }

        // Expand the recipient selector into a concrete list.
        let recipients = match recipients {
            Recipients::All => {
                // With no peer sets stored, "all" means every known peer; otherwise it means
                // every peer referenced by a tracked peer set.
                if self.peer_sets.is_empty() {
                    self.peers.keys().cloned().collect()
                } else {
                    self.peer_refs.keys().cloned().collect()
                }
            }
            Recipients::Some(keys) => keys,
            Recipients::One(key) => vec![key],
        };

        // Attempt delivery to each recipient in turn.
        let now = self.context.current();
        let mut sent = Vec::new();
        for recipient in recipients {
            // Never send to self.
            if recipient == origin {
                trace!(?recipient, reason = "self", "dropping message");
                continue;
            }

            // With tracking enabled, the recipient must be in a tracked peer set.
            if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&recipient) {
                trace!(
                    ?origin,
                    ?recipient,
                    reason = "not in tracked peer set",
                    "dropping message"
                );
                continue;
            }

            // A block in either direction severs the pair when `disconnect_on_block` is set.
            let o_r = (origin.clone(), recipient.clone());
            let r_o = (recipient.clone(), origin.clone());
            if self.disconnect_on_block
                && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
            {
                trace!(?origin, ?recipient, reason = "blocked", "dropping message");
                continue;
            }

            // A directed link from origin to recipient is required.
            let Some(link) = self.links.get_mut(&o_r) else {
                trace!(?origin, ?recipient, reason = "no link", "dropping message");
                continue;
            };

            // Count the message as sent once a link to the recipient is known to exist.
            self.sent_messages
                .get_or_create(&metrics::Message::new(&origin, &recipient, channel))
                .inc();

            // Sample the link latency; the sample is interpreted as milliseconds.
            let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64);

            // Decide whether this message is delivered, using the link's success rate.
            let should_deliver = self.context.gen_bool(link.success_rate);

            // Enqueue for transmission and process whatever completions come back.
            let completions = self.transmitter.enqueue(
                now,
                origin.clone(),
                recipient.clone(),
                channel,
                message.clone(),
                latency,
                should_deliver,
            );
            self.process_completions(completions);

            sent.push(recipient);
        }

        // Report the recipients the message was enqueued for.
        if let Err(err) = reply.send(sent) {
            error!(?err, "failed to send ack");
        }
    }
682
    /// Start the network's event loop (`run`) via `spawn_cell!` on its own context and
    /// return the handle of the spawned task.
    pub fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run().await)
    }
690
    /// Event loop of the network.
    ///
    /// Waits on three sources at once: the transmitter's next deadline (if any), control
    /// messages from the oracle, and send tasks from senders. The loop ends when the oracle
    /// channel or the task channel yields `None`.
    async fn run(mut self) {
        loop {
            // Sleep until the transmitter's next deadline, or wait forever if it has none.
            let tick = match self.transmitter.next() {
                Some(when) => Either::Left(self.context.sleep_until(when)),
                None => Either::Right(future::pending()),
            };
            select! {
                _ = tick => {
                    // Advance the transmitter to the current time and deliver what completed.
                    let now = self.context.current();
                    let completions = self.transmitter.advance(now);
                    self.process_completions(completions);
                },
                message = self.ingress.next() => {
                    // Stop when the oracle channel is closed.
                    let message = match message {
                        Some(message) => message,
                        None => break,
                    };
                    self.handle_ingress(message).await;
                },
                task = self.receiver.next() => {
                    // Stop when the task channel is closed.
                    let task = match task {
                        Some(task) => task,
                        None => break,
                    };
                    self.handle_task(task);
                },
            }
        }
    }
722}
723
/// Source of peer-list updates for a rate-limited sender.
///
/// Implements [`Connected`] by asking the network (through the oracle mailbox) for a
/// subscription to the peer list.
pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
    /// Mailbox used to send `SubscribeConnected` requests to the network.
    mailbox: UnboundedMailbox<ingress::Message<P, E>>,
}
731
732impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
733 fn clone(&self) -> Self {
734 Self {
735 mailbox: self.mailbox.clone(),
736 }
737 }
738}
739
impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
    /// Wrap the oracle `mailbox` in a provider.
    const fn new(mailbox: UnboundedMailbox<ingress::Message<P, E>>) -> Self {
        Self { mailbox }
    }
}
745
746impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
747 type PublicKey = P;
748
749 async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
750 self.mailbox
751 .0
752 .request(|response| ingress::Message::SubscribeConnected { response })
753 .await
754 .unwrap_or_else(|| {
755 let (_sender, receiver) = ring::channel(NZUsize!(1));
756 receiver
757 })
758 }
759}
760
/// Sender that submits messages to the network without any rate limiting of its own.
#[derive(Clone)]
pub struct UnlimitedSender<P: PublicKey> {
    /// Public key used as the origin of every message sent.
    me: P,
    /// Channel the messages are sent on.
    channel: Channel,
    /// Maximum accepted message size, in bytes.
    max_size: u32,
    /// Queue used when `priority` is `true`.
    high: mpsc::UnboundedSender<Task<P>>,
    /// Queue used when `priority` is `false`.
    low: mpsc::UnboundedSender<Task<P>>,
}
772
773impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
774 type Error = Error;
775 type PublicKey = P;
776
777 async fn send(
778 &mut self,
779 recipients: Recipients<P>,
780 mut message: impl Buf + Send,
781 priority: bool,
782 ) -> Result<Vec<P>, Error> {
783 if message.remaining() > self.max_size as usize {
785 return Err(Error::MessageTooLarge(message.remaining()));
786 }
787
788 let (sender, receiver) = oneshot::channel();
790 let channel = if priority { &self.high } else { &self.low };
791 let message = message.copy_to_bytes(message.remaining());
792 if channel
793 .unbounded_send((self.channel, self.me.clone(), recipients, message, sender))
794 .is_err()
795 {
796 return Ok(Vec::new());
797 }
798 Ok(receiver.await.unwrap_or_default())
799 }
800}
801
/// Rate-limited sender for one peer on one channel.
///
/// Wraps an [`UnlimitedSender`] in a `LimitedSender` configured with the quota given at
/// registration.
pub struct Sender<P: PublicKey, E: Clock> {
    /// The rate-limiting wrapper that performs the `check` step before sending.
    limited_sender: LimitedSender<E, UnlimitedSender<P>, ConnectedPeerProvider<P, E>>,
}
809
810impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
811 fn clone(&self) -> Self {
812 Self {
813 limited_sender: self.limited_sender.clone(),
814 }
815 }
816}
817
/// Opaque `Debug`: prints only the type name and marks the output as non-exhaustive.
impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sender").finish_non_exhaustive()
    }
}
823
824impl<P: PublicKey, E: Clock> Sender<P, E> {
    /// Create a sender for `me` on `channel` and spawn its processor task.
    ///
    /// The processor forwards tasks from the sender's high- and low-priority queues to the
    /// network's task queue (`sender`). Returns the new sender and the processor's handle.
    #[allow(clippy::too_many_arguments)]
    fn new(
        context: impl Spawner + Metrics,
        me: P,
        channel: Channel,
        max_size: u32,
        mut sender: mpsc::UnboundedSender<Task<P>>,
        oracle_mailbox: UnboundedMailbox<ingress::Message<P, E>>,
        clock: E,
        quota: Quota,
    ) -> (Self, Handle<()>) {
        // One queue per priority level.
        let (high, mut high_receiver) = mpsc::unbounded();
        let (low, mut low_receiver) = mpsc::unbounded();
        let processor = context.with_label("processor").spawn(move |_| async move {
            loop {
                // Wait for the next task from either queue; stop once a queue is closed.
                // NOTE(review): which queue wins when both are ready depends on how
                // `select!` polls its branches — confirm.
                let task;
                select! {
                    high_task = high_receiver.next() => {
                        task = match high_task {
                            Some(task) => task,
                            None => break,
                        };
                    },
                    low_task = low_receiver.next() => {
                        task = match low_task {
                            Some(task) => task,
                            None => break,
                        };
                    }
                }

                // Forward the task to the network; failures are logged and the loop goes on.
                if let Err(err) = sender.send(task).await {
                    error!(?err, channel, "failed to send task");
                }
            }
        });

        // Wrap the raw sender in the rate limiter.
        let unlimited_sender = UnlimitedSender {
            me,
            channel,
            max_size,
            high,
            low,
        };
        let peer_source = ConnectedPeerProvider::new(oracle_mailbox);
        let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);

        (Self { limited_sender }, processor)
    }
877
878 pub fn split_with<F: SplitForwarder<P>>(
880 self,
881 forwarder: F,
882 ) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
883 (
884 SplitSender {
885 replica: SplitOrigin::Primary,
886 inner: self.clone(),
887 forwarder: forwarder.clone(),
888 },
889 SplitSender {
890 replica: SplitOrigin::Secondary,
891 inner: self,
892 forwarder,
893 },
894 )
895 }
896}
897
898impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
899 type PublicKey = P;
900 type Checked<'a>
901 = crate::utils::limited::CheckedSender<'a, UnlimitedSender<P>>
902 where
903 Self: 'a;
904
905 async fn check(
906 &mut self,
907 recipients: Recipients<Self::PublicKey>,
908 ) -> Result<Self::Checked<'_>, SystemTime> {
909 self.limited_sender.check(recipients).await
910 }
911}
912
/// One half of a [`Sender`] split with [`Sender::split_with`].
pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    /// Which half this is; passed to the forwarder on every send.
    replica: SplitOrigin,
    /// The underlying sender used by this half.
    inner: Sender<P, E>,
    /// Chooses the actual recipients of each message.
    forwarder: F,
}
919
920impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
921 fn clone(&self) -> Self {
922 Self {
923 replica: self.replica,
924 inner: self.inner.clone(),
925 forwarder: self.forwarder.clone(),
926 }
927 }
928}
929
930impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
931 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
932 f.debug_struct("SplitSender")
933 .field("replica", &self.replica)
934 .field("inner", &self.inner)
935 .finish()
936 }
937}
938
939impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
940 type PublicKey = P;
941 type Checked<'a> = SplitCheckedSender<'a, P, E, F>;
942
943 async fn check(
944 &mut self,
945 recipients: Recipients<Self::PublicKey>,
946 ) -> Result<Self::Checked<'_>, SystemTime> {
947 Ok(SplitCheckedSender {
948 checked: self.inner.limited_sender.check(recipients.clone()).await?,
951 replica: self.replica,
952 forwarder: self.forwarder.clone(),
953 recipients,
954
955 _phantom: std::marker::PhantomData,
956 })
957 }
958}
959
/// Result of a successful [`SplitSender`] rate-limit check; consumed by sending a message.
pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    /// The checked sender obtained from the underlying limited sender.
    checked: LimitedCheckedSender<'a, UnlimitedSender<P>>,
    /// Which half of the split is sending.
    replica: SplitOrigin,
    /// Chooses the actual recipients once the message is known.
    forwarder: F,
    /// Recipients originally requested (and rate-limit checked).
    recipients: Recipients<P>,

    /// Ties the type to the clock parameter `E` without storing a value of it.
    _phantom: std::marker::PhantomData<E>,
}
972
973impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
974 for SplitCheckedSender<'a, P, E, F>
975{
976 type PublicKey = P;
977 type Error = Error;
978
979 async fn send(
980 self,
981 mut message: impl Buf + Send,
982 priority: bool,
983 ) -> Result<Vec<Self::PublicKey>, Self::Error> {
984 let message = message.copy_to_bytes(message.remaining());
986
987 let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
989 return Ok(Vec::new());
990 };
991
992 self.checked
999 .into_inner()
1000 .send(recipients, message, priority)
1001 .await
1002 }
1003}
1004
/// Receiving end of a peer's per-channel mailbox.
type MessageReceiver<P> = mpsc::UnboundedReceiver<Message<P>>;
1006
/// Receiver of messages delivered to one peer on one channel.
#[derive(Debug)]
pub struct Receiver<P: PublicKey> {
    /// Underlying mailbox from which messages are read.
    receiver: MessageReceiver<P>,
}
1012
1013impl<P: PublicKey> crate::Receiver for Receiver<P> {
1014 type Error = Error;
1015 type PublicKey = P;
1016
1017 async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
1018 self.receiver.next().await.ok_or(Error::NetworkClosed)
1019 }
1020}
1021
1022impl<P: PublicKey> Receiver<P> {
1023 pub fn split_with<E: Spawner, R: SplitRouter<P>>(
1025 mut self,
1026 context: E,
1027 router: R,
1028 ) -> (Self, Self) {
1029 let (mut primary_tx, primary_rx) = mpsc::unbounded();
1030 let (mut secondary_tx, secondary_rx) = mpsc::unbounded();
1031 context.spawn(move |_| async move {
1032 while let Some(message) = self.receiver.next().await {
1033 let direction = router(&message);
1035 match direction {
1036 SplitTarget::None => {}
1037 SplitTarget::Primary => {
1038 if let Err(err) = primary_tx.send(message).await {
1039 error!(?err, "failed to send message to primary");
1040 }
1041 }
1042 SplitTarget::Secondary => {
1043 if let Err(err) = secondary_tx.send(message).await {
1044 error!(?err, "failed to send message to secondary");
1045 }
1046 }
1047 SplitTarget::Both => {
1048 if let Err(err) = primary_tx.send(message.clone()).await {
1049 error!(?err, "failed to send message to primary");
1050 }
1051 if let Err(err) = secondary_tx.send(message).await {
1052 error!(?err, "failed to send message to secondary");
1053 }
1054 }
1055 }
1056
1057 if primary_tx.is_closed() && secondary_tx.is_closed() {
1059 break;
1060 }
1061 }
1062 });
1063
1064 (
1065 Self {
1066 receiver: primary_rx,
1067 },
1068 Self {
1069 receiver: secondary_rx,
1070 },
1071 )
1072 }
1073}
1074
/// A simulated peer: the socket it listens on and a control channel to its router task.
struct Peer<P: PublicKey> {
    /// Socket address the peer's listener is bound to.
    socket: SocketAddr,

    /// Channel-registration requests for the router task: the channel, the handle of the
    /// registering sender's processor task, and a one-shot on which the channel's message
    /// receiver is returned.
    control: mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
}
1085
1086impl<P: PublicKey> Peer<P> {
    /// Create a peer listening on `socket` and spawn its background tasks.
    ///
    /// Two tasks are spawned on `context`:
    /// - a "router" that handles channel registrations and moves inbound messages into the
    ///   mailbox of their channel, and
    /// - a "listener" that accepts connections on `socket` and spawns one "receiver" task
    ///   per connection to read frames into the router's inbox.
    ///
    /// The function returns only after the listener has bound the socket (or its task ended
    /// without signalling readiness).
    ///
    /// # Panics
    ///
    /// The spawned tasks panic if binding `socket` fails, if a registration reply cannot be
    /// delivered, or if a received frame is shorter than `Channel::SIZE`.
    async fn new<E: Spawner + RNetwork + Metrics + Clock>(
        context: E,
        public_key: P,
        socket: SocketAddr,
        max_size: u32,
    ) -> Self {
        // Control channel: carries channel registrations to the router.
        let (control_sender, mut control_receiver) = mpsc::unbounded();

        // Inbox: carries `(channel, (dialer, message))` from receiver tasks to the router.
        let (inbox_sender, mut inbox_receiver) = mpsc::unbounded();

        // Router task.
        context.with_label("router").spawn(|context| async move {
            // Per-channel mailbox sender plus the handle of the sender task registered with it.
            let mut mailboxes = HashMap::new();

            select_loop! {
                context,
                on_stopped => {},
                control = control_receiver.next() => {
                    // Stop when the control channel is closed.
                    let (channel, sender, result_tx): (Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>) = match control {
                        Some(control) => control,
                        None => break,
                    };

                    // Install a fresh mailbox; a previous registration for the same channel
                    // is replaced and its sender task aborted.
                    let (receiver_tx, receiver_rx) = mpsc::unbounded();
                    if let Some((_, existing_sender)) = mailboxes.insert(channel, (receiver_tx, sender)) {
                        warn!(?public_key, ?channel, "overwriting existing channel");
                        existing_sender.abort();
                    }
                    result_tx.send(receiver_rx).unwrap();
                },

                inbox = inbox_receiver.next() => {
                    // Stop when the inbox is closed.
                    let (channel, message) = match inbox {
                        Some(message) => message,
                        None => break,
                    };

                    // Deliver to the channel's mailbox, or drop if the channel is unknown.
                    match mailboxes.get_mut(&channel) {
                        Some((receiver_tx, _)) => {
                            if let Err(err) = receiver_tx.send(message).await {
                                debug!(?err, "failed to send message to mailbox");
                            }
                        }
                        None => {
                            trace!(
                                recipient = ?public_key,
                                channel,
                                reason = "missing channel",
                                "dropping message",
                            );
                        }
                    }
                },
            }
        });

        // Listener task; `ready_tx` signals that the socket has been bound.
        let (ready_tx, ready_rx) = oneshot::channel();
        context
            .with_label("listener")
            .spawn(move |context| async move {
                let mut listener = context.bind(socket).await.unwrap();
                let _ = ready_tx.send(());

                // Accept connections until `accept` fails.
                while let Ok((_, _, mut stream)) = listener.accept().await {
                    // One receiver task per accepted connection.
                    context.with_label("receiver").spawn({
                        let mut inbox_sender = inbox_sender.clone();
                        move |_| async move {
                            // Handshake: the first frame is the dialer's public key.
                            let dialer = match recv_frame(&mut stream, max_size).await {
                                Ok(data) => data,
                                Err(_) => {
                                    error!("failed to receive public key from dialer");
                                    return;
                                }
                            };
                            let Ok(dialer) = P::decode(dialer.as_ref()) else {
                                error!("received public key is invalid");
                                return;
                            };

                            // Each following frame is the big-endian channel followed by the
                            // message payload.
                            while let Ok(data) = recv_frame(&mut stream, max_size).await {
                                let channel = Channel::from_be_bytes(
                                    data[..Channel::SIZE].try_into().unwrap(),
                                );
                                let message = data.slice(Channel::SIZE..);
                                if let Err(err) = inbox_sender
                                    .send((channel, (dialer.clone(), message)))
                                    .await
                                {
                                    debug!(?err, "failed to send message to mailbox");
                                    break;
                                }
                            }
                        }
                    });
                }
            });

        // Wait for the listener to be ready before handing out the peer.
        let _ = ready_rx.await;

        Self {
            socket,
            control: control_sender,
        }
    }
1215
1216 async fn register(
1221 &mut self,
1222 channel: Channel,
1223 sender: Handle<()>,
1224 ) -> Result<MessageReceiver<P>, Error> {
1225 let (result_tx, result_rx) = oneshot::channel();
1226 self.control
1227 .send((channel, sender, result_tx))
1228 .await
1229 .map_err(|_| Error::NetworkClosed)?;
1230 result_rx.await.map_err(|_| Error::NetworkClosed)
1231 }
1232}
1233
/// A directed link between two peers.
struct Link {
    /// Distribution sampled for each message's latency; samples are read as milliseconds.
    sampler: Normal<f64>,
    /// Probability that a message sent over the link is delivered.
    success_rate: f64,
    /// Queue of `(channel, message, delivery time)` consumed by the link's background task.
    inbox: mpsc::UnboundedSender<(Channel, Bytes, SystemTime)>,
}
1242
1243impl Link {
1245 #[allow(clippy::too_many_arguments)]
1246 fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
1247 context: &mut E,
1248 dialer: P,
1249 receiver: P,
1250 socket: SocketAddr,
1251 sampler: Normal<f64>,
1252 success_rate: f64,
1253 max_size: u32,
1254 received_messages: Family<metrics::Message, Counter>,
1255 ) -> Self {
1256 let (inbox, mut outbox) = mpsc::unbounded::<(Channel, Bytes, SystemTime)>();
1259 context.with_label("link").spawn(move |context| async move {
1260 let (mut sink, _) = context.dial(socket).await.unwrap();
1262 if let Err(err) = send_frame(&mut sink, dialer.as_ref(), max_size).await {
1263 error!(?err, "failed to send public key to listener");
1264 return;
1265 }
1266
1267 while let Some((channel, message, receive_complete_at)) = outbox.next().await {
1269 context.sleep_until(receive_complete_at).await;
1271
1272 let data = Bytes::from_owner(channel.to_be_bytes()).chain(message);
1274 let _ = send_frame(&mut sink, data, max_size).await;
1275
1276 received_messages
1278 .get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
1279 .inc();
1280 }
1281 });
1282
1283 Self {
1284 sampler,
1285 success_rate,
1286 inbox,
1287 }
1288 }
1289
1290 fn send(
1292 &mut self,
1293 channel: Channel,
1294 message: Bytes,
1295 receive_complete_at: SystemTime,
1296 ) -> Result<(), Error> {
1297 self.inbox
1298 .unbounded_send((channel, message, receive_complete_at))
1299 .map_err(|_| Error::NetworkClosed)?;
1300 Ok(())
1301 }
1302}
1303
1304#[cfg(test)]
1305mod tests {
1306 use super::*;
1307 use crate::{Manager, Receiver as _, Recipients, Sender as _};
1308 use bytes::Bytes;
1309 use commonware_cryptography::{ed25519, Signer as _};
1310 use commonware_runtime::{deterministic, Quota, Runner as _};
1311 use futures::FutureExt;
1312 use std::num::NonZeroU32;
1313
    /// Maximum message size used by the test networks (1 MiB).
    const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;

    /// Per-second quota set to the largest representable value, for channels registered in
    /// tests.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1318
    /// Registering channels (including registering the same channel twice) and adding a link
    /// succeed, while adding the same directed link a second time fails with
    /// `Error::LinkExists`.
    #[test]
    fn test_register_and_link() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            };
            let network_context = context.with_label("network");
            let (network, oracle) = Network::new(network_context.clone(), cfg);
            network_context.spawn(|_| network.run());

            // Two peers.
            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();

            // Put both peers in a peer set, then register two channels for each.
            let mut manager = oracle.manager();
            manager
                .update(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
                .await;
            let control = oracle.control(pk1.clone());
            control.register(0, TEST_QUOTA).await.unwrap();
            control.register(1, TEST_QUOTA).await.unwrap();
            let control = oracle.control(pk2.clone());
            control.register(0, TEST_QUOTA).await.unwrap();
            control.register(1, TEST_QUOTA).await.unwrap();

            // Registering an already registered channel again also succeeds.
            control.register(1, TEST_QUOTA).await.unwrap();

            // Add a link from pk1 to pk2.
            let link = ingress::Link {
                latency: Duration::from_millis(2),
                jitter: Duration::from_millis(1),
                success_rate: 0.9,
            };
            oracle
                .add_link(pk1.clone(), pk2.clone(), link.clone())
                .await
                .unwrap();

            // Adding the same directed link again is rejected.
            assert!(matches!(
                oracle.add_link(pk1, pk2, link).await,
                Err(Error::LinkExists)
            ));
        });
    }
1369
1370 #[test]
1371 fn test_split_channel_single() {
1372 let executor = deterministic::Runner::default();
1373 executor.start(|context| async move {
1374 let cfg = Config {
1375 max_size: MAX_MESSAGE_SIZE,
1376 disconnect_on_block: true,
1377 tracked_peer_sets: Some(3),
1378 };
1379 let network_context = context.with_label("network");
1380 let (network, oracle) = Network::new(network_context.clone(), cfg);
1381 network_context.spawn(|_| network.run());
1382
1383 let twin = ed25519::PrivateKey::from_seed(20).public_key();
1385 let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
1386 let peer_b = ed25519::PrivateKey::from_seed(22).public_key();
1387
1388 let mut manager = oracle.manager();
1390 manager
1391 .update(
1392 0,
1393 [twin.clone(), peer_a.clone(), peer_b.clone()]
1394 .try_into()
1395 .unwrap(),
1396 )
1397 .await;
1398
1399 let (mut peer_a_sender, mut peer_a_recv) = oracle
1401 .control(peer_a.clone())
1402 .register(0, TEST_QUOTA)
1403 .await
1404 .unwrap();
1405 let (mut peer_b_sender, mut peer_b_recv) = oracle
1406 .control(peer_b.clone())
1407 .register(0, TEST_QUOTA)
1408 .await
1409 .unwrap();
1410
1411 let (twin_sender, twin_receiver) = oracle
1417 .control(twin.clone())
1418 .register(0, TEST_QUOTA)
1419 .await
1420 .unwrap();
1421 let peer_a_for_router = peer_a.clone();
1422 let peer_b_for_router = peer_b.clone();
1423 let (mut twin_primary_sender, mut twin_secondary_sender) =
1424 twin_sender.split_with(move |origin, _, _| match origin {
1425 SplitOrigin::Primary => Some(Recipients::One(peer_a_for_router.clone())),
1426 SplitOrigin::Secondary => Some(Recipients::One(peer_b_for_router.clone())),
1427 });
1428 let peer_a_for_recv = peer_a.clone();
1429 let peer_b_for_recv = peer_b.clone();
1430 let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver.split_with(
1431 context.with_label("split_receiver"),
1432 move |(sender, _)| {
1433 if sender == &peer_a_for_recv {
1434 SplitTarget::Primary
1435 } else if sender == &peer_b_for_recv {
1436 SplitTarget::Secondary
1437 } else {
1438 panic!("unexpected sender");
1439 }
1440 },
1441 );
1442
1443 let link = ingress::Link {
1445 latency: Duration::from_millis(0),
1446 jitter: Duration::from_millis(0),
1447 success_rate: 1.0,
1448 };
1449 oracle
1450 .add_link(peer_a.clone(), twin.clone(), link.clone())
1451 .await
1452 .unwrap();
1453 oracle
1454 .add_link(twin.clone(), peer_a.clone(), link.clone())
1455 .await
1456 .unwrap();
1457 oracle
1458 .add_link(peer_b.clone(), twin.clone(), link.clone())
1459 .await
1460 .unwrap();
1461 oracle
1462 .add_link(twin.clone(), peer_b.clone(), link.clone())
1463 .await
1464 .unwrap();
1465
1466 let msg_a_to_twin = Bytes::from_static(b"from_a");
1468 let msg_b_to_twin = Bytes::from_static(b"from_b");
1469 let msg_primary_out = Bytes::from_static(b"primary_out");
1470 let msg_secondary_out = Bytes::from_static(b"secondary_out");
1471 peer_a_sender
1472 .send(Recipients::One(twin.clone()), msg_a_to_twin.clone(), false)
1473 .await
1474 .unwrap();
1475 peer_b_sender
1476 .send(Recipients::One(twin.clone()), msg_b_to_twin.clone(), false)
1477 .await
1478 .unwrap();
1479 twin_primary_sender
1480 .send(Recipients::All, msg_primary_out.clone(), false)
1481 .await
1482 .unwrap();
1483 twin_secondary_sender
1484 .send(Recipients::All, msg_secondary_out.clone(), false)
1485 .await
1486 .unwrap();
1487
1488 let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1490 assert_eq!(sender, peer_a);
1491 assert_eq!(payload, msg_a_to_twin);
1492 let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1493 assert_eq!(sender, peer_b);
1494 assert_eq!(payload, msg_b_to_twin);
1495
1496 let (sender, payload) = peer_a_recv.recv().await.unwrap();
1498 assert_eq!(sender, twin);
1499 assert_eq!(payload, msg_primary_out);
1500 let (sender, payload) = peer_b_recv.recv().await.unwrap();
1501 assert_eq!(sender, twin);
1502 assert_eq!(payload, msg_secondary_out);
1503 });
1504 }
1505
1506 #[test]
1507 fn test_split_channel_both() {
1508 let executor = deterministic::Runner::default();
1509 executor.start(|context| async move {
1510 let cfg = Config {
1511 max_size: MAX_MESSAGE_SIZE,
1512 disconnect_on_block: true,
1513 tracked_peer_sets: Some(3),
1514 };
1515 let network_context = context.with_label("network");
1516 let (network, oracle) = Network::new(network_context.clone(), cfg);
1517 network_context.spawn(|_| network.run());
1518
1519 let twin = ed25519::PrivateKey::from_seed(30).public_key();
1521 let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1522
1523 let mut manager = oracle.manager();
1525 manager
1526 .update(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
1527 .await;
1528
1529 let (mut peer_c_sender, _peer_c_recv) = oracle
1531 .control(peer_c.clone())
1532 .register(0, TEST_QUOTA)
1533 .await
1534 .unwrap();
1535
1536 let (twin_sender, twin_receiver) = oracle
1538 .control(twin.clone())
1539 .register(0, TEST_QUOTA)
1540 .await
1541 .unwrap();
1542 let (_twin_primary_sender, _twin_secondary_sender) =
1543 twin_sender.split_with(|_origin, recipients, _| Some(recipients.clone()));
1544 let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1545 .split_with(context.with_label("split_receiver_both"), |_| {
1546 SplitTarget::Both
1547 });
1548
1549 let link = ingress::Link {
1551 latency: Duration::from_millis(0),
1552 jitter: Duration::from_millis(0),
1553 success_rate: 1.0,
1554 };
1555 oracle
1556 .add_link(peer_c.clone(), twin.clone(), link.clone())
1557 .await
1558 .unwrap();
1559 oracle
1560 .add_link(twin.clone(), peer_c.clone(), link)
1561 .await
1562 .unwrap();
1563
1564 let msg_both = Bytes::from_static(b"to_both");
1566 peer_c_sender
1567 .send(Recipients::One(twin.clone()), msg_both.clone(), false)
1568 .await
1569 .unwrap();
1570
1571 let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1573 assert_eq!(sender, peer_c);
1574 assert_eq!(payload, msg_both);
1575 let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1576 assert_eq!(sender, peer_c);
1577 assert_eq!(payload, msg_both);
1578 });
1579 }
1580
1581 #[test]
1582 fn test_split_channel_none() {
1583 let executor = deterministic::Runner::default();
1584 executor.start(|context| async move {
1585 let cfg = Config {
1586 max_size: MAX_MESSAGE_SIZE,
1587 disconnect_on_block: true,
1588 tracked_peer_sets: Some(3),
1589 };
1590 let network_context = context.with_label("network");
1591 let (network, oracle) = Network::new(network_context.clone(), cfg);
1592 network_context.spawn(|_| network.run());
1593
1594 let twin = ed25519::PrivateKey::from_seed(30).public_key();
1596 let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1597
1598 let mut manager = oracle.manager();
1600 manager
1601 .update(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
1602 .await;
1603
1604 let (mut peer_c_sender, _peer_c_recv) = oracle
1606 .control(peer_c.clone())
1607 .register(0, TEST_QUOTA)
1608 .await
1609 .unwrap();
1610
1611 let (twin_sender, twin_receiver) = oracle
1613 .control(twin.clone())
1614 .register(0, TEST_QUOTA)
1615 .await
1616 .unwrap();
1617 let (mut twin_primary_sender, mut twin_secondary_sender) =
1618 twin_sender.split_with(|_origin, _, _| None);
1619 let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1620 .split_with(context.with_label("split_receiver_both"), |_| {
1621 SplitTarget::None
1622 });
1623
1624 let link = ingress::Link {
1626 latency: Duration::from_millis(0),
1627 jitter: Duration::from_millis(0),
1628 success_rate: 1.0,
1629 };
1630 oracle
1631 .add_link(peer_c.clone(), twin.clone(), link.clone())
1632 .await
1633 .unwrap();
1634 oracle
1635 .add_link(twin.clone(), peer_c.clone(), link)
1636 .await
1637 .unwrap();
1638
1639 let msg_both = Bytes::from_static(b"to_both");
1641 let sent = peer_c_sender
1642 .send(Recipients::One(twin.clone()), msg_both.clone(), false)
1643 .await
1644 .unwrap();
1645 assert_eq!(sent.len(), 1);
1646 assert_eq!(sent[0], twin);
1647
1648 context.sleep(Duration::from_millis(100)).await;
1650 assert!(twin_primary_recv.recv().now_or_never().is_none());
1651 assert!(twin_secondary_recv.recv().now_or_never().is_none());
1652
1653 let msg_both = Bytes::from_static(b"to_both");
1655 let sent = twin_primary_sender
1656 .send(Recipients::One(peer_c.clone()), msg_both.clone(), false)
1657 .await
1658 .unwrap();
1659 assert_eq!(sent.len(), 0);
1660
1661 let msg_both = Bytes::from_static(b"to_both");
1663 let sent = twin_secondary_sender
1664 .send(Recipients::One(peer_c.clone()), msg_both.clone(), false)
1665 .await
1666 .unwrap();
1667 assert_eq!(sent.len(), 0);
1668 });
1669 }
1670
1671 #[test]
1672 fn test_unordered_peer_sets() {
1673 let executor = deterministic::Runner::default();
1674 executor.start(|context| async move {
1675 let cfg = Config {
1676 max_size: MAX_MESSAGE_SIZE,
1677 disconnect_on_block: true,
1678 tracked_peer_sets: Some(3),
1679 };
1680 let network_context = context.with_label("network");
1681 let (network, oracle) = Network::new(network_context.clone(), cfg);
1682 network_context.spawn(|_| network.run());
1683
1684 let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1686 let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1687
1688 let mut manager = oracle.manager();
1690 let mut subscription = manager.subscribe().await;
1691
1692 manager
1694 .update(10, [pk1.clone(), pk2.clone()].try_into().unwrap())
1695 .await;
1696 let (id, new, all) = subscription.next().await.unwrap();
1697 assert_eq!(id, 10);
1698 assert_eq!(new.len(), 2);
1699 assert_eq!(all.len(), 2);
1700
1701 let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
1703 manager.update(9, [pk3.clone()].try_into().unwrap()).await;
1704
1705 let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
1707 manager.update(11, [pk4.clone()].try_into().unwrap()).await;
1708 let (id, new, all) = subscription.next().await.unwrap();
1709 assert_eq!(id, 11);
1710 assert_eq!(new, [pk4.clone()].try_into().unwrap());
1711 assert_eq!(all, [pk1, pk2, pk4].try_into().unwrap());
1712 });
1713 }
1714
1715 #[test]
1716 fn test_get_next_socket() {
1717 let cfg = Config {
1718 max_size: MAX_MESSAGE_SIZE,
1719 disconnect_on_block: true,
1720 tracked_peer_sets: None,
1721 };
1722 let runner = deterministic::Runner::default();
1723
1724 runner.start(|context| async move {
1725 type PublicKey = ed25519::PublicKey;
1726 let (mut network, _) =
1727 Network::<deterministic::Context, PublicKey>::new(context.clone(), cfg);
1728
1729 let mut original = network.next_addr;
1731 let next = network.get_next_socket();
1732 assert_eq!(next, original);
1733 let next = network.get_next_socket();
1734 original.set_port(1);
1735 assert_eq!(next, original);
1736
1737 let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
1739 network.next_addr = max_addr;
1740 let next = network.get_next_socket();
1741 assert_eq!(next, max_addr);
1742 let next = network.get_next_socket();
1743 assert_eq!(
1744 next,
1745 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
1746 );
1747 });
1748 }
1749
1750 #[test]
1751 fn test_fifo_burst_same_recipient() {
1752 let cfg = Config {
1753 max_size: MAX_MESSAGE_SIZE,
1754 disconnect_on_block: true,
1755 tracked_peer_sets: Some(3),
1756 };
1757 let runner = deterministic::Runner::default();
1758
1759 runner.start(|context| async move {
1760 let (network, oracle) = Network::new(context.with_label("network"), cfg);
1761 let network_handle = network.start();
1762
1763 let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
1764 let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();
1765
1766 let mut manager = oracle.manager();
1767 manager
1768 .update(
1769 0,
1770 [sender_pk.clone(), recipient_pk.clone()]
1771 .try_into()
1772 .unwrap(),
1773 )
1774 .await;
1775 let (mut sender, _sender_recv) = oracle
1776 .control(sender_pk.clone())
1777 .register(0, TEST_QUOTA)
1778 .await
1779 .unwrap();
1780 let (_sender2, mut receiver) = oracle
1781 .control(recipient_pk.clone())
1782 .register(0, TEST_QUOTA)
1783 .await
1784 .unwrap();
1785
1786 oracle
1787 .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
1788 .await
1789 .unwrap();
1790 oracle
1791 .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
1792 .await
1793 .unwrap();
1794
1795 oracle
1796 .add_link(
1797 sender_pk.clone(),
1798 recipient_pk.clone(),
1799 ingress::Link {
1800 latency: Duration::from_millis(0),
1801 jitter: Duration::from_millis(0),
1802 success_rate: 1.0,
1803 },
1804 )
1805 .await
1806 .unwrap();
1807
1808 const COUNT: usize = 50;
1809 let mut expected = Vec::with_capacity(COUNT);
1810 for i in 0..COUNT {
1811 let msg = Bytes::from(vec![i as u8; 64]);
1812 sender
1813 .send(Recipients::One(recipient_pk.clone()), msg.clone(), false)
1814 .await
1815 .unwrap();
1816 expected.push(msg);
1817 }
1818
1819 for expected_msg in expected {
1820 let (_pk, bytes) = receiver.recv().await.unwrap();
1821 assert_eq!(bytes, expected_msg);
1822 }
1823
1824 drop(oracle);
1825 drop(sender);
1826 network_handle.abort();
1827 });
1828 }
1829
1830 #[test]
1831 fn test_broadcast_respects_transmit_latency() {
1832 let cfg = Config {
1833 max_size: MAX_MESSAGE_SIZE,
1834 disconnect_on_block: true,
1835 tracked_peer_sets: Some(3),
1836 };
1837 let runner = deterministic::Runner::default();
1838
1839 runner.start(|context| async move {
1840 let (network, oracle) = Network::new(context.with_label("network"), cfg);
1841 let network_handle = network.start();
1842
1843 let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
1844 let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
1845 let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();
1846
1847 let mut manager = oracle.manager();
1848 manager
1849 .update(
1850 0,
1851 [sender_pk.clone(), recipient_a.clone(), recipient_b.clone()]
1852 .try_into()
1853 .unwrap(),
1854 )
1855 .await;
1856 let (mut sender, _recv_sender) = oracle
1857 .control(sender_pk.clone())
1858 .register(0, TEST_QUOTA)
1859 .await
1860 .unwrap();
1861 let (_sender2, mut recv_a) = oracle
1862 .control(recipient_a.clone())
1863 .register(0, TEST_QUOTA)
1864 .await
1865 .unwrap();
1866 let (_sender3, mut recv_b) = oracle
1867 .control(recipient_b.clone())
1868 .register(0, TEST_QUOTA)
1869 .await
1870 .unwrap();
1871
1872 oracle
1873 .limit_bandwidth(sender_pk.clone(), Some(1_000), None)
1874 .await
1875 .unwrap();
1876 oracle
1877 .limit_bandwidth(recipient_a.clone(), None, Some(1_000))
1878 .await
1879 .unwrap();
1880 oracle
1881 .limit_bandwidth(recipient_b.clone(), None, Some(1_000))
1882 .await
1883 .unwrap();
1884
1885 let link = ingress::Link {
1886 latency: Duration::from_millis(0),
1887 jitter: Duration::from_millis(0),
1888 success_rate: 1.0,
1889 };
1890 oracle
1891 .add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
1892 .await
1893 .unwrap();
1894 oracle
1895 .add_link(sender_pk.clone(), recipient_b.clone(), link)
1896 .await
1897 .unwrap();
1898
1899 let big_msg = Bytes::from(vec![7u8; 10_000]);
1900 let start = context.current();
1901 sender
1902 .send(Recipients::All, big_msg.clone(), false)
1903 .await
1904 .unwrap();
1905
1906 let (_pk, received_a) = recv_a.recv().await.unwrap();
1907 assert_eq!(received_a, big_msg);
1908 let elapsed_a = context.current().duration_since(start).unwrap();
1909 assert!(elapsed_a >= Duration::from_secs(20));
1910
1911 let (_pk, received_b) = recv_b.recv().await.unwrap();
1912 assert_eq!(received_b, big_msg);
1913 let elapsed_b = context.current().duration_since(start).unwrap();
1914 assert!(elapsed_b >= Duration::from_secs(20));
1915
1916 assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));
1918
1919 drop(oracle);
1920 drop(sender);
1921 network_handle.abort();
1922 });
1923 }
1924}