1use super::{
4 ingress::{self, Oracle},
5 metrics,
6 transmitter::{self, Completion},
7 Error,
8};
9use crate::{
10 utils::{
11 limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
12 PeerSetsAtIndex as PeerSetsAtIndexBase,
13 },
14 Channel, Message as NetworkMessage, PeerSetUpdate, Recipients, TrackedPeers,
15 UnlimitedSender as _,
16};
17use commonware_actor::{Feedback, Unreliable};
18use commonware_codec::{DecodeExt, FixedSize};
19use commonware_cryptography::PublicKey;
20use commonware_macros::select_loop;
21use commonware_runtime::{
22 spawn_cell,
23 telemetry::metrics::{CounterFamily, MetricsExt as _},
24 Clock, ContextCell, Handle, IoBuf, IoBufs, Listener as _, Metrics, Network as RNetwork, Quota,
25 Spawner,
26};
27use commonware_stream::utils::codec::{recv_frame, send_frame};
28use commonware_utils::{
29 channel::{fallible::FallibleExt, mpsc, oneshot, ring},
30 ordered::Set,
31 NZUsize, TryCollect,
32};
33use either::Either;
34use futures::{future, Sink};
35use rand::Rng;
36use rand_distr::{Distribution, Normal};
37use std::{
38 collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
39 fmt::Debug,
40 net::{IpAddr, Ipv4Addr, SocketAddr},
41 num::NonZeroUsize,
42 pin::Pin,
43 sync::{
44 atomic::{AtomicBool, Ordering},
45 Arc,
46 },
47 time::{Duration, SystemTime},
48};
49use tracing::{debug, error, trace, warn};
50
51type PeerSetsAtIndex<P> = PeerSetsAtIndexBase<Set<P>, Set<P>>;
53
54type Task<P> = (Channel, P, Recipients<P>, IoBuf);
56
57struct RegistrationGuard {
58 active: Arc<AtomicBool>,
59}
60
61impl Drop for RegistrationGuard {
62 fn drop(&mut self) {
63 self.active.store(false, Ordering::Release);
64 }
65}
66
67#[derive(Clone, Copy, Debug, PartialEq, Eq)]
69#[must_use]
70pub enum SplitTarget {
71 None,
72 Primary,
73 Secondary,
74 Both,
75}
76
77#[derive(Clone, Copy, Debug, PartialEq, Eq)]
79#[must_use]
80pub enum SplitOrigin {
81 Primary,
82 Secondary,
83}
84
85pub trait SplitForwarder<P: PublicKey>:
87 Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
88{
89}
90
91impl<P: PublicKey, F> SplitForwarder<P> for F where
92 F: Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>>
93 + Send
94 + Sync
95 + Clone
96 + 'static
97{
98}
99
100pub trait SplitRouter<P: PublicKey>:
102 Fn(&NetworkMessage<P>) -> SplitTarget + Send + Sync + 'static
103{
104}
105
106impl<P: PublicKey, F> SplitRouter<P> for F where
107 F: Fn(&NetworkMessage<P>) -> SplitTarget + Send + Sync + 'static
108{
109}
110
111#[derive(Clone, Copy, Default)]
113struct PeerRefCounts {
114 primary: usize,
115 secondary: usize,
116}
117
118pub struct Config {
120 pub max_size: u32,
122
123 pub disconnect_on_block: bool,
127
128 pub tracked_peer_sets: NonZeroUsize,
133}
134
135pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
137 context: ContextCell<E>,
138
139 max_size: u32,
141
142 disconnect_on_block: bool,
146
147 next_addr: SocketAddr,
150
151 ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,
153
154 ingress_sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
156
157 links: HashMap<(P, P), Link>,
159
160 peers: BTreeMap<P, Peer<P>>,
162
163 peer_sets: BTreeMap<u64, PeerSetsAtIndex<P>>,
165
166 peer_ref_counts: BTreeMap<P, PeerRefCounts>,
168
169 tracked_peer_sets: NonZeroUsize,
171
172 blocks: BTreeSet<(P, P)>,
174
175 transmitter: transmitter::State<P>,
177
178 subscribers: Vec<mpsc::UnboundedSender<PeerSetUpdate<P>>>,
180
181 peer_subscribers: Vec<(P, ring::Sender<Vec<P>>)>,
183
184 received_messages: CounterFamily<metrics::Message>,
186 sent_messages: CounterFamily<metrics::Message>,
187}
188
189impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
190 pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
195 let (oracle_mailbox, oracle_receiver) = mpsc::unbounded_channel();
196 let sent_messages = context.family("messages_sent", "messages sent");
197 let received_messages = context.family("messages_received", "messages received");
198
199 let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);
201
202 (
203 Self {
204 context: ContextCell::new(context),
205 max_size: cfg.max_size,
206 disconnect_on_block: cfg.disconnect_on_block,
207 tracked_peer_sets: cfg.tracked_peer_sets,
208 next_addr,
209 ingress: oracle_receiver,
210 ingress_sender: oracle_mailbox.clone(),
211 links: HashMap::new(),
212 peers: BTreeMap::new(),
213 peer_sets: BTreeMap::new(),
214 peer_ref_counts: BTreeMap::new(),
215 blocks: BTreeSet::new(),
216 transmitter: transmitter::State::new(),
217 subscribers: Vec::new(),
218 peer_subscribers: Vec::new(),
219 received_messages,
220 sent_messages,
221 },
222 Oracle::new(oracle_mailbox),
223 )
224 }
225
226 pub async fn new_with_peers<I>(context: E, cfg: Config, peers: I) -> (Self, Oracle<P, E>)
231 where
232 I: IntoIterator<Item = P>,
233 {
234 Self::new_with_split_peers(context, cfg, peers, std::iter::empty()).await
235 }
236
237 pub async fn new_with_split_peers<I, J>(
242 context: E,
243 cfg: Config,
244 primary: I,
245 secondary: J,
246 ) -> (Self, Oracle<P, E>)
247 where
248 I: IntoIterator<Item = P>,
249 J: IntoIterator<Item = P>,
250 {
251 let (mut network, oracle) = Self::new(context, cfg);
252 network
253 .register_tracked_peer_set(
254 0,
255 TrackedPeers::new(
256 Set::from_iter_dedup(primary),
257 Set::from_iter_dedup(secondary),
258 ),
259 )
260 .await;
261 (network, oracle)
262 }
263
264 async fn register_tracked_peer_set(&mut self, id: u64, peers: TrackedPeers<P>) -> bool {
266 let primary = peers.primary;
267 let secondary = peers.secondary;
268 let tracked_peer_sets = self.tracked_peer_sets;
269
270 if self.peer_sets.contains_key(&id) {
272 warn!(id, "peer set already exists");
273 return false;
274 }
275
276 if let Some((last, _)) = self.peer_sets.last_key_value() {
278 if id <= *last {
279 warn!(
280 new_id = id,
281 old_id = last,
282 "attempted to register peer set with non-monotonically increasing ID"
283 );
284 return false;
285 }
286 }
287
288 for public_key in primary.iter() {
290 self.ensure_peer_exists(public_key).await;
291 self.peer_ref_counts
292 .entry(public_key.clone())
293 .or_default()
294 .primary += 1;
295 }
296
297 let secondary_filtered = Set::from_iter_dedup(
299 secondary
300 .iter()
301 .filter(|s| primary.position(s).is_none())
302 .cloned(),
303 );
304 for public_key in secondary_filtered.iter() {
305 self.ensure_peer_exists(public_key).await;
306 self.peer_ref_counts
307 .entry(public_key.clone())
308 .or_default()
309 .secondary += 1;
310 }
311 self.peer_sets.insert(
312 id,
313 PeerSetsAtIndex {
314 primary: primary.clone(),
315 secondary: secondary_filtered,
316 },
317 );
318
319 while self.peer_sets.len() > tracked_peer_sets.get() {
321 let (removed_index, sets) = self.peer_sets.pop_first().unwrap();
322 debug!(index = removed_index, "removed oldest tracked peer sets");
323
324 for public_key in sets.primary.iter() {
325 let counts = self
326 .peer_ref_counts
327 .get_mut(public_key)
328 .expect("reference map out of sync with peer sets");
329 counts.primary = counts
330 .primary
331 .checked_sub(1)
332 .expect("reference count underflow");
333 if counts.primary == 0 && counts.secondary == 0 {
334 self.peer_ref_counts.remove(public_key);
335 debug!(
336 ?public_key,
337 "removed peer no longer in any tracked peer set"
338 );
339 }
340 }
341
342 for public_key in sets.secondary.iter() {
343 let counts = self
344 .peer_ref_counts
345 .get_mut(public_key)
346 .expect("reference map out of sync with peer sets");
347 counts.secondary = counts
348 .secondary
349 .checked_sub(1)
350 .expect("reference count underflow");
351 if counts.primary == 0 && counts.secondary == 0 {
352 self.peer_ref_counts.remove(public_key);
353 debug!(
354 ?public_key,
355 "removed peer no longer in any tracked peer set"
356 );
357 }
358 }
359 }
360
361 true
362 }
363
364 fn get_next_socket(&mut self) -> SocketAddr {
369 let result = self.next_addr;
370
371 match self.next_addr.port().checked_add(1) {
374 Some(port) => {
375 self.next_addr.set_port(port);
376 }
377 None => {
378 let ip = match self.next_addr.ip() {
379 IpAddr::V4(ipv4) => ipv4,
380 _ => unreachable!(),
381 };
382 let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
383 self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
384 }
385 }
386
387 result
388 }
389
390 async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
394 fn send_result<T: std::fmt::Debug>(
398 result: oneshot::Sender<Result<T, Error>>,
399 value: Result<T, Error>,
400 ) {
401 let success = value.is_ok();
402 if let Err(e) = result.send(value) {
403 error!(?e, "failed to send result to oracle (ok = {})", success);
404 }
405 }
406
407 match message {
408 ingress::Message::Send {
409 channel,
410 origin,
411 recipients,
412 message,
413 ..
414 } => {
415 self.handle_task((channel, origin, recipients, message));
416 }
417 ingress::Message::Track { id, peers } => {
418 if !self.register_tracked_peer_set(id, peers).await {
419 return;
420 }
421
422 let update = self
424 .latest_update()
425 .expect("latest update missing after successful track");
426 self.subscribers
427 .retain(|subscriber| subscriber.send_lossy(update.clone()));
428
429 self.broadcast_peer_list();
431 }
432 ingress::Message::Register {
433 channel,
434 public_key,
435 quota,
436 result,
437 } => {
438 let _ = self.ensure_peer_exists(&public_key).await;
440
441 let clock = self
443 .context
444 .child("rate_limiter")
445 .with_attribute("channel", channel)
446 .with_attribute("peer", &public_key);
447
448 let (sender, guard) = Sender::new(
450 public_key.clone(),
451 channel,
452 self.max_size,
453 self.ingress_sender.clone(),
454 self.connected_peers_for(&public_key),
455 clock,
456 quota,
457 );
458
459 let peer = self.peers.get_mut(&public_key).unwrap();
461 let receiver = match peer.register(channel, guard).await {
462 Ok(receiver) => Receiver { receiver },
463 Err(err) => return send_result(result, Err(err)),
464 };
465
466 send_result(result, Ok((sender, receiver)))
467 }
468 ingress::Message::PeerSet { id, response } => {
469 let _ = response.send(
470 self.peer_sets
471 .get(&id)
472 .map(|e| TrackedPeers::new(e.primary.clone(), e.secondary.clone())),
473 );
474 }
475 ingress::Message::Subscribe { response } => {
476 let (sender, receiver) = mpsc::unbounded_channel();
478
479 if let Some(update) = self.latest_update() {
481 sender.send_lossy(update);
482 }
483 self.subscribers.push(sender);
484
485 let _ = response.send(receiver);
487 }
488 ingress::Message::SubscribePeers { exclude, sender } => {
489 self.subscribe_connected(exclude, sender);
490 }
491 ingress::Message::LimitBandwidth {
492 public_key,
493 egress_cap,
494 ingress_cap,
495 result,
496 } => {
497 let _ = self.ensure_peer_exists(&public_key).await;
499
500 let now = self.context.current();
502 let completions = self
503 .transmitter
504 .limit(now, &public_key, egress_cap, ingress_cap);
505 self.process_completions(completions);
506
507 let _ = result.send(());
509 }
510 ingress::Message::AddLink {
511 sender,
512 receiver,
513 sampler,
514 success_rate,
515 result,
516 } => {
517 let _ = self.ensure_peer_exists(&sender).await;
519 let (receiver_socket, _) = self.ensure_peer_exists(&receiver).await;
520
521 let key = (sender.clone(), receiver.clone());
523 if self.links.contains_key(&key) {
524 return send_result(result, Err(Error::LinkExists));
525 }
526
527 let link = Link::new(
528 self.context.as_mut(),
529 sender,
530 receiver,
531 receiver_socket,
532 sampler,
533 success_rate,
534 self.max_size,
535 self.received_messages.clone(),
536 );
537 self.links.insert(key, link);
538 send_result(result, Ok(()))
539 }
540 ingress::Message::RemoveLink {
541 sender,
542 receiver,
543 result,
544 } => {
545 match self.links.remove(&(sender, receiver)) {
546 Some(_) => (),
547 None => return send_result(result, Err(Error::LinkMissing)),
548 }
549 send_result(result, Ok(()))
550 }
551 ingress::Message::Block { from, to } => {
552 self.blocks.insert((from, to));
553 }
554 ingress::Message::Blocked { result } => {
555 send_result(result, Ok(self.blocks.iter().cloned().collect()))
556 }
557 }
558 }
559
560 async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
564 if !self.peers.contains_key(public_key) {
565 let socket = self.get_next_socket();
567 let peer = Peer::new(
568 self.context.child("peer"),
569 public_key.clone(),
570 socket,
571 self.max_size,
572 )
573 .await;
574
575 self.peers.insert(public_key.clone(), peer);
577
578 (socket, true)
579 } else {
580 (self.peers.get(public_key).unwrap().socket, false)
581 }
582 }
583
584 fn subscribe_connected(&mut self, exclude: P, mut sender: ring::Sender<Vec<P>>) {
585 let peers = self.connected_peers_for(&exclude);
586 if Pin::new(&mut sender).start_send(peers).is_ok() {
587 self.peer_subscribers.push((exclude, sender));
588 }
589 }
590
591 fn broadcast_peer_list(&mut self) {
599 if self.peer_subscribers.is_empty() {
600 return;
601 }
602
603 let peers: Vec<P> = self.peer_ref_counts.keys().cloned().collect();
604 let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
605 for (exclude, mut subscriber) in self.peer_subscribers.drain(..) {
606 let peer_list = if peers.contains(&exclude) {
607 peers
608 .iter()
609 .filter(|peer| *peer != &exclude)
610 .cloned()
611 .collect()
612 } else {
613 Vec::new()
614 };
615 if Pin::new(&mut subscriber).start_send(peer_list).is_ok() {
616 live_subscribers.push((exclude, subscriber));
617 }
618 }
619 self.peer_subscribers = live_subscribers;
620 }
621
622 fn aggregate_peer_membership(&self) -> TrackedPeers<P> {
627 let primary = self
628 .peer_ref_counts
629 .iter()
630 .filter(|(_, c)| c.primary > 0)
631 .map(|(k, _)| k.clone())
632 .try_collect()
633 .expect("BTreeMap keys are unique");
634 let secondary = Set::from_iter_dedup(
635 self.peer_ref_counts
636 .iter()
637 .filter(|(_, c)| c.secondary > 0 && c.primary == 0)
638 .map(|(k, _)| k.clone()),
639 );
640 TrackedPeers::new(primary, secondary)
641 }
642
643 fn latest_update(&self) -> Option<PeerSetUpdate<P>> {
645 let (index, entry) = self.peer_sets.last_key_value()?;
646 Some(PeerSetUpdate {
647 index: *index,
648 latest: TrackedPeers::new(entry.primary.clone(), entry.secondary.clone()),
649 all: self.aggregate_peer_membership(),
650 })
651 }
652
653 fn connected_peers_for(&self, sender: &P) -> Vec<P> {
655 if !self.peer_ref_counts.contains_key(sender) {
656 return Vec::new();
657 }
658 self.peer_ref_counts
659 .keys()
660 .filter(|peer| *peer != sender)
661 .cloned()
662 .collect()
663 }
664
665 fn is_connectable(&self, peer: &P) -> bool {
667 self.peer_ref_counts.contains_key(peer)
668 }
669}
670
671impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
672 fn process_completions(&mut self, completions: Vec<Completion<P>>) {
674 for completion in completions {
675 let Some(deliver_at) = completion.deliver_at else {
677 trace!(
678 origin = ?completion.origin,
679 recipient = ?completion.recipient,
680 "message dropped before delivery",
681 );
682 continue;
683 };
684
685 let key = (completion.origin.clone(), completion.recipient.clone());
687 let Some(link) = self.links.get_mut(&key) else {
688 trace!(
690 origin = ?completion.origin,
691 recipient = ?completion.recipient,
692 "missing link for completion",
693 );
694 continue;
695 };
696 if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
697 error!(?err, "failed to send");
698 }
699 }
700 }
701
702 fn handle_task(&mut self, task: Task<P>) {
707 let (channel, origin, recipients, message) = task;
708
709 if !self.is_connectable(&origin) {
711 warn!(
712 ?origin,
713 reason = "not primary or secondary",
714 "dropping message"
715 );
716 return;
717 }
718
719 let recipients = match recipients {
721 Recipients::All => self.connected_peers_for(&origin),
722 Recipients::Some(keys) => keys,
723 Recipients::One(key) => vec![key],
724 };
725
726 let now = self.context.current();
728 for recipient in recipients {
729 if recipient == origin {
731 trace!(?recipient, reason = "self", "dropping message");
732 continue;
733 }
734
735 if !self.is_connectable(&recipient) {
736 trace!(
737 ?origin,
738 ?recipient,
739 reason = "not primary or secondary",
740 "dropping message"
741 );
742 continue;
743 }
744
745 let o_r = (origin.clone(), recipient.clone());
747 let r_o = (recipient.clone(), origin.clone());
748 if self.disconnect_on_block
749 && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
750 {
751 trace!(?origin, ?recipient, reason = "blocked", "dropping message");
752 continue;
753 }
754
755 let Some(link) = self.links.get_mut(&o_r) else {
757 trace!(?origin, ?recipient, reason = "no link", "dropping message");
758 continue;
759 };
760
761 self.sent_messages
767 .get_or_create(&metrics::Message::new(&origin, &recipient, channel))
768 .inc();
769
770 let latency = Duration::from_millis(link.sampler.sample(self.context.as_mut()) as u64);
772
773 let should_deliver = self.context.gen_bool(link.success_rate);
775
776 let completions = self.transmitter.enqueue(
778 now,
779 origin.clone(),
780 recipient.clone(),
781 channel,
782 message.clone(),
783 latency,
784 should_deliver,
785 );
786 self.process_completions(completions);
787 }
788 }
789
790 fn queue_task(
791 high: &mut VecDeque<Task<P>>,
792 low: &mut VecDeque<Task<P>>,
793 task: Task<P>,
794 priority: bool,
795 ) {
796 if priority {
797 high.push_back(task);
798 } else {
799 low.push_back(task);
800 }
801 }
802
803 fn handle_tasks(&mut self, high: &mut VecDeque<Task<P>>, low: &mut VecDeque<Task<P>>) {
804 while let Some(task) = high.pop_front() {
805 self.handle_task(task);
806 }
807 while let Some(task) = low.pop_front() {
808 self.handle_task(task);
809 }
810 }
811
812 async fn handle_ordered_ingress(
813 &mut self,
814 mut message: ingress::Message<P, E>,
815 high: &mut VecDeque<Task<P>>,
816 low: &mut VecDeque<Task<P>>,
817 ) {
818 loop {
819 match message {
820 ingress::Message::Send {
821 channel,
822 origin,
823 recipients,
824 message,
825 priority,
826 } => {
827 Self::queue_task(high, low, (channel, origin, recipients, message), priority);
828 }
829 message => {
830 self.handle_tasks(high, low);
831 self.handle_ingress(message).await;
832 return;
833 }
834 }
835
836 message = match self.ingress.try_recv() {
837 Ok(message) => message,
838 Err(_) => {
839 self.handle_tasks(high, low);
840 return;
841 }
842 };
843 }
844 }
845
846 pub fn start(mut self) -> Handle<()> {
851 spawn_cell!(self.context, self.run())
852 }
853
854 async fn run(mut self) {
855 let mut high = VecDeque::new();
856 let mut low = VecDeque::new();
857 select_loop! {
858 self.context,
859 on_start => {
860 let tick = match self.transmitter.next() {
861 Some(when) => Either::Left(self.context.sleep_until(when)),
862 None => Either::Right(future::pending()),
863 };
864 },
865 on_stopped => {},
866 _ = tick => {
867 let now = self.context.current();
868 let completions = self.transmitter.advance(now);
869 self.process_completions(completions);
870 },
871 Some(message) = self.ingress.recv() else break => {
872 self.handle_ordered_ingress(message, &mut high, &mut low)
873 .await;
874 },
875 }
876 }
877}
878
879pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
883 me: P,
884 ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
885 peers: Vec<P>,
886 _clock: std::marker::PhantomData<E>,
887}
888
889impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
890 fn clone(&self) -> Self {
891 Self {
892 me: self.me.clone(),
893 ingress: self.ingress.clone(),
894 peers: self.peers.clone(),
895 _clock: std::marker::PhantomData,
896 }
897 }
898}
899
900impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
901 const fn new(
902 me: P,
903 ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
904 peers: Vec<P>,
905 ) -> Self {
906 Self {
907 me,
908 ingress,
909 peers,
910 _clock: std::marker::PhantomData,
911 }
912 }
913}
914
915impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
916 type PublicKey = P;
917
918 fn peers(&self) -> Vec<Self::PublicKey> {
919 self.peers.clone()
920 }
921
922 fn subscribe(&self) -> ring::Receiver<Vec<Self::PublicKey>> {
923 let (sender, receiver) = ring::channel(NZUsize!(1));
924 let _ = self.ingress.send_lossy(ingress::Message::SubscribePeers {
925 exclude: self.me.clone(),
926 sender,
927 });
928 receiver
929 }
930}
931
932pub struct UnlimitedSender<P: PublicKey, E: Clock> {
936 me: P,
937 channel: Channel,
938 max_size: u32,
939 sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
940 active: Arc<AtomicBool>,
941}
942
943impl<P: PublicKey, E: Clock> Clone for UnlimitedSender<P, E> {
944 fn clone(&self) -> Self {
945 Self {
946 me: self.me.clone(),
947 channel: self.channel,
948 max_size: self.max_size,
949 sender: self.sender.clone(),
950 active: self.active.clone(),
951 }
952 }
953}
954
955impl<P: PublicKey, E: Clock> crate::UnlimitedSender for UnlimitedSender<P, E> {
956 type PublicKey = P;
957
958 fn send(
959 &mut self,
960 recipients: Recipients<P>,
961 message: impl Into<IoBufs> + Send,
962 priority: bool,
963 ) -> Unreliable<Feedback> {
964 let message = message.into().coalesce();
965 assert!(
966 message.len() <= self.max_size as usize,
967 "message too large: {} > {}",
968 message.len(),
969 self.max_size
970 );
971
972 if !self.active.load(Ordering::Acquire) || self.sender.is_closed() {
973 return Unreliable::new(Feedback::Closed);
974 }
975
976 if self.sender.send_lossy(ingress::Message::Send {
980 channel: self.channel,
981 origin: self.me.clone(),
982 recipients,
983 message,
984 priority,
985 }) {
986 Unreliable::new(Feedback::Ok)
987 } else {
988 Unreliable::new(Feedback::Closed)
989 }
990 }
991}
992
993pub struct Sender<P: PublicKey, E: Clock> {
998 limited_sender: LimitedSender<E, UnlimitedSender<P, E>, ConnectedPeerProvider<P, E>>,
999}
1000
1001impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
1002 fn clone(&self) -> Self {
1003 Self {
1004 limited_sender: self.limited_sender.clone(),
1005 }
1006 }
1007}
1008
1009impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
1010 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1011 f.debug_struct("Sender").finish_non_exhaustive()
1012 }
1013}
1014
1015impl<P: PublicKey, E: Clock> Sender<P, E> {
1016 #[allow(clippy::too_many_arguments)]
1017 fn new(
1018 me: P,
1019 channel: Channel,
1020 max_size: u32,
1021 ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
1022 connected_peers: Vec<P>,
1023 clock: E,
1024 quota: Quota,
1025 ) -> (Self, RegistrationGuard) {
1026 let active = Arc::new(AtomicBool::new(true));
1027 let unlimited_sender = UnlimitedSender {
1028 me: me.clone(),
1029 channel,
1030 max_size,
1031 sender: ingress.clone(),
1032 active: active.clone(),
1033 };
1034 let peer_source = ConnectedPeerProvider::new(me, ingress, connected_peers);
1035 let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);
1036
1037 (Self { limited_sender }, RegistrationGuard { active })
1038 }
1039
1040 pub fn split_with<F: SplitForwarder<P>>(
1042 self,
1043 forwarder: F,
1044 ) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
1045 (
1046 SplitSender {
1047 replica: SplitOrigin::Primary,
1048 inner: self.clone(),
1049 forwarder: forwarder.clone(),
1050 },
1051 SplitSender {
1052 replica: SplitOrigin::Secondary,
1053 inner: self,
1054 forwarder,
1055 },
1056 )
1057 }
1058}
1059
1060impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
1061 type PublicKey = P;
1062 type Checked<'a>
1063 = crate::utils::limited::CheckedSender<'a, UnlimitedSender<P, E>>
1064 where
1065 Self: 'a;
1066
1067 fn check(
1068 &mut self,
1069 recipients: Recipients<Self::PublicKey>,
1070 ) -> Result<Self::Checked<'_>, SystemTime> {
1071 self.limited_sender.check(recipients)
1072 }
1073}
1074
1075pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
1077 replica: SplitOrigin,
1078 inner: Sender<P, E>,
1079 forwarder: F,
1080}
1081
1082impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
1083 fn clone(&self) -> Self {
1084 Self {
1085 replica: self.replica,
1086 inner: self.inner.clone(),
1087 forwarder: self.forwarder.clone(),
1088 }
1089 }
1090}
1091
1092impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
1093 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1094 f.debug_struct("SplitSender")
1095 .field("replica", &self.replica)
1096 .field("inner", &self.inner)
1097 .finish()
1098 }
1099}
1100
1101impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
1102 type PublicKey = P;
1103 type Checked<'a> = SplitCheckedSender<'a, P, E, F>;
1104
1105 fn check(
1106 &mut self,
1107 recipients: Recipients<Self::PublicKey>,
1108 ) -> Result<Self::Checked<'_>, SystemTime> {
1109 Ok(SplitCheckedSender {
1110 checked: self.inner.limited_sender.check(recipients.clone())?,
1113 replica: self.replica,
1114 forwarder: self.forwarder.clone(),
1115 recipients,
1116
1117 _phantom: std::marker::PhantomData,
1118 })
1119 }
1120}
1121
1122pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
1127 checked: LimitedCheckedSender<'a, UnlimitedSender<P, E>>,
1128 replica: SplitOrigin,
1129 forwarder: F,
1130 recipients: Recipients<P>,
1131
1132 _phantom: std::marker::PhantomData<E>,
1133}
1134
1135impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
1136 for SplitCheckedSender<'a, P, E, F>
1137{
1138 type PublicKey = P;
1139
1140 fn recipients(&self) -> Vec<Self::PublicKey> {
1141 crate::CheckedSender::recipients(&self.checked)
1142 }
1143
1144 fn send(self, message: impl Into<IoBufs> + Send, priority: bool) -> Unreliable<Feedback> {
1145 let message = message.into().coalesce();
1147
1148 let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
1150 return Unreliable::Rejected;
1151 };
1152
1153 self.checked
1160 .into_inner()
1161 .send(recipients, message, priority)
1162 }
1163}
1164
1165type MessageReceiver<P> = mpsc::UnboundedReceiver<NetworkMessage<P>>;
1166type ChannelRegistration<P> = (
1167 Channel,
1168 RegistrationGuard,
1169 oneshot::Sender<MessageReceiver<P>>,
1170);
1171
1172#[derive(Debug)]
1174pub struct Receiver<P: PublicKey> {
1175 receiver: MessageReceiver<P>,
1176}
1177
1178impl<P: PublicKey> crate::Receiver for Receiver<P> {
1179 type Error = Error;
1180 type PublicKey = P;
1181
1182 async fn recv(&mut self) -> Result<NetworkMessage<Self::PublicKey>, Error> {
1183 self.receiver.recv().await.ok_or(Error::NetworkClosed)
1184 }
1185}
1186
1187impl<P: PublicKey> Receiver<P> {
1188 pub fn split_with<E: Spawner, R: SplitRouter<P>>(
1190 mut self,
1191 context: E,
1192 router: R,
1193 ) -> (Self, Self) {
1194 let (primary_tx, primary_rx) = mpsc::unbounded_channel();
1195 let (secondary_tx, secondary_rx) = mpsc::unbounded_channel();
1196 context.spawn(move |_| async move {
1197 while let Some(message) = self.receiver.recv().await {
1198 let direction = router(&message);
1200 match direction {
1201 SplitTarget::None => {}
1202 SplitTarget::Primary => {
1203 if let Err(err) = primary_tx.send(message) {
1204 error!(?err, "failed to send message to primary");
1205 }
1206 }
1207 SplitTarget::Secondary => {
1208 if let Err(err) = secondary_tx.send(message) {
1209 error!(?err, "failed to send message to secondary");
1210 }
1211 }
1212 SplitTarget::Both => {
1213 if let Err(err) = primary_tx.send(message.clone()) {
1214 error!(?err, "failed to send message to primary");
1215 }
1216 if let Err(err) = secondary_tx.send(message) {
1217 error!(?err, "failed to send message to secondary");
1218 }
1219 }
1220 }
1221
1222 if primary_tx.is_closed() && secondary_tx.is_closed() {
1224 break;
1225 }
1226 }
1227 });
1228
1229 (
1230 Self {
1231 receiver: primary_rx,
1232 },
1233 Self {
1234 receiver: secondary_rx,
1235 },
1236 )
1237 }
1238}
1239
1240struct Peer<P: PublicKey> {
1244 socket: SocketAddr,
1246
1247 control: mpsc::UnboundedSender<ChannelRegistration<P>>,
1249}
1250
1251impl<P: PublicKey> Peer<P> {
1252 async fn new<E: Spawner + RNetwork + Metrics + Clock>(
1257 context: E,
1258 public_key: P,
1259 socket: SocketAddr,
1260 max_size: u32,
1261 ) -> Self {
1262 let (control_sender, mut control_receiver): (
1264 mpsc::UnboundedSender<ChannelRegistration<P>>,
1265 _,
1266 ) = mpsc::unbounded_channel();
1267
1268 let (inbox_sender, mut inbox_receiver) = mpsc::unbounded_channel();
1271
1272 context.child("router").spawn(|context| async move {
1274 let mut mailboxes = HashMap::new();
1276
1277 select_loop! {
1279 context,
1280 on_stopped => {},
1281 Some((channel, guard, result_tx)) = control_receiver.recv() else break => {
1283 let (receiver_tx, receiver_rx) = mpsc::unbounded_channel();
1285 if mailboxes.insert(channel, (receiver_tx, guard)).is_some() {
1286 warn!(?public_key, ?channel, "overwriting existing channel");
1287 }
1288 result_tx.send(receiver_rx).unwrap();
1289 },
1290
1291 Some((channel, message)) = inbox_receiver.recv() else break => {
1293 match mailboxes.get_mut(&channel) {
1295 Some((receiver_tx, _)) => {
1296 if let Err(err) = receiver_tx.send(message) {
1297 debug!(?err, "failed to send message to mailbox");
1298 }
1299 }
1300 None => {
1301 trace!(
1302 recipient = ?public_key,
1303 channel,
1304 reason = "missing channel",
1305 "dropping message",
1306 );
1307 }
1308 }
1309 },
1310 }
1311 });
1312
1313 let (ready_tx, ready_rx) = oneshot::channel();
1315 context.child("listener").spawn(move |context| async move {
1316 let mut listener = context.bind(socket).await.unwrap();
1318 let _ = ready_tx.send(());
1319
1320 while let Ok((_, _, mut stream)) = listener.accept().await {
1322 context.child("receiver").spawn({
1324 let inbox_sender = inbox_sender.clone();
1325 move |_| async move {
1326 let dialer = match recv_frame(&mut stream, max_size).await {
1328 Ok(data) => data,
1329 Err(_) => {
1330 error!("failed to receive public key from dialer");
1331 return;
1332 }
1333 };
1334 let Ok(dialer) = P::decode(dialer.coalesce()) else {
1335 error!("received public key is invalid");
1336 return;
1337 };
1338
1339 while let Ok(data) = recv_frame(&mut stream, max_size).await {
1341 let data = data.coalesce();
1342 let channel = Channel::from_be_bytes(
1343 data.as_ref()[..Channel::SIZE].try_into().unwrap(),
1344 );
1345 let message = data.slice(Channel::SIZE..);
1346 if let Err(err) =
1347 inbox_sender.send((channel, (dialer.clone(), message)))
1348 {
1349 debug!(?err, "failed to send message to mailbox");
1350 break;
1351 }
1352 }
1353 }
1354 });
1355 }
1356 });
1357
1358 let _ = ready_rx.await;
1360
1361 Self {
1363 socket,
1364 control: control_sender,
1365 }
1366 }
1367
1368 async fn register(
1373 &mut self,
1374 channel: Channel,
1375 guard: RegistrationGuard,
1376 ) -> Result<MessageReceiver<P>, Error> {
1377 let (result_tx, result_rx) = oneshot::channel();
1378 self.control
1379 .send((channel, guard, result_tx))
1380 .map_err(|_| Error::NetworkClosed)?;
1381 result_rx.await.map_err(|_| Error::NetworkClosed)
1382 }
1383}
1384
1385struct Link {
1388 sampler: Normal<f64>,
1389 success_rate: f64,
1390 inbox: mpsc::UnboundedSender<(Channel, IoBuf, SystemTime)>,
1392}
1393
1394impl Link {
1396 #[allow(clippy::too_many_arguments)]
1397 fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
1398 context: &mut E,
1399 dialer: P,
1400 receiver: P,
1401 socket: SocketAddr,
1402 sampler: Normal<f64>,
1403 success_rate: f64,
1404 max_size: u32,
1405 received_messages: CounterFamily<metrics::Message>,
1406 ) -> Self {
1407 let (inbox, mut outbox) = mpsc::unbounded_channel::<(Channel, IoBuf, SystemTime)>();
1410 context.child("link").spawn(move |context| async move {
1411 let (mut sink, _) = context.dial(socket).await.unwrap();
1413 if let Err(err) = send_frame(&mut sink, dialer.as_ref().to_vec(), max_size).await {
1414 error!(?err, "failed to send public key to listener");
1415 return;
1416 }
1417
1418 while let Some((channel, message, receive_complete_at)) = outbox.recv().await {
1420 context.sleep_until(receive_complete_at).await;
1422
1423 let channel_bytes = channel.to_be_bytes();
1425 let mut data = Vec::with_capacity(channel_bytes.len() + message.len());
1426 data.extend_from_slice(&channel_bytes);
1427 data.extend_from_slice(message.as_ref());
1428 let _ = send_frame(&mut sink, data, max_size).await;
1429
1430 received_messages
1432 .get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
1433 .inc();
1434 }
1435 });
1436
1437 Self {
1438 sampler,
1439 success_rate,
1440 inbox,
1441 }
1442 }
1443
1444 fn send(
1446 &mut self,
1447 channel: Channel,
1448 message: IoBuf,
1449 receive_complete_at: SystemTime,
1450 ) -> Result<(), Error> {
1451 self.inbox
1452 .send((channel, message, receive_complete_at))
1453 .map_err(|_| Error::NetworkClosed)?;
1454 Ok(())
1455 }
1456}
1457
1458#[cfg(test)]
1459mod tests {
1460 use super::*;
1461 use crate::{
1462 CheckedSender as _, LimitedSender as _, Manager as _, Provider, Receiver as _, Recipients,
1463 Sender as _, TrackedPeers,
1464 };
1465 use commonware_cryptography::{ed25519, Signer as _};
1466 use commonware_runtime::{deterministic, Quota, Runner as _, Supervisor as _};
1467 use commonware_utils::{ordered::Set, NZUsize};
1468 use futures::FutureExt;
1469 use std::num::NonZeroU32;
1470
1471 const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;
1472
1473 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1475
1476 async fn send_when_ready(
1477 context: &deterministic::Context,
1478 sender: &mut Sender<ed25519::PublicKey, deterministic::Context>,
1479 recipients: Recipients<ed25519::PublicKey>,
1480 expected_recipients: usize,
1481 message: Vec<u8>,
1482 priority: bool,
1483 ) -> SystemTime {
1484 loop {
1485 let checked = sender.check(recipients.clone()).unwrap();
1486 if checked.recipients().len() == expected_recipients {
1487 checked.send(message, priority);
1488 return context.current();
1489 }
1490 context.sleep(Duration::from_millis(1)).await;
1491 }
1492 }
1493
1494 #[test]
1497 fn test_register_and_link() {
1498 let executor = deterministic::Runner::default();
1499 executor.start(|context| async move {
1500 let cfg = Config {
1501 max_size: MAX_MESSAGE_SIZE,
1502 disconnect_on_block: true,
1503 tracked_peer_sets: NZUsize!(3),
1504 };
1505 let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1507 let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1508 let peers = [pk1.clone(), pk2.clone()];
1509
1510 let (network, oracle) =
1511 Network::new_with_peers(context.child("network"), cfg, peers).await;
1512 network.start();
1513
1514 let control = oracle.control(pk1.clone());
1515 control.register(0, TEST_QUOTA).await.unwrap();
1516 control.register(1, TEST_QUOTA).await.unwrap();
1517 let control = oracle.control(pk2.clone());
1518 control.register(0, TEST_QUOTA).await.unwrap();
1519 control.register(1, TEST_QUOTA).await.unwrap();
1520
1521 control.register(1, TEST_QUOTA).await.unwrap();
1523
1524 let link = ingress::Link {
1526 latency: Duration::from_millis(2),
1527 jitter: Duration::from_millis(1),
1528 success_rate: 0.9,
1529 };
1530 oracle
1531 .add_link(pk1.clone(), pk2.clone(), link.clone())
1532 .await
1533 .unwrap();
1534
1535 assert!(matches!(
1537 oracle.add_link(pk1, pk2, link).await,
1538 Err(Error::LinkExists)
1539 ));
1540 });
1541 }
1542
1543 #[test]
1546 fn test_new_with_split_peers_seeds_initial_update() {
1547 let executor = deterministic::Runner::default();
1548 executor.start(|context| async move {
1549 let cfg = Config {
1550 max_size: MAX_MESSAGE_SIZE,
1551 disconnect_on_block: true,
1552 tracked_peer_sets: NZUsize!(3),
1553 };
1554 let primary = ed25519::PrivateKey::from_seed(11).public_key();
1555 let secondary = ed25519::PrivateKey::from_seed(12).public_key();
1556
1557 let (network, oracle) = Network::new_with_split_peers(
1558 context.child("network"),
1559 cfg,
1560 [primary.clone()],
1561 [secondary.clone()],
1562 )
1563 .await;
1564 network.start();
1565
1566 let mut manager = oracle.manager();
1567 let peer_set = manager.peer_set(0).await.unwrap();
1568 assert_eq!(peer_set.primary, Set::try_from([primary.clone()]).unwrap());
1569 assert_eq!(
1570 peer_set.secondary,
1571 Set::try_from([secondary.clone()]).unwrap()
1572 );
1573
1574 let mut updates = manager.subscribe().await;
1575 let update = updates.recv().await.unwrap();
1576 assert_eq!(update.index, 0);
1577 assert_eq!(
1578 update.latest.primary,
1579 Set::try_from([primary.clone()]).unwrap()
1580 );
1581 assert_eq!(
1582 update.latest.secondary,
1583 Set::try_from([secondary.clone()]).unwrap()
1584 );
1585 assert_eq!(update.all.primary, Set::try_from([primary]).unwrap());
1586 assert_eq!(update.all.secondary, Set::try_from([secondary]).unwrap());
1587 });
1588 }
1589
1590 #[test]
1593 fn test_split_channel_single() {
1594 let executor = deterministic::Runner::default();
1595 executor.start(|context| async move {
1596 let cfg = Config {
1597 max_size: MAX_MESSAGE_SIZE,
1598 disconnect_on_block: true,
1599 tracked_peer_sets: NZUsize!(3),
1600 };
1601 let (network, oracle) = Network::new(context.child("network"), cfg);
1602 network.start();
1603
1604 let twin = ed25519::PrivateKey::from_seed(20).public_key();
1606 let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
1607 let peer_b = ed25519::PrivateKey::from_seed(22).public_key();
1608
1609 let mut manager = oracle.manager();
1611 manager.track(
1612 0,
1613 Set::try_from([twin.clone(), peer_a.clone(), peer_b.clone()]).unwrap(),
1614 );
1615
1616 let (mut peer_a_sender, mut peer_a_recv) = oracle
1618 .control(peer_a.clone())
1619 .register(0, TEST_QUOTA)
1620 .await
1621 .unwrap();
1622 let (mut peer_b_sender, mut peer_b_recv) = oracle
1623 .control(peer_b.clone())
1624 .register(0, TEST_QUOTA)
1625 .await
1626 .unwrap();
1627
1628 let (twin_sender, twin_receiver) = oracle
1634 .control(twin.clone())
1635 .register(0, TEST_QUOTA)
1636 .await
1637 .unwrap();
1638 let peer_a_for_router = peer_a.clone();
1639 let peer_b_for_router = peer_b.clone();
1640 let (mut twin_primary_sender, mut twin_secondary_sender) =
1641 twin_sender.split_with(move |origin, _, _| match origin {
1642 SplitOrigin::Primary => Some(Recipients::One(peer_a_for_router.clone())),
1643 SplitOrigin::Secondary => Some(Recipients::One(peer_b_for_router.clone())),
1644 });
1645 let peer_a_for_recv = peer_a.clone();
1646 let peer_b_for_recv = peer_b.clone();
1647 let (mut twin_primary_recv, mut twin_secondary_recv) =
1648 twin_receiver.split_with(context.child("split_receiver"), move |(sender, _)| {
1649 if sender == &peer_a_for_recv {
1650 SplitTarget::Primary
1651 } else if sender == &peer_b_for_recv {
1652 SplitTarget::Secondary
1653 } else {
1654 panic!("unexpected sender");
1655 }
1656 });
1657
1658 let link = ingress::Link {
1660 latency: Duration::from_millis(0),
1661 jitter: Duration::from_millis(0),
1662 success_rate: 1.0,
1663 };
1664 oracle
1665 .add_link(peer_a.clone(), twin.clone(), link.clone())
1666 .await
1667 .unwrap();
1668 oracle
1669 .add_link(twin.clone(), peer_a.clone(), link.clone())
1670 .await
1671 .unwrap();
1672 oracle
1673 .add_link(peer_b.clone(), twin.clone(), link.clone())
1674 .await
1675 .unwrap();
1676 oracle
1677 .add_link(twin.clone(), peer_b.clone(), link.clone())
1678 .await
1679 .unwrap();
1680
1681 peer_a_sender.send(Recipients::One(twin.clone()), b"from_a", false);
1683 peer_b_sender.send(Recipients::One(twin.clone()), b"from_b", false);
1684 twin_primary_sender.send(Recipients::All, b"primary_out", false);
1685 twin_secondary_sender.send(Recipients::All, b"secondary_out", false);
1686
1687 let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1689 assert_eq!(sender, peer_a);
1690 assert_eq!(payload, b"from_a");
1691 let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1692 assert_eq!(sender, peer_b);
1693 assert_eq!(payload, b"from_b");
1694
1695 let (sender, payload) = peer_a_recv.recv().await.unwrap();
1697 assert_eq!(sender, twin);
1698 assert_eq!(payload, b"primary_out");
1699 let (sender, payload) = peer_b_recv.recv().await.unwrap();
1700 assert_eq!(sender, twin);
1701 assert_eq!(payload, b"secondary_out");
1702 });
1703 }
1704
1705 #[test]
1707 fn test_split_channel_both() {
1708 let executor = deterministic::Runner::default();
1709 executor.start(|context| async move {
1710 let cfg = Config {
1711 max_size: MAX_MESSAGE_SIZE,
1712 disconnect_on_block: true,
1713 tracked_peer_sets: NZUsize!(3),
1714 };
1715 let (network, oracle) = Network::new(context.child("network"), cfg);
1716 network.start();
1717
1718 let twin = ed25519::PrivateKey::from_seed(30).public_key();
1720 let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1721
1722 let mut manager = oracle.manager();
1724 manager.track(0, Set::try_from([twin.clone(), peer_c.clone()]).unwrap());
1725
1726 let (mut peer_c_sender, _peer_c_recv) = oracle
1728 .control(peer_c.clone())
1729 .register(0, TEST_QUOTA)
1730 .await
1731 .unwrap();
1732
1733 let (twin_sender, twin_receiver) = oracle
1735 .control(twin.clone())
1736 .register(0, TEST_QUOTA)
1737 .await
1738 .unwrap();
1739 let (_twin_primary_sender, _twin_secondary_sender) =
1740 twin_sender.split_with(|_origin, recipients, _| Some(recipients.clone()));
1741 let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1742 .split_with(context.child("split_receiver_both"), |_| SplitTarget::Both);
1743
1744 let link = ingress::Link {
1746 latency: Duration::from_millis(0),
1747 jitter: Duration::from_millis(0),
1748 success_rate: 1.0,
1749 };
1750 oracle
1751 .add_link(peer_c.clone(), twin.clone(), link.clone())
1752 .await
1753 .unwrap();
1754 oracle
1755 .add_link(twin.clone(), peer_c.clone(), link)
1756 .await
1757 .unwrap();
1758
1759 peer_c_sender.send(Recipients::One(twin.clone()), b"to_both", false);
1761
1762 let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1764 assert_eq!(sender, peer_c);
1765 assert_eq!(payload, b"to_both");
1766 let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1767 assert_eq!(sender, peer_c);
1768 assert_eq!(payload, b"to_both");
1769 });
1770 }
1771
1772 #[test]
1776 fn test_split_channel_none() {
1777 let executor = deterministic::Runner::default();
1778 executor.start(|context| async move {
1779 let cfg = Config {
1780 max_size: MAX_MESSAGE_SIZE,
1781 disconnect_on_block: true,
1782 tracked_peer_sets: NZUsize!(3),
1783 };
1784 let (network, oracle) = Network::new(context.child("network"), cfg);
1785 network.start();
1786
1787 let twin = ed25519::PrivateKey::from_seed(30).public_key();
1789 let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1790
1791 let mut manager = oracle.manager();
1793 manager.track(0, Set::try_from([twin.clone(), peer_c.clone()]).unwrap());
1794
1795 let (mut peer_c_sender, _peer_c_recv) = oracle
1797 .control(peer_c.clone())
1798 .register(0, TEST_QUOTA)
1799 .await
1800 .unwrap();
1801
1802 let (twin_sender, twin_receiver) = oracle
1804 .control(twin.clone())
1805 .register(0, TEST_QUOTA)
1806 .await
1807 .unwrap();
1808 let (mut twin_primary_sender, mut twin_secondary_sender) =
1809 twin_sender.split_with(|_origin, _, _| None);
1810 let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1811 .split_with(context.child("split_receiver_both"), |_| SplitTarget::None);
1812
1813 let link = ingress::Link {
1815 latency: Duration::from_millis(0),
1816 jitter: Duration::from_millis(0),
1817 success_rate: 1.0,
1818 };
1819 oracle
1820 .add_link(peer_c.clone(), twin.clone(), link.clone())
1821 .await
1822 .unwrap();
1823 oracle
1824 .add_link(twin.clone(), peer_c.clone(), link)
1825 .await
1826 .unwrap();
1827
1828 let sent = peer_c_sender.send(Recipients::One(twin.clone()), b"to_both", false);
1830 assert_eq!(sent.len(), 1);
1831 assert_eq!(sent[0], twin);
1832
1833 context.sleep(Duration::from_millis(100)).await;
1835 assert!(twin_primary_recv.recv().now_or_never().is_none());
1836 assert!(twin_secondary_recv.recv().now_or_never().is_none());
1837
1838 let sent = twin_primary_sender.send(Recipients::One(peer_c.clone()), b"to_both", false);
1840 assert!(sent.is_empty());
1841
1842 let sent =
1844 twin_secondary_sender.send(Recipients::One(peer_c.clone()), b"to_both", false);
1845 assert!(sent.is_empty());
1846 });
1847 }
1848
1849 #[test]
1852 fn test_unordered_peer_sets() {
1853 let executor = deterministic::Runner::default();
1854 executor.start(|context| async move {
1855 let cfg = Config {
1856 max_size: MAX_MESSAGE_SIZE,
1857 disconnect_on_block: true,
1858 tracked_peer_sets: NZUsize!(3),
1859 };
1860 let (network, oracle) = Network::new(context.child("network"), cfg);
1861 network.start();
1862
1863 let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1865 let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1866
1867 let mut manager = oracle.manager();
1869 let mut subscription = manager.subscribe().await;
1870
1871 manager.track(10, Set::try_from([pk1.clone(), pk2.clone()]).unwrap());
1873 let update = subscription.recv().await.unwrap();
1874 assert_eq!(update.index, 10);
1875 assert_eq!(update.latest.primary.len(), 2);
1876 assert!(update.latest.secondary.is_empty());
1877 assert_eq!(update.all.primary.len(), 2);
1878 assert!(update.all.secondary.is_empty());
1879
1880 let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
1882 manager.track(9, Set::try_from([pk3.clone()]).unwrap());
1883
1884 let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
1886 manager.track(11, Set::try_from([pk4.clone()]).unwrap());
1887 let update = subscription.recv().await.unwrap();
1888 assert_eq!(update.index, 11);
1889 assert_eq!(update.latest.primary, Set::try_from([pk4.clone()]).unwrap());
1890 assert!(update.latest.secondary.is_empty());
1891 assert_eq!(update.all.primary, Set::try_from([pk1, pk2, pk4]).unwrap());
1892 assert!(update.all.secondary.is_empty());
1893 });
1894 }
1895
1896 #[test]
1899 fn test_peer_set_update_all_cross_index_primary_wins() {
1900 let executor = deterministic::Runner::default();
1901 executor.start(|context| async move {
1902 let cfg = Config {
1903 max_size: MAX_MESSAGE_SIZE,
1904 disconnect_on_block: true,
1905 tracked_peer_sets: NZUsize!(3),
1906 };
1907 let (network, oracle) = Network::new(context.child("network"), cfg);
1908 network.start();
1909
1910 let pk_a = ed25519::PrivateKey::from_seed(21).public_key();
1911 let pk_b = ed25519::PrivateKey::from_seed(22).public_key();
1912 let pk_overlap = ed25519::PrivateKey::from_seed(23).public_key();
1914 let pk_sec = ed25519::PrivateKey::from_seed(24).public_key();
1916
1917 let mut manager = oracle.manager();
1918 let mut subscription = manager.subscribe().await;
1919
1920 manager.track(
1921 10,
1922 TrackedPeers::new(
1923 Set::try_from([pk_a.clone(), pk_overlap.clone()]).unwrap(),
1924 Set::default(),
1925 ),
1926 );
1927 let _ = subscription.recv().await.unwrap();
1928
1929 manager.track(
1930 11,
1931 TrackedPeers::new(
1932 Set::try_from([pk_b.clone()]).unwrap(),
1933 Set::try_from([pk_overlap.clone(), pk_sec.clone()]).unwrap(),
1934 ),
1935 );
1936 let update = subscription.recv().await.unwrap();
1937 assert_eq!(update.index, 11);
1938
1939 assert_eq!(
1940 update.latest.primary,
1941 Set::try_from([pk_b.clone()]).unwrap()
1942 );
1943 assert!(update.latest.secondary.position(&pk_overlap).is_some());
1945 assert!(update.latest.secondary.position(&pk_sec).is_some());
1946
1947 assert!(update.all.primary.position(&pk_a).is_some());
1949 assert!(update.all.primary.position(&pk_b).is_some());
1950 assert!(update.all.primary.position(&pk_overlap).is_some());
1951 assert!(
1952 update.all.secondary.position(&pk_overlap).is_none(),
1953 "aggregate secondary must omit peers who have any primary membership"
1954 );
1955 assert!(update.all.secondary.position(&pk_sec).is_some());
1956 });
1957 }
1958
1959 #[test]
1961 fn test_get_next_socket() {
1962 let cfg = Config {
1963 max_size: MAX_MESSAGE_SIZE,
1964 disconnect_on_block: true,
1965 tracked_peer_sets: NZUsize!(1),
1966 };
1967 let runner = deterministic::Runner::default();
1968
1969 runner.start(|context| async move {
1970 type PublicKey = ed25519::PublicKey;
1971 let (mut network, _) =
1972 Network::<deterministic::Context, PublicKey>::new(context.child("network"), cfg);
1973
1974 let mut original = network.next_addr;
1976 let next = network.get_next_socket();
1977 assert_eq!(next, original);
1978 let next = network.get_next_socket();
1979 original.set_port(1);
1980 assert_eq!(next, original);
1981
1982 let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
1984 network.next_addr = max_addr;
1985 let next = network.get_next_socket();
1986 assert_eq!(next, max_addr);
1987 let next = network.get_next_socket();
1988 assert_eq!(
1989 next,
1990 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
1991 );
1992 });
1993 }
1994
1995 #[test]
1997 fn test_fifo_burst_same_recipient() {
1998 let cfg = Config {
1999 max_size: MAX_MESSAGE_SIZE,
2000 disconnect_on_block: true,
2001 tracked_peer_sets: NZUsize!(3),
2002 };
2003 let runner = deterministic::Runner::default();
2004
2005 runner.start(|context| async move {
2006 let (network, oracle) = Network::new(context.child("network"), cfg);
2007 let network_handle = network.start();
2008
2009 let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
2010 let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();
2011
2012 let mut manager = oracle.manager();
2013 manager.track(
2014 0,
2015 Set::try_from([sender_pk.clone(), recipient_pk.clone()]).unwrap(),
2016 );
2017 let (mut sender, _sender_recv) = oracle
2018 .control(sender_pk.clone())
2019 .register(0, TEST_QUOTA)
2020 .await
2021 .unwrap();
2022 let (_sender2, mut receiver) = oracle
2023 .control(recipient_pk.clone())
2024 .register(0, TEST_QUOTA)
2025 .await
2026 .unwrap();
2027
2028 oracle
2029 .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
2030 .await
2031 .unwrap();
2032 oracle
2033 .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
2034 .await
2035 .unwrap();
2036
2037 oracle
2038 .add_link(
2039 sender_pk.clone(),
2040 recipient_pk.clone(),
2041 ingress::Link {
2042 latency: Duration::from_millis(0),
2043 jitter: Duration::from_millis(0),
2044 success_rate: 1.0,
2045 },
2046 )
2047 .await
2048 .unwrap();
2049
2050 const COUNT: usize = 50;
2051 let mut expected = Vec::with_capacity(COUNT);
2052 for i in 0..COUNT {
2053 let msg = vec![i as u8; 64];
2054 sender
2055 .check(Recipients::One(recipient_pk.clone()))
2056 .unwrap()
2057 .send(msg.clone(), false);
2058 expected.push(msg);
2059 }
2060
2061 for expected_msg in expected {
2062 let (_pk, bytes) = receiver.recv().await.unwrap();
2063 assert_eq!(bytes, expected_msg.as_slice());
2064 }
2065
2066 drop(oracle);
2067 drop(sender);
2068 network_handle.abort();
2069 });
2070 }
2071
2072 #[test]
2075 fn test_broadcast_respects_transmit_latency() {
2076 let cfg = Config {
2077 max_size: MAX_MESSAGE_SIZE,
2078 disconnect_on_block: true,
2079 tracked_peer_sets: NZUsize!(3),
2080 };
2081 let runner = deterministic::Runner::default();
2082
2083 runner.start(|context| async move {
2084 let (network, oracle) = Network::new(context.child("network"), cfg);
2085 let network_handle = network.start();
2086
2087 let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
2088 let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
2089 let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();
2090
2091 let mut manager = oracle.manager();
2092 manager.track(
2093 0,
2094 Set::try_from([sender_pk.clone(), recipient_a.clone(), recipient_b.clone()])
2095 .unwrap(),
2096 );
2097 let (mut sender, _recv_sender) = oracle
2098 .control(sender_pk.clone())
2099 .register(0, TEST_QUOTA)
2100 .await
2101 .unwrap();
2102 let (_sender2, mut recv_a) = oracle
2103 .control(recipient_a.clone())
2104 .register(0, TEST_QUOTA)
2105 .await
2106 .unwrap();
2107 let (_sender3, mut recv_b) = oracle
2108 .control(recipient_b.clone())
2109 .register(0, TEST_QUOTA)
2110 .await
2111 .unwrap();
2112
2113 oracle
2114 .limit_bandwidth(sender_pk.clone(), Some(1_000), None)
2115 .await
2116 .unwrap();
2117 oracle
2118 .limit_bandwidth(recipient_a.clone(), None, Some(1_000))
2119 .await
2120 .unwrap();
2121 oracle
2122 .limit_bandwidth(recipient_b.clone(), None, Some(1_000))
2123 .await
2124 .unwrap();
2125
2126 let link = ingress::Link {
2127 latency: Duration::from_millis(0),
2128 jitter: Duration::from_millis(0),
2129 success_rate: 1.0,
2130 };
2131 oracle
2132 .add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
2133 .await
2134 .unwrap();
2135 oracle
2136 .add_link(sender_pk.clone(), recipient_b.clone(), link)
2137 .await
2138 .unwrap();
2139
2140 let big_msg = vec![7u8; 10_000];
2141 let start = send_when_ready(
2142 &context,
2143 &mut sender,
2144 Recipients::All,
2145 2,
2146 big_msg.clone(),
2147 false,
2148 )
2149 .await;
2150
2151 let (_pk, received_a) = recv_a.recv().await.unwrap();
2152 assert_eq!(received_a, big_msg.as_slice());
2153 let elapsed_a = context.current().duration_since(start).unwrap();
2154 assert!(elapsed_a >= Duration::from_secs(20));
2155
2156 let (_pk, received_b) = recv_b.recv().await.unwrap();
2157 assert_eq!(received_b, big_msg.as_slice());
2158 let elapsed_b = context.current().duration_since(start).unwrap();
2159 assert!(elapsed_b >= Duration::from_secs(20));
2160
2161 assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));
2163
2164 drop(oracle);
2165 drop(sender);
2166 network_handle.abort();
2167 });
2168 }
2169
2170 #[test]
2173 fn test_overlapping_primary_secondary_no_duplicate_recipients() {
2174 let executor = deterministic::Runner::default();
2175 executor.start(|context| async move {
2176 let cfg = Config {
2177 max_size: MAX_MESSAGE_SIZE,
2178 disconnect_on_block: true,
2179 tracked_peer_sets: NZUsize!(3),
2180 };
2181 let (network, oracle) = Network::new(context.child("network"), cfg);
2182 network.start();
2183
2184 let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
2185 let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
2186 let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
2187
2188 let mut manager = oracle.manager();
2189 manager.track(
2190 0,
2191 TrackedPeers::new(
2192 Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
2193 Set::try_from([pk2.clone(), pk3.clone()]).unwrap(),
2194 ),
2195 );
2196
2197 let mut updates = manager.subscribe().await;
2198 let update = updates.recv().await.unwrap();
2199 assert_eq!(update.index, 0);
2200 assert!(update.latest.primary.position(&pk2).is_some());
2201 assert!(
2202 update.latest.secondary.position(&pk2).is_none(),
2203 "overlap peer must not appear in latest.secondary"
2204 );
2205 assert!(update.latest.secondary.position(&pk3).is_some());
2206 assert!(update.all.primary.position(&pk2).is_some());
2207 assert!(
2208 update.all.secondary.position(&pk2).is_none(),
2209 "aggregate secondary must not list peers who are primary"
2210 );
2211 assert!(update.all.secondary.position(&pk3).is_some());
2212
2213 let link = ingress::Link {
2214 latency: Duration::from_millis(1),
2215 jitter: Duration::ZERO,
2216 success_rate: 1.0,
2217 };
2218 for (a, b) in [(&pk1, &pk2), (&pk1, &pk3), (&pk2, &pk3)] {
2219 oracle
2220 .add_link(a.clone(), b.clone(), link.clone())
2221 .await
2222 .unwrap();
2223 }
2224
2225 let (mut sender1, _) = oracle
2226 .control(pk1.clone())
2227 .register(0, TEST_QUOTA)
2228 .await
2229 .unwrap();
2230 let (_, mut recv2) = oracle
2231 .control(pk2.clone())
2232 .register(0, TEST_QUOTA)
2233 .await
2234 .unwrap();
2235 let (_, mut recv3) = oracle
2236 .control(pk3.clone())
2237 .register(0, TEST_QUOTA)
2238 .await
2239 .unwrap();
2240
2241 let msg = vec![42u8; 10];
2242 let checked = sender1.check(Recipients::All).unwrap();
2243 let sent_to = crate::CheckedSender::recipients(&checked);
2244 checked.send(msg.clone(), true);
2245
2246 let pk2_count = sent_to.iter().filter(|pk| *pk == &pk2).count();
2247 assert_eq!(pk2_count, 1, "pk2 received duplicate sends");
2248 assert!(sent_to.iter().any(|pk| pk == &pk3));
2249
2250 context.sleep(Duration::from_millis(10)).await;
2251 let (from2, data2) = recv2.recv().await.unwrap();
2252 assert_eq!(from2, pk1);
2253 assert_eq!(data2, msg.as_slice());
2254 let (from3, data3) = recv3.recv().await.unwrap();
2255 assert_eq!(from3, pk1);
2256 assert_eq!(data3, msg.as_slice());
2257 assert!(recv2.recv().now_or_never().is_none());
2258 });
2259 }
2260
2261 #[test]
2264 fn test_demotion_from_primary_to_secondary() {
2265 let executor = deterministic::Runner::default();
2266 executor.start(|context| async move {
2267 let cfg = Config {
2268 max_size: 1024,
2269 disconnect_on_block: true,
2270 tracked_peer_sets: NZUsize!(2),
2271 };
2272 let (network, oracle) = Network::new(context.child("network"), cfg);
2273 network.start();
2274
2275 let pk_x = ed25519::PrivateKey::from_seed(1).public_key();
2276 let pk_y = ed25519::PrivateKey::from_seed(2).public_key();
2277
2278 let mut manager = oracle.manager();
2279 let mut sub = manager.subscribe().await;
2280
2281 manager.track(
2283 0,
2284 TrackedPeers::new(
2285 Set::try_from([pk_x.clone()]).unwrap(),
2286 Set::try_from([pk_y.clone()]).unwrap(),
2287 ),
2288 );
2289
2290 let update = sub.recv().await.unwrap();
2291 assert!(update.all.primary.position(&pk_x).is_some());
2292 assert!(update.all.secondary.position(&pk_y).is_some());
2293
2294 manager.track(
2296 1,
2297 TrackedPeers::new(
2298 Set::try_from([pk_y.clone()]).unwrap(),
2299 Set::try_from([pk_x.clone()]).unwrap(),
2300 ),
2301 );
2302
2303 let update = sub.recv().await.unwrap();
2305 assert!(update.all.primary.position(&pk_x).is_some());
2306 assert!(update.all.primary.position(&pk_y).is_some());
2307 assert!(update.all.secondary.is_empty());
2308
2309 manager.track(
2311 2,
2312 TrackedPeers::new(
2313 Set::try_from([pk_y.clone()]).unwrap(),
2314 Set::try_from([pk_x.clone()]).unwrap(),
2315 ),
2316 );
2317
2318 let update = sub.recv().await.unwrap();
2320 assert!(update.all.primary.position(&pk_y).is_some());
2321 assert!(update.all.secondary.position(&pk_x).is_some());
2322 assert!(update.all.primary.position(&pk_x).is_none());
2323 });
2324 }
2325
2326 #[test]
2329 fn test_secondary_sets_remain_until_eviction() {
2330 let executor = deterministic::Runner::default();
2331 executor.start(|context| async move {
2332 let cfg = Config {
2333 max_size: MAX_MESSAGE_SIZE,
2334 disconnect_on_block: true,
2335 tracked_peer_sets: NZUsize!(2),
2336 };
2337 let (network, oracle) = Network::new(context.child("network"), cfg);
2338 network.start();
2339
2340 let primary_0 = ed25519::PrivateKey::from_seed(1).public_key();
2341 let primary_1 = ed25519::PrivateKey::from_seed(2).public_key();
2342 let primary_2 = ed25519::PrivateKey::from_seed(3).public_key();
2343 let secondary_0 = ed25519::PrivateKey::from_seed(4).public_key();
2344 let secondary_1 = ed25519::PrivateKey::from_seed(5).public_key();
2345
2346 let mut manager = oracle.manager();
2347 manager.track(
2348 0,
2349 TrackedPeers::new(
2350 Set::try_from([primary_0.clone()]).unwrap(),
2351 Set::try_from([secondary_0.clone()]).unwrap(),
2352 ),
2353 );
2354 manager.track(
2355 1,
2356 TrackedPeers::new(
2357 Set::try_from([primary_1.clone()]).unwrap(),
2358 Set::try_from([secondary_1.clone()]).unwrap(),
2359 ),
2360 );
2361
2362 let link = ingress::Link {
2363 latency: Duration::from_millis(1),
2364 jitter: Duration::ZERO,
2365 success_rate: 1.0,
2366 };
2367 oracle
2368 .add_link(primary_1.clone(), secondary_0.clone(), link.clone())
2369 .await
2370 .unwrap();
2371 oracle
2372 .add_link(primary_1.clone(), secondary_1.clone(), link.clone())
2373 .await
2374 .unwrap();
2375
2376 let (mut sender_1, _) = oracle
2377 .control(primary_1.clone())
2378 .register(0, TEST_QUOTA)
2379 .await
2380 .unwrap();
2381 let (_, mut receiver_0) = oracle
2382 .control(secondary_0.clone())
2383 .register(0, TEST_QUOTA)
2384 .await
2385 .unwrap();
2386 let (_, mut receiver_1) = oracle
2387 .control(secondary_1.clone())
2388 .register(0, TEST_QUOTA)
2389 .await
2390 .unwrap();
2391
2392 let msg_1 = vec![1u8; 8];
2393 sender_1
2394 .check(Recipients::Some(vec![
2395 secondary_0.clone(),
2396 secondary_1.clone(),
2397 ]))
2398 .unwrap()
2399 .send(msg_1.clone(), true);
2400 assert_eq!(receiver_0.recv().await.unwrap().1, msg_1.as_slice());
2401 assert_eq!(receiver_1.recv().await.unwrap().1, msg_1.as_slice());
2402
2403 crate::Manager::track(
2404 &mut manager,
2405 2,
2406 TrackedPeers::primary([primary_2.clone()].try_into().unwrap()),
2407 );
2408 oracle
2409 .add_link(primary_2.clone(), secondary_0.clone(), link.clone())
2410 .await
2411 .unwrap();
2412 oracle
2413 .add_link(primary_2.clone(), secondary_1.clone(), link)
2414 .await
2415 .unwrap();
2416
2417 let (mut sender_2, _) = oracle
2418 .control(primary_2)
2419 .register(0, TEST_QUOTA)
2420 .await
2421 .unwrap();
2422
2423 let msg_2 = vec![2u8; 8];
2424 sender_2
2425 .check(Recipients::Some(vec![
2426 secondary_0.clone(),
2427 secondary_1.clone(),
2428 ]))
2429 .unwrap()
2430 .send(msg_2.clone(), true);
2431 assert!(receiver_0.recv().now_or_never().is_none());
2432 assert_eq!(receiver_1.recv().await.unwrap().1, msg_2.as_slice());
2433 });
2434 }
2435}