1use super::{
4 ingress::{self, Oracle},
5 metrics,
6 transmitter::{self, Completion},
7 Error,
8};
9use crate::{
10 utils::limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
11 Channel, Message, Recipients, UnlimitedSender as _,
12};
13use bytes::Bytes;
14use commonware_codec::{DecodeExt, FixedSize};
15use commonware_cryptography::PublicKey;
16use commonware_macros::{select, select_loop};
17use commonware_runtime::{
18 spawn_cell, Clock, ContextCell, Handle, Listener as _, Metrics, Network as RNetwork, Quota,
19 Spawner,
20};
21use commonware_stream::utils::codec::{recv_frame, send_frame};
22use commonware_utils::{channels::ring, ordered::Set, NZUsize, TryCollect};
23use either::Either;
24use futures::{
25 channel::{mpsc, oneshot},
26 future, SinkExt, StreamExt,
27};
28use prometheus_client::metrics::{counter::Counter, family::Family};
29use rand::Rng;
30use rand_distr::{Distribution, Normal};
31use std::{
32 collections::{BTreeMap, HashMap, HashSet},
33 fmt::Debug,
34 net::{IpAddr, Ipv4Addr, SocketAddr},
35 time::{Duration, SystemTime},
36};
37use tracing::{debug, error, trace, warn};
38
/// Work item handed from a peer's sender to the [`Network`] loop: the channel, the
/// originating peer, the requested recipients, the payload, and a one-shot used to report
/// back the recipients the message was actually sent to.
type Task<P> = (Channel, P, Recipients<P>, Bytes, oneshot::Sender<Vec<P>>);
41
/// Destination chosen by a [`SplitRouter`] for an inbound message once a [`Receiver`] has
/// been split into a primary and a secondary half.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitTarget {
    /// Deliver to neither half (the message is discarded).
    None,
    /// Deliver to the primary half only.
    Primary,
    /// Deliver to the secondary half only.
    Secondary,
    /// Deliver a copy of the message to both halves.
    Both,
}
51
/// Identifies which half of a split [`Sender`] a message is being sent from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitOrigin {
    /// The message is sent through the primary half.
    Primary,
    /// The message is sent through the secondary half.
    Secondary,
}
59
/// Callback consulted by a [`SplitSender`] on every send.
///
/// It receives the half the message originates from, the recipients the caller asked for,
/// and the message itself. Returning `Some(recipients)` sends the message to those
/// recipients; returning `None` sends it to nobody.
pub trait SplitForwarder<P: PublicKey>:
    Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
{
}

/// Blanket implementation: any closure or function with the matching signature and bounds
/// is a [`SplitForwarder`].
impl<P: PublicKey, F> SplitForwarder<P> for F where
    F: Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>>
        + Send
        + Sync
        + Clone
        + 'static
{
}
74
/// Callback that decides, for each inbound message, which half of a split [`Receiver`]
/// (if any) should get it.
pub trait SplitRouter<P: PublicKey>:
    Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}

/// Blanket implementation: any closure or function with the matching signature and bounds
/// is a [`SplitRouter`].
impl<P: PublicKey, F> SplitRouter<P> for F where
    F: Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}
85
/// Configuration for the simulated [`Network`].
pub struct Config {
    /// Maximum message size in bytes. Senders reject larger messages with
    /// `Error::MessageTooLarge`; the value is also passed as the size limit when framing
    /// data on the underlying streams.
    pub max_size: u32,

    /// When `true`, a message is dropped if either the origin has blocked the recipient or
    /// the recipient has blocked the origin.
    pub disconnect_on_block: bool,

    /// Maximum number of peer sets to retain (oldest are evicted first). `None` disables
    /// peer set tracking, in which case peer set updates are ignored.
    pub tracked_peer_sets: Option<usize>,
}
103
/// Simulated network: owns every peer, link, and peer set, and routes messages between
/// peers according to the configured links.
pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
    /// Runtime context used for spawning tasks, reading the clock, and sampling randomness.
    context: ContextCell<E>,

    /// Maximum message size (copied from [`Config::max_size`]).
    max_size: u32,

    /// Whether blocks drop traffic between a pair of peers (copied from
    /// [`Config::disconnect_on_block`]).
    disconnect_on_block: bool,

    /// Next socket address to hand out to a newly created peer.
    next_addr: SocketAddr,

    /// Incoming requests from the oracle.
    ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,

    /// Sending half of `ingress`, cloned into every registered sender so it can subscribe
    /// to the peer list.
    oracle_sender: mpsc::UnboundedSender<ingress::Message<P, E>>,

    /// Sending half of `receiver`, cloned into every registered sender.
    sender: mpsc::UnboundedSender<Task<P>>,
    /// Incoming send tasks from all registered senders.
    receiver: mpsc::UnboundedReceiver<Task<P>>,

    /// Links keyed by `(sender, receiver)`.
    links: HashMap<(P, P), Link>,

    /// Every peer that has been created, keyed by public key.
    peers: BTreeMap<P, Peer<P>>,

    /// Tracked peer sets, keyed by their ID.
    peer_sets: BTreeMap<u64, Set<P>>,

    /// Number of tracked peer sets each peer currently appears in; peers whose count
    /// reaches zero are removed from this map.
    peer_refs: BTreeMap<P, usize>,

    /// Maximum number of peer sets to retain (copied from [`Config::tracked_peer_sets`]).
    tracked_peer_sets: Option<usize>,

    /// Recorded blocks as `(from, to)` pairs.
    blocks: HashSet<(P, P)>,

    /// Transmitter state; the network calls `limit`, `enqueue`, `advance`, and `next` on it
    /// to obtain completions and the next wake-up time.
    transmitter: transmitter::State<P>,

    /// Subscribers notified with `(id, new peer set, all tracked peers)` whenever a peer
    /// set is registered.
    #[allow(clippy::type_complexity)]
    subscribers: Vec<mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>>,

    /// Subscribers that receive the full list of tracked peers whenever it changes.
    peer_subscribers: Vec<ring::Sender<Vec<P>>>,

    /// Counter of messages handed to a recipient's stream, labeled by origin, recipient,
    /// and channel.
    received_messages: Family<metrics::Message, Counter>,
    /// Counter of messages accepted for sending over a link, labeled by origin, recipient,
    /// and channel.
    sent_messages: Family<metrics::Message, Counter>,
}
164
165impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
166 pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
171 let (sender, receiver) = mpsc::unbounded();
172 let (oracle_sender, oracle_receiver) = mpsc::unbounded();
173 let sent_messages = Family::<metrics::Message, Counter>::default();
174 let received_messages = Family::<metrics::Message, Counter>::default();
175 context.register("messages_sent", "messages sent", sent_messages.clone());
176 context.register(
177 "messages_received",
178 "messages received",
179 received_messages.clone(),
180 );
181
182 let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);
184
185 (
186 Self {
187 context: ContextCell::new(context),
188 max_size: cfg.max_size,
189 disconnect_on_block: cfg.disconnect_on_block,
190 tracked_peer_sets: cfg.tracked_peer_sets,
191 next_addr,
192 ingress: oracle_receiver,
193 oracle_sender: oracle_sender.clone(),
194 sender,
195 receiver,
196 links: HashMap::new(),
197 peers: BTreeMap::new(),
198 peer_sets: BTreeMap::new(),
199 peer_refs: BTreeMap::new(),
200 blocks: HashSet::new(),
201 transmitter: transmitter::State::new(),
202 subscribers: Vec::new(),
203 peer_subscribers: Vec::new(),
204 received_messages,
205 sent_messages,
206 },
207 Oracle::new(oracle_sender),
208 )
209 }
210
211 fn get_next_socket(&mut self) -> SocketAddr {
216 let result = self.next_addr;
217
218 match self.next_addr.port().checked_add(1) {
221 Some(port) => {
222 self.next_addr.set_port(port);
223 }
224 None => {
225 let ip = match self.next_addr.ip() {
226 IpAddr::V4(ipv4) => ipv4,
227 _ => unreachable!(),
228 };
229 let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
230 self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
231 }
232 }
233
234 result
235 }
236
/// Handles a single request arriving on the ingress channel, updating network state and
/// replying on the request's response channel where one is provided.
async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
    // Replies to the requester; a dropped requester is logged rather than treated as fatal.
    fn send_result<T: std::fmt::Debug>(
        result: oneshot::Sender<Result<T, Error>>,
        value: Result<T, Error>,
    ) {
        let success = value.is_ok();
        if let Err(e) = result.send(value) {
            error!(?e, "failed to send result to oracle (ok = {})", success);
        }
    }

    match message {
        // Register a new peer set (ignored when tracking is disabled).
        ingress::Message::Update { id, peers } => {
            let Some(tracked_peer_sets) = self.tracked_peer_sets else {
                warn!("attempted to register peer set when tracking is disabled");
                return;
            };

            // Reject an ID that is already stored.
            if self.peer_sets.contains_key(&id) {
                warn!(id, "peer set already exists");
                return;
            }

            // Reject an ID that does not exceed the largest stored ID.
            if let Some((last, _)) = self.peer_sets.last_key_value() {
                if id <= *last {
                    warn!(
                        new_id = id,
                        old_id = last,
                        "attempted to register peer set with non-monotonically increasing ID"
                    );
                    return;
                }
            }

            // Create any missing peers and add one reference per member of the new set.
            for public_key in peers.iter() {
                self.ensure_peer_exists(public_key).await;

                *self.peer_refs.entry(public_key.clone()).or_insert(0) += 1;
            }
            self.peer_sets.insert(id, peers.clone());

            // Evict the oldest sets until at most `tracked_peer_sets` remain.
            while self.peer_sets.len() > tracked_peer_sets {
                let (id, set) = self.peer_sets.pop_first().unwrap();
                debug!(id, "removed oldest peer set");

                for public_key in set.iter() {
                    let refs = self.peer_refs.get_mut(public_key).unwrap();
                    *refs = refs.checked_sub(1).expect("reference count underflow");

                    // Only the reference entry is removed; the peer's entry in
                    // `self.peers` is left in place.
                    if *refs == 0 {
                        self.peer_refs.remove(public_key);
                        debug!(?public_key, "removed peer no longer in any tracked set");
                    }
                }
            }

            // Notify peer set subscribers, dropping any whose channel has closed.
            let all = self.all_tracked_peers();
            let notification = (id, peers, all);
            self.subscribers
                .retain(|subscriber| subscriber.unbounded_send(notification.clone()).is_ok());

            // Push the refreshed peer list to peer-list subscribers.
            self.broadcast_peer_list().await;
        }
        // Register a channel for a peer, returning its sender and receiver.
        ingress::Message::Register {
            channel,
            public_key,
            quota,
            result,
        } => {
            let (_, is_new) = self.ensure_peer_exists(&public_key).await;

            // With no peer sets stored, a newly created peer changes the peer list.
            if is_new && self.peer_sets.is_empty() {
                self.broadcast_peer_list().await;
            }

            // Dedicated context handed to the sender as its clock (used with `quota`).
            let clock = self
                .context
                .with_label(&format!("rate_limiter_{channel}_{public_key}"))
                .take();

            // Build the sender along with the handle of its forwarding task.
            let (sender, handle) = Sender::new(
                self.context.with_label("sender"),
                public_key.clone(),
                channel,
                self.max_size,
                self.sender.clone(),
                self.oracle_sender.clone(),
                clock,
                quota,
            );

            // `ensure_peer_exists` above guarantees the peer is present.
            let peer = self.peers.get_mut(&public_key).unwrap();
            let receiver = match peer.register(channel, handle).await {
                Ok(receiver) => Receiver { receiver },
                Err(err) => return send_result(result, Err(err)),
            };

            send_result(result, Ok((sender, receiver)))
        }
        // Look up a peer set by ID.
        ingress::Message::PeerSet { id, response } => {
            if self.peer_sets.is_empty() {
                // No peer sets stored: answer with every known peer regardless of `id`.
                let _ = response.send(Some(
                    self.peers
                        .keys()
                        .cloned()
                        .try_collect()
                        .expect("BTreeMap keys are unique"),
                ));
            } else {
                let _ = response.send(self.peer_sets.get(&id).cloned());
            }
        }
        // Subscribe to peer set updates.
        ingress::Message::Subscribe { sender } => {
            // Immediately replay the most recent peer set, if there is one.
            if let Some((index, peers)) = self.peer_sets.last_key_value() {
                let all = self.all_tracked_peers();
                let notification = (*index, peers.clone(), all);
                let _ = sender.unbounded_send(notification);
            }
            self.subscribers.push(sender);
        }
        // Subscribe to the list of tracked peers.
        ingress::Message::SubscribeConnected { response } => {
            let (mut sender, receiver) = ring::channel(NZUsize!(1));

            // Seed the new subscriber with the current peer list.
            let peer_list: Vec<P> = self.all_tracked_peers().into_iter().collect();
            let _ = sender.send(peer_list).await;

            self.peer_subscribers.push(sender);

            let _ = response.send(receiver);
        }
        // Apply bandwidth caps to a peer.
        ingress::Message::LimitBandwidth {
            public_key,
            egress_cap,
            ingress_cap,
            result,
        } => {
            let (_, is_new) = self.ensure_peer_exists(&public_key).await;

            // With no peer sets stored, a newly created peer changes the peer list.
            if is_new && self.peer_sets.is_empty() {
                self.broadcast_peer_list().await;
            }

            // Changing the limits may yield completions, which are processed right away.
            let now = self.context.current();
            let completions = self
                .transmitter
                .limit(now, &public_key, egress_cap, ingress_cap);
            self.process_completions(completions);

            let _ = result.send(());
        }
        // Create a link from `sender` to `receiver`.
        ingress::Message::AddLink {
            sender,
            receiver,
            sampler,
            success_rate,
            result,
        } => {
            let (_, sender_is_new) = self.ensure_peer_exists(&sender).await;
            let (receiver_socket, receiver_is_new) = self.ensure_peer_exists(&receiver).await;

            // With no peer sets stored, a newly created peer changes the peer list.
            if (sender_is_new || receiver_is_new) && self.peer_sets.is_empty() {
                self.broadcast_peer_list().await;
            }

            // At most one link may exist per (sender, receiver) pair.
            let key = (sender.clone(), receiver.clone());
            if self.links.contains_key(&key) {
                return send_result(result, Err(Error::LinkExists));
            }

            let link = Link::new(
                &mut self.context,
                sender,
                receiver,
                receiver_socket,
                sampler,
                success_rate,
                self.max_size,
                self.received_messages.clone(),
            );
            self.links.insert(key, link);
            send_result(result, Ok(()))
        }
        // Remove the link from `sender` to `receiver`.
        ingress::Message::RemoveLink {
            sender,
            receiver,
            result,
        } => {
            match self.links.remove(&(sender, receiver)) {
                Some(_) => (),
                None => return send_result(result, Err(Error::LinkMissing)),
            }
            send_result(result, Ok(()))
        }
        // Record that `from` has blocked `to`.
        ingress::Message::Block { from, to } => {
            self.blocks.insert((from, to));
        }
        // Report every recorded block.
        ingress::Message::Blocked { result } => {
            send_result(result, Ok(self.blocks.iter().cloned().collect()))
        }
    }
}
474
475 async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
479 if !self.peers.contains_key(public_key) {
480 let socket = self.get_next_socket();
482 let peer = Peer::new(
483 self.context.with_label("peer"),
484 public_key.clone(),
485 socket,
486 self.max_size,
487 )
488 .await;
489
490 self.peers.insert(public_key.clone(), peer);
492
493 (socket, true)
494 } else {
495 (self.peers.get(public_key).unwrap().socket, false)
496 }
497 }
498
499 async fn broadcast_peer_list(&mut self) {
507 let peer_list = self.all_tracked_peers().into_iter().collect::<Vec<_>>();
508 let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
509 for mut subscriber in self.peer_subscribers.drain(..) {
510 if subscriber.send(peer_list.clone()).await.is_ok() {
511 live_subscribers.push(subscriber);
512 }
513 }
514 self.peer_subscribers = live_subscribers;
515 }
516
517 fn all_tracked_peers(&self) -> Set<P> {
523 if self.peer_sets.is_empty() && self.tracked_peer_sets.is_none() {
524 self.peers
525 .keys()
526 .cloned()
527 .try_collect()
528 .expect("BTreeMap keys are unique")
529 } else {
530 self.peer_refs
531 .keys()
532 .cloned()
533 .try_collect()
534 .expect("BTreeMap keys are unique")
535 }
536 }
537}
538
539impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
540 fn process_completions(&mut self, completions: Vec<Completion<P>>) {
542 for completion in completions {
543 let Some(deliver_at) = completion.deliver_at else {
545 trace!(
546 origin = ?completion.origin,
547 recipient = ?completion.recipient,
548 "message dropped before delivery",
549 );
550 continue;
551 };
552
553 let key = (completion.origin.clone(), completion.recipient.clone());
555 let Some(link) = self.links.get_mut(&key) else {
556 trace!(
558 origin = ?completion.origin,
559 recipient = ?completion.recipient,
560 "missing link for completion",
561 );
562 continue;
563 };
564 if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
565 error!(?err, "failed to send");
566 }
567 }
568 }
569
/// Processes one send request from a peer.
///
/// Resolves the recipient list, filters out recipients that cannot be reached (self,
/// untracked, blocked, or unlinked), enqueues the message with the transmitter for each
/// remaining recipient, and finally reports the recipients the message was sent to on
/// the task's reply channel.
fn handle_task(&mut self, task: Task<P>) {
    let (channel, origin, recipients, message, reply) = task;
    // With tracking enabled, an origin outside every tracked peer set cannot send at all.
    if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&origin) {
        warn!(
            ?origin,
            reason = "not in tracked peer set",
            "dropping message"
        );
        if let Err(err) = reply.send(Vec::new()) {
            error!(?err, "failed to send ack");
        }
        return;
    }

    // Expand the recipient specification into a concrete list.
    let recipients = match recipients {
        Recipients::All => {
            // With no peer sets stored, "all" means every known peer; otherwise it means
            // every peer referenced by a tracked set.
            if self.peer_sets.is_empty() {
                self.peers.keys().cloned().collect()
            } else {
                self.peer_refs.keys().cloned().collect()
            }
        }
        Recipients::Some(keys) => keys,
        Recipients::One(key) => vec![key],
    };

    let now = self.context.current();
    let mut sent = Vec::new();
    for recipient in recipients {
        // Never deliver a message back to its origin.
        if recipient == origin {
            trace!(?recipient, reason = "self", "dropping message");
            continue;
        }

        // With tracking enabled, skip recipients outside every tracked peer set.
        if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&recipient) {
            trace!(
                ?origin,
                ?recipient,
                reason = "not in tracked peer set",
                "dropping message"
            );
            continue;
        }

        // A block in either direction severs the pair when `disconnect_on_block` is set.
        let o_r = (origin.clone(), recipient.clone());
        let r_o = (recipient.clone(), origin.clone());
        if self.disconnect_on_block
            && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
        {
            trace!(?origin, ?recipient, reason = "blocked", "dropping message");
            continue;
        }

        // A link from origin to recipient is required.
        let Some(link) = self.links.get_mut(&o_r) else {
            trace!(?origin, ?recipient, reason = "no link", "dropping message");
            continue;
        };

        // Count the message as sent, whether or not it is ultimately delivered.
        self.sent_messages
            .get_or_create(&metrics::Message::new(&origin, &recipient, channel))
            .inc();

        // Sample the latency (interpreted as milliseconds) from the link's distribution.
        let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64);

        // Decide randomly, using the link's success rate, whether delivery should happen.
        let should_deliver = self.context.gen_bool(link.success_rate);

        // Enqueue with the transmitter and process any completions it returns.
        let completions = self.transmitter.enqueue(
            now,
            origin.clone(),
            recipient.clone(),
            channel,
            message.clone(),
            latency,
            should_deliver,
        );
        self.process_completions(completions);

        sent.push(recipient);
    }

    // Report the recipients the message was sent to.
    if let Err(err) = reply.send(sent) {
        error!(?err, "failed to send ack");
    }
}
677
/// Consumes the network and spawns its event loop ([`Self::run`]) on the network's own
/// context, returning the handle of the spawned task.
pub fn start(mut self) -> Handle<()> {
    spawn_cell!(self.context, self.run().await)
}
685
/// Event loop of the network.
///
/// Repeatedly waits for whichever comes first: the transmitter's next scheduled time, an
/// ingress request, or a send task. The loop ends when either the ingress channel or the
/// task channel is closed.
async fn run(mut self) {
    loop {
        // Sleep until the transmitter's next event, or wait forever if it has none.
        let tick = match self.transmitter.next() {
            Some(when) => Either::Left(self.context.sleep_until(when)),
            None => Either::Right(future::pending()),
        };
        select! {
            // The scheduled time was reached: advance the transmitter and process the
            // completions it returns.
            _ = tick => {
                let now = self.context.current();
                let completions = self.transmitter.advance(now);
                self.process_completions(completions);
            },
            // Request on the ingress channel.
            message = self.ingress.next() => {
                let message = match message {
                    Some(message) => message,
                    None => break,
                };
                self.handle_ingress(message).await;
            },
            // Send task from a peer.
            task = self.receiver.next() => {
                let task = match task {
                    Some(task) => task,
                    None => break,
                };
                self.handle_task(task);
            },
        }
    }
}
717}
718
/// Source of peer-list subscriptions for a rate-limited sender; it asks the [`Network`]
/// for a subscription by sending a request on the ingress channel.
pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
    /// Channel used to send requests to the network.
    sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
}
726
727impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
728 fn clone(&self) -> Self {
729 Self {
730 sender: self.sender.clone(),
731 }
732 }
733}
734
impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
    /// Wraps the channel used to send requests to the network.
    const fn new(sender: mpsc::UnboundedSender<ingress::Message<P, E>>) -> Self {
        Self { sender }
    }
}
740
741impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
742 type PublicKey = P;
743
744 async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
745 let (response_tx, response_rx) = oneshot::channel();
746 let _ = self
747 .sender
748 .unbounded_send(ingress::Message::SubscribeConnected {
749 response: response_tx,
750 });
751 response_rx.await.unwrap_or_else(|_| {
753 let (_sender, receiver) = ring::channel(NZUsize!(1));
754 receiver
755 })
756 }
757}
758
/// Sender for a single peer and channel that enqueues messages without any rate limiting.
#[derive(Clone)]
pub struct UnlimitedSender<P: PublicKey> {
    /// Public key of the sending peer (used as the origin of every task).
    me: P,
    /// Channel the messages are sent on.
    channel: Channel,
    /// Maximum allowed message size in bytes.
    max_size: u32,
    /// Queue for messages sent with `priority == true`.
    high: mpsc::UnboundedSender<Task<P>>,
    /// Queue for messages sent with `priority == false`.
    low: mpsc::UnboundedSender<Task<P>>,
}
770
771impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
772 type Error = Error;
773 type PublicKey = P;
774
775 async fn send(
776 &mut self,
777 recipients: Recipients<P>,
778 message: Bytes,
779 priority: bool,
780 ) -> Result<Vec<P>, Error> {
781 if message.len() > self.max_size as usize {
783 return Err(Error::MessageTooLarge(message.len()));
784 }
785
786 let (sender, receiver) = oneshot::channel();
788 let channel = if priority { &self.high } else { &self.low };
789 channel
790 .unbounded_send((self.channel, self.me.clone(), recipients, message, sender))
791 .map_err(|_| Error::NetworkClosed)?;
792 receiver.await.map_err(|_| Error::NetworkClosed)
793 }
794}
795
/// Rate-limited sender for a single peer and channel.
pub struct Sender<P: PublicKey, E: Clock> {
    /// Wraps an [`UnlimitedSender`] with the quota, clock, and peer source supplied at
    /// construction.
    limited_sender: LimitedSender<E, UnlimitedSender<P>, ConnectedPeerProvider<P, E>>,
}
803
804impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
805 fn clone(&self) -> Self {
806 Self {
807 limited_sender: self.limited_sender.clone(),
808 }
809 }
810}
811
impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
    /// Prints only the type name; the inner sender is elided and the output is marked as
    /// non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sender").finish_non_exhaustive()
    }
}
817
818impl<P: PublicKey, E: Clock> Sender<P, E> {
/// Creates a sender for `me` on `channel` and spawns the task that forwards its queued
/// messages to the network.
///
/// Returns the sender and the handle of the spawned forwarding task.
#[allow(clippy::too_many_arguments)]
fn new(
    context: impl Spawner + Metrics,
    me: P,
    channel: Channel,
    max_size: u32,
    mut sender: mpsc::UnboundedSender<Task<P>>,
    oracle_sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
    clock: E,
    quota: Quota,
) -> (Self, Handle<()>) {
    // Separate queues for priority and non-priority messages.
    let (high, mut high_receiver) = mpsc::unbounded();
    let (low, mut low_receiver) = mpsc::unbounded();
    let processor = context.with_label("processor").spawn(move |_| async move {
        loop {
            // Wait for a task on either queue; stop once either queue is closed.
            // NOTE(review): which arm wins when both queues are ready depends on the
            // `select!` macro's polling order — confirm it favors the first (high) arm.
            let task;
            select! {
                high_task = high_receiver.next() => {
                    task = match high_task {
                        Some(task) => task,
                        None => break,
                    };
                },
                low_task = low_receiver.next() => {
                    task = match low_task {
                        Some(task) => task,
                        None => break,
                    };
                }
            }

            // Forward the task to the network; failures are logged and the loop continues.
            if let Err(err) = sender.send(task).await {
                error!(?err, channel, "failed to send task");
            }
        }
    });

    // Wrap the raw sender with rate limiting driven by `quota` and `clock`.
    let unlimited_sender = UnlimitedSender {
        me,
        channel,
        max_size,
        high,
        low,
    };
    let peer_source = ConnectedPeerProvider::new(oracle_sender);
    let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);

    (Self { limited_sender }, processor)
}
871
872 pub fn split_with<F: SplitForwarder<P>>(
874 self,
875 forwarder: F,
876 ) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
877 (
878 SplitSender {
879 replica: SplitOrigin::Primary,
880 inner: self.clone(),
881 forwarder: forwarder.clone(),
882 },
883 SplitSender {
884 replica: SplitOrigin::Secondary,
885 inner: self,
886 forwarder,
887 },
888 )
889 }
890}
891
impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
    type PublicKey = P;
    type Checked<'a>
        = crate::utils::limited::CheckedSender<'a, UnlimitedSender<P>>
    where
        Self: 'a;

    /// Delegates the check for `recipients` to the wrapped limited sender, returning its
    /// result unchanged: a checked sender on success, or a `SystemTime` on failure.
    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        self.limited_sender.check(recipients).await
    }
}
906
/// One half of a split [`Sender`]; consults a [`SplitForwarder`] on every send to decide
/// the actual recipients.
pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    /// Which half this is; passed to the forwarder as the message origin.
    replica: SplitOrigin,
    /// Underlying sender; both halves hold a clone of the same original.
    inner: Sender<P, E>,
    /// Callback that selects the recipients for each message.
    forwarder: F,
}
913
914impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
915 fn clone(&self) -> Self {
916 Self {
917 replica: self.replica,
918 inner: self.inner.clone(),
919 forwarder: self.forwarder.clone(),
920 }
921 }
922}
923
924impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
925 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
926 f.debug_struct("SplitSender")
927 .field("replica", &self.replica)
928 .field("inner", &self.inner)
929 .finish()
930 }
931}
932
impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
    type PublicKey = P;
    type Checked<'a> = SplitCheckedSender<'a, P, E, F>;

    /// Runs the wrapped sender's check for `recipients` and, on success, returns a
    /// [`SplitCheckedSender`] that will consult the forwarder when the message is sent.
    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        Ok(SplitCheckedSender {
            // The check covers the originally requested recipients; the forwarder may
            // choose different recipients later, once the message is known.
            checked: self.inner.limited_sender.check(recipients.clone()).await?,
            replica: self.replica,
            forwarder: self.forwarder.clone(),
            recipients,

            _phantom: std::marker::PhantomData,
        })
    }
}
953
/// Result of a successful [`SplitSender`] check; sending consults the forwarder to pick the
/// final recipients.
pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    /// Checked sender obtained from the wrapped limited sender.
    checked: LimitedCheckedSender<'a, UnlimitedSender<P>>,
    /// Which half of the split the send originates from.
    replica: SplitOrigin,
    /// Callback that selects the recipients for the message.
    forwarder: F,
    /// Recipients originally requested by the caller.
    recipients: Recipients<P>,

    /// Ties the otherwise unused type parameter `E` to this struct.
    _phantom: std::marker::PhantomData<E>,
}
966
967impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
968 for SplitCheckedSender<'a, P, E, F>
969{
970 type PublicKey = P;
971 type Error = Error;
972
973 async fn send(
974 self,
975 message: Bytes,
976 priority: bool,
977 ) -> Result<Vec<Self::PublicKey>, Self::Error> {
978 let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
980 return Ok(Vec::new());
981 };
982
983 self.checked
990 .into_inner()
991 .send(recipients, message, priority)
992 .await
993 }
994}
995
/// Receiving half of the unbounded channel that carries inbound messages for one channel
/// of one peer.
type MessageReceiver<P> = mpsc::UnboundedReceiver<Message<P>>;
997
/// Receiver of inbound messages for a single peer and channel.
#[derive(Debug)]
pub struct Receiver<P: PublicKey> {
    /// Underlying channel of inbound messages.
    receiver: MessageReceiver<P>,
}
1003
1004impl<P: PublicKey> crate::Receiver for Receiver<P> {
1005 type Error = Error;
1006 type PublicKey = P;
1007
1008 async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
1009 self.receiver.next().await.ok_or(Error::NetworkClosed)
1010 }
1011}
1012
impl<P: PublicKey> Receiver<P> {
    /// Splits this receiver into a primary and a secondary receiver.
    ///
    /// A task spawned on `context` reads every inbound message, asks `router` where it
    /// belongs, and forwards it accordingly. The task stops when the original receiver
    /// ends or when both halves have been closed.
    pub fn split_with<E: Spawner, R: SplitRouter<P>>(
        mut self,
        context: E,
        router: R,
    ) -> (Self, Self) {
        let (mut primary_tx, primary_rx) = mpsc::unbounded();
        let (mut secondary_tx, secondary_rx) = mpsc::unbounded();
        context.spawn(move |_| async move {
            while let Some(message) = self.receiver.next().await {
                // Route the message; send failures are logged but do not stop the loop.
                let direction = router(&message);
                match direction {
                    SplitTarget::None => {}
                    SplitTarget::Primary => {
                        if let Err(err) = primary_tx.send(message).await {
                            error!(?err, "failed to send message to primary");
                        }
                    }
                    SplitTarget::Secondary => {
                        if let Err(err) = secondary_tx.send(message).await {
                            error!(?err, "failed to send message to secondary");
                        }
                    }
                    SplitTarget::Both => {
                        if let Err(err) = primary_tx.send(message.clone()).await {
                            error!(?err, "failed to send message to primary");
                        }
                        if let Err(err) = secondary_tx.send(message).await {
                            error!(?err, "failed to send message to secondary");
                        }
                    }
                }

                // Nothing left to deliver to once both halves are closed.
                if primary_tx.is_closed() && secondary_tx.is_closed() {
                    break;
                }
            }
        });

        (
            Self {
                receiver: primary_rx,
            },
            Self {
                receiver: secondary_rx,
            },
        )
    }
}
1065
/// A simulated peer: a listening socket plus a router that dispatches inbound messages to
/// the mailboxes of registered channels.
struct Peer<P: PublicKey> {
    /// Socket address the peer listens on.
    socket: SocketAddr,

    /// Control channel to the peer's router task, used to register a channel: carries the
    /// channel, the handle of the channel's sender task, and a one-shot that returns the
    /// receiver for the new mailbox.
    control: mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
}
1076
1077impl<P: PublicKey> Peer<P> {
/// Creates a peer listening on `socket` and spawns its background tasks.
///
/// Two tasks are spawned: a router that registers channels and dispatches inbound
/// messages to the matching mailbox, and a listener that accepts connections and feeds
/// the frames it receives to the router. The function waits for the listener's readiness
/// signal (sent after binding) before returning.
async fn new<E: Spawner + RNetwork + Metrics + Clock>(
    context: E,
    public_key: P,
    socket: SocketAddr,
    max_size: u32,
) -> Self {
    // Channel for channel registrations.
    let (control_sender, mut control_receiver) = mpsc::unbounded();

    // Channel for inbound messages as `(channel, message)`.
    let (inbox_sender, mut inbox_receiver) = mpsc::unbounded();

    // Router task: owns the mailboxes and dispatches inbound messages.
    context.with_label("router").spawn(|context| async move {
        // Maps each channel to (mailbox sender, handle of the channel's sender task).
        let mut mailboxes = HashMap::new();

        select_loop! {
            context,
            // NOTE(review): presumably runs when the context is stopped; the exact
            // semantics are defined by `select_loop!` — confirm.
            on_stopped => {},
            // Channel registration.
            control = control_receiver.next() => {
                let (channel, sender, result_tx): (Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>) = match control {
                    Some(control) => control,
                    None => break,
                };

                // Create the mailbox; a previous registration for the same channel is
                // replaced and its sender task aborted.
                let (receiver_tx, receiver_rx) = mpsc::unbounded();
                if let Some((_, existing_sender)) = mailboxes.insert(channel, (receiver_tx, sender)) {
                    warn!(?public_key, ?channel, "overwriting existing channel");
                    existing_sender.abort();
                }
                result_tx.send(receiver_rx).unwrap();
            },

            // Inbound message.
            inbox = inbox_receiver.next() => {
                let (channel, message) = match inbox {
                    Some(message) => message,
                    None => break,
                };

                // Deliver to the channel's mailbox, or drop if the channel is not registered.
                match mailboxes.get_mut(&channel) {
                    Some((receiver_tx, _)) => {
                        if let Err(err) = receiver_tx.send(message).await {
                            debug!(?err, "failed to send message to mailbox");
                        }
                    }
                    None => {
                        trace!(
                            recipient = ?public_key,
                            channel,
                            reason = "missing channel",
                            "dropping message",
                        );
                    }
                }
            },
        }
    });

    // Listener task: accepts connections and forwards their frames to the router.
    let (ready_tx, ready_rx) = oneshot::channel();
    context
        .with_label("listener")
        .spawn(move |context| async move {
            // Bind, then signal readiness so `new` can return.
            let mut listener = context.bind(socket).await.unwrap();
            let _ = ready_tx.send(());

            while let Ok((_, _, mut stream)) = listener.accept().await {
                // One receiver task per accepted connection.
                context.with_label("receiver").spawn({
                    let mut inbox_sender = inbox_sender.clone();
                    move |_| async move {
                        // The first frame carries the dialer's public key.
                        let dialer = match recv_frame(&mut stream, max_size).await {
                            Ok(data) => data,
                            Err(_) => {
                                error!("failed to receive public key from dialer");
                                return;
                            }
                        };
                        let Ok(dialer) = P::decode(dialer.as_ref()) else {
                            error!("received public key is invalid");
                            return;
                        };

                        // Every following frame is a big-endian channel prefix
                        // followed by the message payload.
                        while let Ok(data) = recv_frame(&mut stream, max_size).await {
                            let channel = Channel::from_be_bytes(
                                data[..Channel::SIZE].try_into().unwrap(),
                            );
                            let message = data.slice(Channel::SIZE..);
                            if let Err(err) = inbox_sender
                                .send((channel, (dialer.clone(), message)))
                                .await
                            {
                                debug!(?err, "failed to send message to mailbox");
                                break;
                            }
                        }
                    }
                });
            }
        });

    // Wait for the listener's readiness signal (an error from the one-shot is ignored).
    let _ = ready_rx.await;

    Self {
        socket,
        control: control_sender,
    }
}
1206
1207 async fn register(
1212 &mut self,
1213 channel: Channel,
1214 sender: Handle<()>,
1215 ) -> Result<MessageReceiver<P>, Error> {
1216 let (result_tx, result_rx) = oneshot::channel();
1217 self.control
1218 .send((channel, sender, result_tx))
1219 .await
1220 .map_err(|_| Error::NetworkClosed)?;
1221 result_rx.await.map_err(|_| Error::NetworkClosed)
1222 }
1223}
1224
/// A one-way link between two peers.
struct Link {
    /// Distribution the per-message latency is sampled from (the sample is interpreted as
    /// milliseconds).
    sampler: Normal<f64>,
    /// Probability that a message sent over the link is delivered.
    success_rate: f64,
    /// Queue of `(channel, message, delivery time)` consumed by the link's task.
    inbox: mpsc::UnboundedSender<(Channel, Bytes, SystemTime)>,
}
1233
1234impl Link {
/// Creates a link from `dialer` to `receiver` and spawns the task that delivers its
/// messages.
///
/// The task dials `socket`, sends `dialer`'s public key as the first frame, and then
/// writes each queued message (prefixed with its channel) once its delivery time is
/// reached.
#[allow(clippy::too_many_arguments)]
fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
    context: &mut E,
    dialer: P,
    receiver: P,
    socket: SocketAddr,
    sampler: Normal<f64>,
    success_rate: f64,
    max_size: u32,
    received_messages: Family<metrics::Message, Counter>,
) -> Self {
    let (inbox, mut outbox) = mpsc::unbounded::<(Channel, Bytes, SystemTime)>();
    context.with_label("link").spawn(move |context| async move {
        // Connect to the receiver and identify ourselves.
        let (mut sink, _) = context.dial(socket).await.unwrap();
        if let Err(err) = send_frame(&mut sink, &dialer, max_size).await {
            error!(?err, "failed to send public key to listener");
            return;
        }

        // Messages are handled strictly in queue order.
        while let Some((channel, message, receive_complete_at)) = outbox.next().await {
            // Wait until the message's delivery time.
            context.sleep_until(receive_complete_at).await;

            // Frame layout: big-endian channel followed by the payload.
            let mut data = bytes::BytesMut::with_capacity(Channel::SIZE + message.len());
            data.extend_from_slice(&channel.to_be_bytes());
            data.extend_from_slice(&message);
            let data = data.freeze();
            // The result of the write is ignored.
            let _ = send_frame(&mut sink, &data, max_size).await;

            // The metric is incremented regardless of the write's outcome.
            received_messages
                .get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
                .inc();
        }
    });

    Self {
        sampler,
        success_rate,
        inbox,
    }
}
1283
1284 fn send(
1286 &mut self,
1287 channel: Channel,
1288 message: Bytes,
1289 receive_complete_at: SystemTime,
1290 ) -> Result<(), Error> {
1291 self.inbox
1292 .unbounded_send((channel, message, receive_complete_at))
1293 .map_err(|_| Error::NetworkClosed)?;
1294 Ok(())
1295 }
1296}
1297
1298#[cfg(test)]
1299mod tests {
1300 use super::*;
1301 use crate::{Manager, Receiver as _, Recipients, Sender as _};
1302 use bytes::Bytes;
1303 use commonware_cryptography::{ed25519, Signer as _};
1304 use commonware_runtime::{deterministic, Quota, Runner as _};
1305 use futures::FutureExt;
1306 use std::num::NonZeroU32;
1307
/// Maximum message size used by the tests (1 MiB).
const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;
1309
/// Quota used by the tests: the largest per-second rate that can be expressed.
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1312
/// Registers channels for two peers, links them, and checks that adding the same link a
/// second time is rejected.
#[test]
fn test_register_and_link() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // Start the network.
        let cfg = Config {
            max_size: MAX_MESSAGE_SIZE,
            disconnect_on_block: true,
            tracked_peer_sets: Some(3),
        };
        let network_context = context.with_label("network");
        let (network, mut oracle) = Network::new(network_context.clone(), cfg);
        network_context.spawn(|_| network.run());

        // Two distinct peers.
        let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
        let pk2 = ed25519::PrivateKey::from_seed(2).public_key();

        // Track both peers in a peer set, then register two channels for each.
        let mut manager = oracle.manager();
        manager
            .update(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
            .await;
        let mut control = oracle.control(pk1.clone());
        control.register(0, TEST_QUOTA).await.unwrap();
        control.register(1, TEST_QUOTA).await.unwrap();
        let mut control = oracle.control(pk2.clone());
        control.register(0, TEST_QUOTA).await.unwrap();
        control.register(1, TEST_QUOTA).await.unwrap();

        // Registering an already-registered channel again must also succeed.
        control.register(1, TEST_QUOTA).await.unwrap();

        // Link pk1 -> pk2.
        let link = ingress::Link {
            latency: Duration::from_millis(2),
            jitter: Duration::from_millis(1),
            success_rate: 0.9,
        };
        oracle
            .add_link(pk1.clone(), pk2.clone(), link.clone())
            .await
            .unwrap();

        // Adding the same link again is rejected.
        assert!(matches!(
            oracle.add_link(pk1, pk2, link).await,
            Err(Error::LinkExists)
        ));
    });
}
1363
/// Splits one registered channel into a primary and a secondary half and checks that each half
/// talks to exactly one peer.
///
/// The sender-side forwarder replaces the requested recipients with `peer_a` for the primary
/// half and `peer_b` for the secondary half. The receiver-side router hands messages from
/// `peer_a` to the primary receiver and messages from `peer_b` to the secondary receiver; any
/// other sender panics.
#[test]
fn test_split_channel_single() {
    deterministic::Runner::default().start(|context| async move {
        // Launch the simulated network under its own label.
        let net_ctx = context.with_label("network");
        let (network, mut oracle) = Network::new(
            net_ctx.clone(),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            },
        );
        net_ctx.spawn(|_| network.run());

        // Identities: the split ("twin") peer plus one counterpart per half.
        let [twin, peer_a, peer_b] =
            [20, 21, 22].map(|seed| ed25519::PrivateKey::from_seed(seed).public_key());

        // Track all three peers in peer set 0 before registering channels.
        let mut manager = oracle.manager();
        let members = [twin.clone(), peer_a.clone(), peer_b.clone()];
        manager.update(0, members.try_into().unwrap()).await;

        // Plain (unsplit) endpoints for the two counterparts.
        let (mut a_tx, mut a_rx) = oracle
            .control(peer_a.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut b_tx, mut b_rx) = oracle
            .control(peer_b.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // The twin registers once and then splits both directions of its channel.
        let (twin_tx, twin_rx) = oracle
            .control(twin.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (route_a, route_b) = (peer_a.clone(), peer_b.clone());
        let (mut primary_tx, mut secondary_tx) = twin_tx.split_with(move |origin, _, _| {
            // Ignore the requested recipients: each half is pinned to a single peer.
            let pinned = match origin {
                SplitOrigin::Primary => &route_a,
                SplitOrigin::Secondary => &route_b,
            };
            Some(Recipients::One(pinned.clone()))
        });
        let (accept_a, accept_b) = (peer_a.clone(), peer_b.clone());
        let (mut primary_rx, mut secondary_rx) = twin_rx.split_with(
            context.with_label("split_receiver"),
            move |(from, _)| match from {
                pk if pk == &accept_a => SplitTarget::Primary,
                pk if pk == &accept_b => SplitTarget::Secondary,
                _ => panic!("unexpected sender"),
            },
        );

        // Lossless, zero-delay links in both directions between the twin and each counterpart.
        let link = ingress::Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for (src, dst) in [
            (&peer_a, &twin),
            (&twin, &peer_a),
            (&peer_b, &twin),
            (&twin, &peer_b),
        ] {
            oracle
                .add_link(src.clone(), dst.clone(), link.clone())
                .await
                .unwrap();
        }

        // Queue one message in every direction before reading anything back.
        let inbound_from_a = Bytes::from_static(b"from_a");
        let inbound_from_b = Bytes::from_static(b"from_b");
        let outbound_primary = Bytes::from_static(b"primary_out");
        let outbound_secondary = Bytes::from_static(b"secondary_out");
        a_tx.send(Recipients::One(twin.clone()), inbound_from_a.clone(), false)
            .await
            .unwrap();
        b_tx.send(Recipients::One(twin.clone()), inbound_from_b.clone(), false)
            .await
            .unwrap();
        // Both halves ask for `Recipients::All`; the forwarder narrows each to one peer.
        primary_tx
            .send(Recipients::All, outbound_primary.clone(), false)
            .await
            .unwrap();
        secondary_tx
            .send(Recipients::All, outbound_secondary.clone(), false)
            .await
            .unwrap();

        // Inbound traffic is routed to the half matching its sender.
        let (from, body) = primary_rx.recv().await.unwrap();
        assert_eq!(from, peer_a);
        assert_eq!(body, inbound_from_a);
        let (from, body) = secondary_rx.recv().await.unwrap();
        assert_eq!(from, peer_b);
        assert_eq!(body, inbound_from_b);

        // Outbound traffic from each half lands only at that half's pinned peer.
        let (from, body) = a_rx.recv().await.unwrap();
        assert_eq!(from, twin);
        assert_eq!(body, outbound_primary);
        let (from, body) = b_rx.recv().await.unwrap();
        assert_eq!(from, twin);
        assert_eq!(body, outbound_secondary);
    });
}
1499
/// Checks that a receiver router returning `SplitTarget::Both` delivers one inbound message to
/// both the primary and the secondary receiver.
#[test]
fn test_split_channel_both() {
    deterministic::Runner::default().start(|context| async move {
        // Launch the simulated network under its own label.
        let net_ctx = context.with_label("network");
        let (network, mut oracle) = Network::new(
            net_ctx.clone(),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            },
        );
        net_ctx.spawn(|_| network.run());

        // The split peer and the single peer that sends to it.
        let [twin, peer_c] =
            [30, 31].map(|seed| ed25519::PrivateKey::from_seed(seed).public_key());

        // Track both peers in peer set 0.
        let mut manager = oracle.manager();
        let members = [twin.clone(), peer_c.clone()];
        manager.update(0, members.try_into().unwrap()).await;

        // `peer_c` only sends; its receiver is held (not dropped) for the whole test.
        let (mut c_tx, _c_rx) = oracle
            .control(peer_c.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Split the twin's channel. The sender halves are unused but kept alive.
        let (twin_tx, twin_rx) = oracle
            .control(twin.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_primary_tx, _secondary_tx) =
            twin_tx.split_with(|_, requested, _| Some(requested.clone()));
        let (mut primary_rx, mut secondary_rx) = twin_rx.split_with(
            context.with_label("split_receiver_both"),
            // Every inbound message goes to both halves.
            |_| SplitTarget::Both,
        );

        // Lossless, zero-delay links in both directions.
        let link = ingress::Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for (src, dst) in [(&peer_c, &twin), (&twin, &peer_c)] {
            oracle
                .add_link(src.clone(), dst.clone(), link.clone())
                .await
                .unwrap();
        }

        // A single send from `peer_c` to the twin...
        let payload = Bytes::from_static(b"to_both");
        c_tx.send(Recipients::One(twin.clone()), payload.clone(), false)
            .await
            .unwrap();

        // ...must show up once on each half.
        let (from, body) = primary_rx.recv().await.unwrap();
        assert_eq!(from, peer_c);
        assert_eq!(body, payload);
        let (from, body) = secondary_rx.recv().await.unwrap();
        assert_eq!(from, peer_c);
        assert_eq!(body, payload);
    });
}
1574
/// Checks the "drop everything" configuration of a split channel.
///
/// The receiver router always returns `SplitTarget::None`, so neither half has a message ready
/// even though the remote send reports the twin as a recipient. The sender forwarder always
/// returns `None`, so sends from either half report no recipients.
#[test]
fn test_split_channel_none() {
    deterministic::Runner::default().start(|context| async move {
        // Launch the simulated network under its own label.
        let net_ctx = context.with_label("network");
        let (network, mut oracle) = Network::new(
            net_ctx.clone(),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            },
        );
        net_ctx.spawn(|_| network.run());

        // The split peer and its single counterpart.
        let [twin, peer_c] =
            [30, 31].map(|seed| ed25519::PrivateKey::from_seed(seed).public_key());

        // Track both peers in peer set 0.
        let mut manager = oracle.manager();
        let members = [twin.clone(), peer_c.clone()];
        manager.update(0, members.try_into().unwrap()).await;

        // `peer_c`'s receiver is held (not dropped) for the whole test.
        let (mut c_tx, _c_rx) = oracle
            .control(peer_c.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Split the twin's channel so that nothing is forwarded in either direction.
        let (twin_tx, twin_rx) = oracle
            .control(twin.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut primary_tx, mut secondary_tx) = twin_tx.split_with(|_, _, _| None);
        let (mut primary_rx, mut secondary_rx) = twin_rx.split_with(
            context.with_label("split_receiver_both"),
            |_| SplitTarget::None,
        );

        // Lossless, zero-delay links in both directions.
        let link = ingress::Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for (src, dst) in [(&peer_c, &twin), (&twin, &peer_c)] {
            oracle
                .add_link(src.clone(), dst.clone(), link.clone())
                .await
                .unwrap();
        }

        // Inbound: the send itself reports the twin as its only recipient...
        let sent = c_tx
            .send(
                Recipients::One(twin.clone()),
                Bytes::from_static(b"to_both"),
                false,
            )
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);
        assert_eq!(sent.first(), Some(&twin));

        // ...but after waiting, neither half has anything ready to read.
        context.sleep(Duration::from_millis(100)).await;
        assert!(primary_rx.recv().now_or_never().is_none());
        assert!(secondary_rx.recv().now_or_never().is_none());

        // Outbound via the primary half: no recipients are reported.
        let sent = primary_tx
            .send(
                Recipients::One(peer_c.clone()),
                Bytes::from_static(b"to_both"),
                false,
            )
            .await
            .unwrap();
        assert!(sent.is_empty());

        // Outbound via the secondary half: likewise no recipients.
        let sent = secondary_tx
            .send(
                Recipients::One(peer_c.clone()),
                Bytes::from_static(b"to_both"),
                false,
            )
            .await
            .unwrap();
        assert!(sent.is_empty());
    });
}
1664
/// Submits peer-set updates with out-of-order indices and checks what a subscriber observes.
///
/// After an update at index 10, an update at index 9 is issued and then one at index 11. The
/// test asserts that the next notification after 10 is the one for 11, and that its aggregate
/// set contains the peers from 10 and 11 but not the peer submitted at index 9.
#[test]
fn test_unordered_peer_sets() {
    deterministic::Runner::default().start(|context| async move {
        // Launch the simulated network under its own label.
        let net_ctx = context.with_label("network");
        let (network, oracle) = Network::new(
            net_ctx.clone(),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            },
        );
        net_ctx.spawn(|_| network.run());

        // One identity per update below (pk1/pk2 share the first update).
        let [pk1, pk2, pk3, pk4] =
            [1, 2, 3, 4].map(|seed| ed25519::PrivateKey::from_seed(seed).public_key());

        // Subscribe before any update so every notification is observed.
        let mut manager = oracle.manager();
        let mut updates = manager.subscribe().await;

        // Index 10: both peers are new and make up the whole tracked set.
        manager
            .update(10, [pk1.clone(), pk2.clone()].try_into().unwrap())
            .await;
        let (index, added, tracked) = updates.next().await.unwrap();
        assert_eq!(index, 10);
        assert_eq!(added.len(), 2);
        assert_eq!(tracked.len(), 2);

        // Index 9 arrives after index 10; no notification is read for it here.
        manager.update(9, [pk3].try_into().unwrap()).await;

        // Index 11: the next notification seen is this one, and `pk3` is absent from it.
        manager.update(11, [pk4.clone()].try_into().unwrap()).await;
        let (index, added, tracked) = updates.next().await.unwrap();
        assert_eq!(index, 11);
        assert_eq!(added, [pk4.clone()].try_into().unwrap());
        assert_eq!(tracked, [pk1, pk2, pk4].try_into().unwrap());
    });
}
1708
/// Checks the sequence of addresses returned by `get_next_socket`.
///
/// The first call returns the current `next_addr`, and the following call returns the same
/// address with its port set to 1. Starting from `255.0.255.255:65535`, the call after that one
/// returns `255.1.0.0:0`.
#[test]
fn test_get_next_socket() {
    deterministic::Runner::default().start(|context| async move {
        // The network is never run; only its address allocator is exercised.
        let (mut network, _) = Network::<deterministic::Context, ed25519::PublicKey>::new(
            context.clone(),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: None,
            },
        );

        // The first allocation hands out the initial address unchanged.
        let first = network.next_addr;
        assert_eq!(network.get_next_socket(), first);

        // The second allocation differs from the first only in its port.
        let mut second = first;
        second.set_port(1);
        assert_eq!(network.get_next_socket(), second);

        // Force the allocator to the last port of 255.0.255.255.
        let last = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), u16::MAX);
        network.next_addr = last;
        assert_eq!(network.get_next_socket(), last);

        // The next address has port 0 and the lower octets rolled over into the second octet.
        let rolled = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0);
        assert_eq!(network.get_next_socket(), rolled);
    });
}
1743
/// Sends a burst of 50 distinct 64-byte messages to one recipient over a bandwidth-limited,
/// lossless link and checks that they are received in the order they were sent.
#[test]
fn test_fifo_burst_same_recipient() {
    deterministic::Runner::default().start(|context| async move {
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            },
        );
        let network_handle = network.start();

        let [sender_pk, recipient_pk] =
            [10, 11].map(|seed| ed25519::PrivateKey::from_seed(seed).public_key());

        // Track both peers in peer set 0, then register one channel each.
        let mut manager = oracle.manager();
        let members = [sender_pk.clone(), recipient_pk.clone()];
        manager.update(0, members.try_into().unwrap()).await;
        let (mut sender, _sender_recv) = oracle
            .control(sender_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_sender2, mut receiver) = oracle
            .control(recipient_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Cap both ends at 5_000.
        // NOTE(review): presumably bytes/sec, with the second argument limiting egress and the
        // third ingress - confirm against `limit_bandwidth`.
        oracle
            .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
            .await
            .unwrap();
        oracle
            .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
            .await
            .unwrap();

        // One-directional, zero-delay, lossless link from sender to recipient.
        let link = ingress::Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(sender_pk.clone(), recipient_pk.clone(), link)
            .await
            .unwrap();

        // Each payload is filled with its own index so any reordering is detectable.
        const COUNT: usize = 50;
        let payloads: Vec<Bytes> = (0..COUNT)
            .map(|i| Bytes::from(vec![i as u8; 64]))
            .collect();

        // Fire the whole burst before reading anything.
        for payload in &payloads {
            sender
                .send(Recipients::One(recipient_pk.clone()), payload.clone(), false)
                .await
                .unwrap();
        }

        // Deliveries must come back in exactly the send order.
        for want in payloads {
            let (_from, got) = receiver.recv().await.unwrap();
            assert_eq!(got, want);
        }

        // Tear down explicitly: release the handles, then stop the network task.
        drop(oracle);
        drop(sender);
        network_handle.abort();
    });
}
1823
/// Broadcasts one 10_000-byte message to two recipients under bandwidth limits of 1_000 and
/// checks the delivery timing.
///
/// The test asserts that each recipient gets the full payload no earlier than 20 seconds after
/// the send, and that the two delivery times differ by at most one second.
#[test]
fn test_broadcast_respects_transmit_latency() {
    deterministic::Runner::default().start(|context| async move {
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            },
        );
        let network_handle = network.start();

        let [sender_pk, recipient_a, recipient_b] =
            [42, 43, 44].map(|seed| ed25519::PrivateKey::from_seed(seed).public_key());

        // Track all three peers in peer set 0, then register one channel each.
        let mut manager = oracle.manager();
        let members = [sender_pk.clone(), recipient_a.clone(), recipient_b.clone()];
        manager.update(0, members.try_into().unwrap()).await;
        let (mut sender, _recv_sender) = oracle
            .control(sender_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_sender2, mut recv_a) = oracle
            .control(recipient_a.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_sender3, mut recv_b) = oracle
            .control(recipient_b.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Cap the sender and both recipients at 1_000.
        // NOTE(review): presumably bytes/sec, with the second argument limiting egress and the
        // third ingress - confirm against `limit_bandwidth`.
        oracle
            .limit_bandwidth(sender_pk.clone(), Some(1_000), None)
            .await
            .unwrap();
        for recipient in [&recipient_a, &recipient_b] {
            oracle
                .limit_bandwidth(recipient.clone(), None, Some(1_000))
                .await
                .unwrap();
        }

        // Zero-delay, lossless links from the sender to each recipient, so any delay observed
        // below does not come from link latency.
        let link = ingress::Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for recipient in [&recipient_a, &recipient_b] {
            oracle
                .add_link(sender_pk.clone(), recipient.clone(), link.clone())
                .await
                .unwrap();
        }

        // Broadcast a single large payload and time each delivery from this instant.
        let payload = Bytes::from(vec![7u8; 10_000]);
        let sent_at = context.current();
        sender
            .send(Recipients::All, payload.clone(), false)
            .await
            .unwrap();

        // Neither delivery may complete before this much time has passed.
        let floor = Duration::from_secs(20);

        let (_from, got_a) = recv_a.recv().await.unwrap();
        assert_eq!(got_a, payload);
        let took_a = context.current().duration_since(sent_at).unwrap();
        assert!(took_a >= floor);

        let (_from, got_b) = recv_b.recv().await.unwrap();
        assert_eq!(got_b, payload);
        let took_b = context.current().duration_since(sent_at).unwrap();
        assert!(took_b >= floor);

        // Both recipients must finish at nearly the same time.
        assert!(took_a.abs_diff(took_b) <= Duration::from_secs(1));

        // Tear down explicitly: release the handles, then stop the network task.
        drop(oracle);
        drop(sender);
        network_handle.abort();
    });
}
1918}