commonware_p2p/simulated/network.rs

//! Implementation of a simulated p2p network.

use super::{
    ingress::{self, Oracle},
    metrics,
    transmitter::{self, Completion},
    Error,
};
use crate::{
    authenticated::UnboundedMailbox,
    utils::limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
    Channel, Message, Recipients, UnlimitedSender as _,
};
use commonware_codec::{DecodeExt, FixedSize};
use commonware_cryptography::PublicKey;
use commonware_macros::{select, select_loop};
use commonware_runtime::{
    spawn_cell, Clock, ContextCell, Handle, IoBuf, IoBufMut, Listener as _, Metrics,
    Network as RNetwork, Quota, Spawner,
};
use commonware_stream::utils::codec::{recv_frame, send_frame};
use commonware_utils::{
    channel::{fallible::FallibleExt, mpsc, oneshot, ring},
    ordered::Set,
    NZUsize, TryCollect,
};
use either::Either;
use futures::{future, SinkExt};
use prometheus_client::metrics::{counter::Counter, family::Family};
use rand::Rng;
use rand_distr::{Distribution, Normal};
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    fmt::Debug,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    time::{Duration, SystemTime},
};
use tracing::{debug, error, trace, warn};

/// Task type representing a message to be sent within the network.
type Task<P> = (Channel, P, Recipients<P>, IoBuf, oneshot::Sender<Vec<P>>);

/// Target for a message in a split receiver.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitTarget {
    None,
    Primary,
    Secondary,
    Both,
}

/// Origin of a message in a split sender.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitOrigin {
    Primary,
    Secondary,
}

/// A function that forwards messages from [SplitOrigin] to [Recipients].
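///
/// # Example
///
/// A minimal sketch of a forwarder: it passes messages from the primary split through to the
/// original recipients and silently drops messages from the secondary split (returning `None`
/// drops the message). The closure shape mirrors the one used in the tests below.
///
/// ```ignore
/// let forwarder = move |origin, recipients: &Recipients<_>, _message: &IoBuf| match origin {
///     SplitOrigin::Primary => Some(recipients.clone()),
///     SplitOrigin::Secondary => None,
/// };
/// ```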
pub trait SplitForwarder<P: PublicKey>:
    Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
{
}

impl<P: PublicKey, F> SplitForwarder<P> for F where
    F: Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>>
        + Send
        + Sync
        + Clone
        + 'static
{
}

/// A function that routes incoming [Message]s to a [SplitTarget].
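///
/// # Example
///
/// A sketch of a router that directs incoming messages to a split receiver based on who sent
/// them (`peer_a` is a placeholder public key; unmatched senders go to the secondary half):
///
/// ```ignore
/// let router = move |(sender, _payload): &Message<_>| {
///     if sender == &peer_a {
///         SplitTarget::Primary
///     } else {
///         SplitTarget::Secondary
///     }
/// };
/// ```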
pub trait SplitRouter<P: PublicKey>:
    Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}

impl<P: PublicKey, F> SplitRouter<P> for F where
    F: Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}

/// Configuration for the simulated network.
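///
/// # Example
///
/// A minimal construction sketch (the values mirror those used in the tests below and are
/// illustrative, not defaults):
///
/// ```ignore
/// let cfg = Config {
///     max_size: 1024 * 1024,
///     disconnect_on_block: true,
///     tracked_peer_sets: Some(3),
/// };
/// ```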
pub struct Config {
    /// Maximum size of a message that can be sent over the network.
    pub max_size: u32,

    /// True if peers should disconnect upon being blocked. While production networking would
    /// typically disconnect, for testing purposes it may be useful to keep peers connected,
    /// allowing byzantine actors the ability to continue sending messages.
    pub disconnect_on_block: bool,

    /// The maximum number of peer sets to track. When a new peer set is registered and this
    /// limit is exceeded, the oldest peer set is removed. Peers that are no longer in any
    /// tracked peer set will have their links removed and messages to them will be dropped.
    ///
    /// If [None], peer sets are not considered.
    pub tracked_peer_sets: Option<usize>,
}

/// Implementation of a simulated network.
pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
    context: ContextCell<E>,

    // Maximum size of a message that can be sent over the network
    max_size: u32,

    // True if peers should disconnect upon being blocked.
    // While production networking would typically disconnect, for testing purposes it may be useful
    // to keep peers connected, allowing byzantine actors the ability to continue sending messages.
    disconnect_on_block: bool,

    // Next socket address to assign to a new peer
    // Incremented for each new peer
    next_addr: SocketAddr,

    // Channel to receive messages from the oracle
    ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,

    // Mailbox for the oracle channel (passed to Senders for PeerSource subscriptions)
    oracle_mailbox: UnboundedMailbox<ingress::Message<P, E>>,

    // A channel to receive tasks from peers
    // The sender is cloned and given to each peer
    // The receiver is polled in the main loop
    sender: mpsc::UnboundedSender<Task<P>>,
    receiver: mpsc::UnboundedReceiver<Task<P>>,

    // A map from a pair of public keys (from, to) to a link between the two peers
    links: HashMap<(P, P), Link>,

    // A map from a public key to a peer
    peers: BTreeMap<P, Peer<P>>,

    // Peer sets indexed by their ID
    peer_sets: BTreeMap<u64, Set<P>>,

    // Reference count for each peer (number of peer sets they belong to)
    peer_refs: BTreeMap<P, usize>,

    // Maximum number of peer sets to track
    tracked_peer_sets: Option<usize>,

    // A map of peers blocking each other
    blocks: BTreeSet<(P, P)>,

    // State of the transmitter
    transmitter: transmitter::State<P>,

    // Subscribers to peer set updates (used by Manager::subscribe())
    #[allow(clippy::type_complexity)]
    subscribers: Vec<mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>>,

    // Subscribers to tracked peer list updates (used by PeerSource for LimitedSender)
    peer_subscribers: Vec<ring::Sender<Vec<P>>>,

    // Metrics for received and sent messages
    received_messages: Family<metrics::Message, Counter>,
    sent_messages: Family<metrics::Message, Counter>,
}

impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
    /// Create a new simulated network with a given runtime and configuration.
    ///
    /// Returns a tuple containing the network instance and the oracle that can
    /// be used to modify the state of the network while it is running.
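    ///
    /// # Example
    ///
    /// A sketch of constructing and starting the network (mirroring the setup in the tests
    /// below; `context` is assumed to be a runtime context such as the deterministic runner's):
    ///
    /// ```ignore
    /// let cfg = Config {
    ///     max_size: 1024 * 1024,
    ///     disconnect_on_block: true,
    ///     tracked_peer_sets: Some(3),
    /// };
    /// let (network, oracle) = Network::new(context.with_label("network"), cfg);
    /// network.start();
    /// ```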
    pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        let (oracle_mailbox, oracle_receiver) = UnboundedMailbox::new();
        let sent_messages = Family::<metrics::Message, Counter>::default();
        let received_messages = Family::<metrics::Message, Counter>::default();
        context.register("messages_sent", "messages sent", sent_messages.clone());
        context.register(
            "messages_received",
            "messages received",
            received_messages.clone(),
        );

        // Start with a pseudo-random IP address to assign sockets to for new peers
        let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);

        (
            Self {
                context: ContextCell::new(context),
                max_size: cfg.max_size,
                disconnect_on_block: cfg.disconnect_on_block,
                tracked_peer_sets: cfg.tracked_peer_sets,
                next_addr,
                ingress: oracle_receiver,
                oracle_mailbox: oracle_mailbox.clone(),
                sender,
                receiver,
                links: HashMap::new(),
                peers: BTreeMap::new(),
                peer_sets: BTreeMap::new(),
                peer_refs: BTreeMap::new(),
                blocks: BTreeSet::new(),
                transmitter: transmitter::State::new(),
                subscribers: Vec::new(),
                peer_subscribers: Vec::new(),
                received_messages,
                sent_messages,
            },
            Oracle::new(oracle_mailbox),
        )
    }

    /// Returns (and increments) the next available socket address.
    ///
    /// The port number is incremented for each call, and the IP address is incremented if the port
    /// number overflows.
    fn get_next_socket(&mut self) -> SocketAddr {
        let result = self.next_addr;

        // Increment the port number, or the IP address if the port number overflows.
        // Allows the ip address to overflow (wrapping).
        match self.next_addr.port().checked_add(1) {
            Some(port) => {
                self.next_addr.set_port(port);
            }
            None => {
                let ip = match self.next_addr.ip() {
                    IpAddr::V4(ipv4) => ipv4,
                    _ => unreachable!(),
                };
                let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
                self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
            }
        }

        result
    }

    /// Handle an ingress message.
    ///
    /// This method is called when a message is received from the oracle.
    async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
        // It is important to ensure that no failed receipt of a message will cause us to exit.
        // This could happen if the caller drops the `Oracle` after updating the network topology.
        // Thus, we create a helper function to send the result to the oracle and log any errors.
        fn send_result<T: std::fmt::Debug>(
            result: oneshot::Sender<Result<T, Error>>,
            value: Result<T, Error>,
        ) {
            let success = value.is_ok();
            if let Err(e) = result.send(value) {
                error!(?e, "failed to send result to oracle (ok = {})", success);
            }
        }

        match message {
            ingress::Message::Track { id, peers } => {
                let Some(tracked_peer_sets) = self.tracked_peer_sets else {
                    warn!("attempted to register peer set when tracking is disabled");
                    return;
                };

                // Check if peer set already exists
                if self.peer_sets.contains_key(&id) {
                    warn!(id, "peer set already exists");
                    return;
                }

                // Ensure that peer set is monotonically increasing
                if let Some((last, _)) = self.peer_sets.last_key_value() {
                    if id <= *last {
                        warn!(
                            new_id = id,
                            old_id = last,
                            "attempted to register peer set with non-monotonically increasing ID"
                        );
                        return;
                    }
                }

                // Create and store new peer set
                for public_key in peers.iter() {
                    // Create peer if it doesn't exist
                    self.ensure_peer_exists(public_key).await;

                    // Increment reference count
                    *self.peer_refs.entry(public_key.clone()).or_insert(0) += 1;
                }
                self.peer_sets.insert(id, peers.clone());

                // Remove oldest peer set if we exceed the limit
                while self.peer_sets.len() > tracked_peer_sets {
                    let (id, set) = self.peer_sets.pop_first().unwrap();
                    debug!(id, "removed oldest peer set");

                    // Decrement reference counts and clean up peers/links
                    for public_key in set.iter() {
                        let refs = self.peer_refs.get_mut(public_key).unwrap();
                        *refs = refs.checked_sub(1).expect("reference count underflow");

                        // If the peer is no longer in any tracked set, drop its reference entry. We explicitly keep
                        // the peer in `self.peers` to keep its network alive, in case the peer re-joins in a future peer set.
                        if *refs == 0 {
                            self.peer_refs.remove(public_key);
                            debug!(?public_key, "removed peer no longer in any tracked set");
                        }
                    }
                }

                // Notify all subscribers about the new peer set
                let all = self.all_tracked_peers();
                let notification = (id, peers, all);
                self.subscribers
                    .retain(|subscriber| subscriber.send_lossy(notification.clone()));

                // Broadcast updated peer list to LimitedSender subscribers
                self.broadcast_peer_list().await;
            }
            ingress::Message::Register {
                channel,
                public_key,
                quota,
                result,
            } => {
                // If peer does not exist, then create it.
                let (_, is_new) = self.ensure_peer_exists(&public_key).await;

                // When not using peer sets, broadcast updated peer list to subscribers
                if is_new && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // Get clock for the rate limiter
                let clock = self
                    .context
                    .with_label(&format!("rate_limiter_{channel}_{public_key}"))
                    .take();

                // Create a sender that allows sending messages to the network for a certain channel
                let (sender, handle) = Sender::new(
                    self.context.with_label("sender"),
                    public_key.clone(),
                    channel,
                    self.max_size,
                    self.sender.clone(),
                    self.oracle_mailbox.clone(),
                    clock,
                    quota,
                );

                // Create a receiver that allows receiving messages from the network for a certain channel
                let peer = self.peers.get_mut(&public_key).unwrap();
                let receiver = match peer.register(channel, handle).await {
                    Ok(receiver) => Receiver { receiver },
                    Err(err) => return send_result(result, Err(err)),
                };

                send_result(result, Ok((sender, receiver)))
            }
            ingress::Message::PeerSet { id, response } => {
                if self.peer_sets.is_empty() {
                    // Return all peers if no peer sets are registered.
                    let _ = response.send(Some(
                        self.peers
                            .keys()
                            .cloned()
                            .try_collect()
                            .expect("BTreeMap keys are unique"),
                    ));
                } else {
                    // Return the peer set at the given index
                    let _ = response.send(self.peer_sets.get(&id).cloned());
                }
            }
            ingress::Message::Subscribe { sender } => {
                // Send the latest peer set upon subscription
                if let Some((index, peers)) = self.peer_sets.last_key_value() {
                    let all = self.all_tracked_peers();
                    let notification = (*index, peers.clone(), all);
                    sender.send_lossy(notification);
                }
                self.subscribers.push(sender);
            }
            ingress::Message::SubscribeConnected { response } => {
                // Create a ring channel for the subscriber
                let (mut sender, receiver) = ring::channel(NZUsize!(1));

                // Send current peer list immediately
                let peer_list: Vec<P> = self.all_tracked_peers().into_iter().collect();
                let _ = sender.send(peer_list).await;

                // Store sender for future broadcasts
                self.peer_subscribers.push(sender);

                // Return the receiver to the subscriber
                let _ = response.send(receiver);
            }
            ingress::Message::LimitBandwidth {
                public_key,
                egress_cap,
                ingress_cap,
                result,
            } => {
                // If peer does not exist, then create it.
                let (_, is_new) = self.ensure_peer_exists(&public_key).await;

                // When not using peer sets, broadcast updated peer list to subscribers
                if is_new && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // Update bandwidth limits
                let now = self.context.current();
                let completions = self
                    .transmitter
                    .limit(now, &public_key, egress_cap, ingress_cap);
                self.process_completions(completions);

                // Notify the caller that the bandwidth update has been applied
                let _ = result.send(());
            }
            ingress::Message::AddLink {
                sender,
                receiver,
                sampler,
                success_rate,
                result,
            } => {
                // If sender or receiver does not exist, then create it.
                let (_, sender_is_new) = self.ensure_peer_exists(&sender).await;
                let (receiver_socket, receiver_is_new) = self.ensure_peer_exists(&receiver).await;

                // When not using peer sets, broadcast updated peer list to subscribers
                if (sender_is_new || receiver_is_new) && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // Require link to not already exist
                let key = (sender.clone(), receiver.clone());
                if self.links.contains_key(&key) {
                    return send_result(result, Err(Error::LinkExists));
                }

                let link = Link::new(
                    &mut self.context,
                    sender,
                    receiver,
                    receiver_socket,
                    sampler,
                    success_rate,
                    self.max_size,
                    self.received_messages.clone(),
                );
                self.links.insert(key, link);
                send_result(result, Ok(()))
            }
            ingress::Message::RemoveLink {
                sender,
                receiver,
                result,
            } => {
                match self.links.remove(&(sender, receiver)) {
                    Some(_) => (),
                    None => return send_result(result, Err(Error::LinkMissing)),
                }
                send_result(result, Ok(()))
            }
            ingress::Message::Block { from, to } => {
                self.blocks.insert((from, to));
            }
            ingress::Message::Blocked { result } => {
                send_result(result, Ok(self.blocks.iter().cloned().collect()))
            }
        }
    }

    /// Ensure a peer exists, creating it if necessary.
    ///
    /// Returns the socket address of the peer and a boolean indicating if a new peer was created.
    async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
        if !self.peers.contains_key(public_key) {
            // Create peer
            let socket = self.get_next_socket();
            let peer = Peer::new(
                self.context.with_label("peer"),
                public_key.clone(),
                socket,
                self.max_size,
            )
            .await;

            // Once ready, add to peers
            self.peers.insert(public_key.clone(), peer);

            (socket, true)
        } else {
            (self.peers.get(public_key).unwrap().socket, false)
        }
    }

    /// Broadcast updated peer list to all peer subscribers.
    ///
    /// This is called when the peer list changes (either from peer set updates
    /// or from new peers being added when not using peer sets).
    ///
    /// Subscribers whose receivers have been dropped are removed to prevent
    /// memory leaks.
    async fn broadcast_peer_list(&mut self) {
        let peer_list = self.all_tracked_peers().into_iter().collect::<Vec<_>>();
        let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
        for mut subscriber in self.peer_subscribers.drain(..) {
            if subscriber.send(peer_list.clone()).await.is_ok() {
                live_subscribers.push(subscriber);
            }
        }
        self.peer_subscribers = live_subscribers;
    }

    /// Get all tracked peers as an ordered set.
    ///
    /// When peer sets are registered, returns only the peers from those sets.
    /// Otherwise, returns all registered peers (for compatibility with tests
    /// that don't use peer sets).
    fn all_tracked_peers(&self) -> Set<P> {
        if self.peer_sets.is_empty() && self.tracked_peer_sets.is_none() {
            self.peers
                .keys()
                .cloned()
                .try_collect()
                .expect("BTreeMap keys are unique")
        } else {
            self.peer_refs
                .keys()
                .cloned()
                .try_collect()
                .expect("BTreeMap keys are unique")
        }
    }
}

impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
    /// Process completions from the transmitter.
    fn process_completions(&mut self, completions: Vec<Completion<P>>) {
        for completion in completions {
            // If there is no message to deliver, then skip
            let Some(deliver_at) = completion.deliver_at else {
                trace!(
                    origin = ?completion.origin,
                    recipient = ?completion.recipient,
                    "message dropped before delivery",
                );
                continue;
            };

            // Send message to link
            let key = (completion.origin.clone(), completion.recipient.clone());
            let Some(link) = self.links.get_mut(&key) else {
                // This can happen if the link is removed before the message is delivered
                trace!(
                    origin = ?completion.origin,
                    recipient = ?completion.recipient,
                    "missing link for completion",
                );
                continue;
            };
            if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
                error!(?err, "failed to send");
            }
        }
    }

    /// Handle a task.
    ///
    /// This method is called when a task is received from the sender, which can come from
    /// any peer in the network.
    fn handle_task(&mut self, task: Task<P>) {
        // If peer sets are enabled and we are not in one, ignore the message (we are disconnected from all)
        let (channel, origin, recipients, message, reply) = task;
        if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&origin) {
            warn!(
                ?origin,
                reason = "not in tracked peer set",
                "dropping message"
            );
            if let Err(err) = reply.send(Vec::new()) {
                error!(?err, "failed to send ack");
            }
            return;
        }

        // Collect recipients
        let recipients = match recipients {
            Recipients::All => {
                // If peer sets have been registered, send only to tracked peers
                // Otherwise, send to all registered peers (compatibility
                // with tests that do not register peer sets.)
                if self.peer_sets.is_empty() {
                    self.peers.keys().cloned().collect()
                } else {
                    self.peer_refs.keys().cloned().collect()
                }
            }
            Recipients::Some(keys) => keys,
            Recipients::One(key) => vec![key],
        };

        // Send to all recipients
        let now = self.context.current();
        let mut sent = Vec::new();
        for recipient in recipients {
            // Skip self
            if recipient == origin {
                trace!(?recipient, reason = "self", "dropping message");
                continue;
            }

            // If tracking peer sets, ensure recipient and sender are in a tracked peer set
            if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&recipient) {
                trace!(
                    ?origin,
                    ?recipient,
                    reason = "not in tracked peer set",
                    "dropping message"
                );
                continue;
            }

            // Determine if the sender or recipient has blocked the other
            let o_r = (origin.clone(), recipient.clone());
            let r_o = (recipient.clone(), origin.clone());
            if self.disconnect_on_block
                && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
            {
                trace!(?origin, ?recipient, reason = "blocked", "dropping message");
                continue;
            }

            // Determine if there is a link between the origin and recipient
            let Some(link) = self.links.get_mut(&o_r) else {
                trace!(?origin, ?recipient, reason = "no link", "dropping message");
                continue;
            };

            // Note: Rate limiting is handled by the Sender before messages reach here.
            // The Sender filters recipients via LimitedSender::check() or in Sender::send().

            // Record sent message as soon as we determine there is a link with recipient (approximates
            // having an open connection)
            self.sent_messages
                .get_or_create(&metrics::Message::new(&origin, &recipient, channel))
                .inc();

            // Sample latency
            let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64);

            // Determine if the message should be delivered
            let should_deliver = self.context.gen_bool(link.success_rate);

            // Enqueue message for delivery
            let completions = self.transmitter.enqueue(
                now,
                origin.clone(),
                recipient.clone(),
                channel,
                message.clone(),
                latency,
                should_deliver,
            );
            self.process_completions(completions);

            sent.push(recipient);
        }

        // Alert application of sent messages
        if let Err(err) = reply.send(sent) {
            error!(?err, "failed to send ack");
        }
    }

    /// Run the simulated network.
    ///
    /// It is not necessary to invoke this method before modifying the network topology; however,
    /// no messages will be sent until this method is called.
    pub fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run().await)
    }

    async fn run(mut self) {
        select_loop! {
            self.context,
            on_start => {
                let tick = match self.transmitter.next() {
                    Some(when) => Either::Left(self.context.sleep_until(when)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {},
            _ = tick => {
                let now = self.context.current();
                let completions = self.transmitter.advance(now);
                self.process_completions(completions);
            },
            Some(message) = self.ingress.recv() else break => {
                self.handle_ingress(message).await;
            },
            Some(task) = self.receiver.recv() else break => {
                self.handle_task(task);
            },
        }
    }
}

/// Provides online peers from the simulated network.
///
/// Implements [`crate::utils::limited::Connected`] to provide peer list updates
/// to [`crate::utils::limited::LimitedSender`].
pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
    mailbox: UnboundedMailbox<ingress::Message<P, E>>,
}

impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
    fn clone(&self) -> Self {
        Self {
            mailbox: self.mailbox.clone(),
        }
    }
}

impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
    const fn new(mailbox: UnboundedMailbox<ingress::Message<P, E>>) -> Self {
        Self { mailbox }
    }
}

impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
    type PublicKey = P;

    async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
        self.mailbox
            .0
            .request(|response| ingress::Message::SubscribeConnected { response })
            .await
            .unwrap_or_else(|| {
                let (_sender, receiver) = ring::channel(NZUsize!(1));
                receiver
            })
    }
}

/// Implementation of a [crate::Sender] for the simulated network without rate limiting.
///
/// This is the inner sender used by [`Sender`] which wraps it with rate limiting.
#[derive(Clone)]
pub struct UnlimitedSender<P: PublicKey> {
    me: P,
    channel: Channel,
    max_size: u32,
    high: mpsc::UnboundedSender<Task<P>>,
    low: mpsc::UnboundedSender<Task<P>>,
}

impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
    type Error = Error;
    type PublicKey = P;

    async fn send(
        &mut self,
        recipients: Recipients<P>,
        message: impl Into<IoBufMut> + Send,
        priority: bool,
    ) -> Result<Vec<P>, Error> {
        let message = message.into().freeze();

        // Check message size
        if message.len() > self.max_size as usize {
            return Err(Error::MessageTooLarge(message.len()));
        }

        // Send message
        let (sender, receiver) = oneshot::channel();
        let channel = if priority { &self.high } else { &self.low };
        if channel
            .send((self.channel, self.me.clone(), recipients, message, sender))
            .is_err()
        {
            return Ok(Vec::new());
        }
        Ok(receiver.await.unwrap_or_default())
    }
}

/// Implementation of a [crate::Sender] for the simulated network.
///
/// Also implements [crate::LimitedSender] to support rate-limit checking
/// before sending messages.
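///
/// # Example
///
/// A sketch of sending through a registered sender (`sender` is assumed to be the sender half
/// returned by the oracle's channel registration, as in the tests below):
///
/// ```ignore
/// // Send directly; the rate limit is applied internally.
/// let delivered = sender.send(Recipients::All, b"ping", false).await?;
///
/// // Or check the rate limit first and send via the checked handle.
/// let checked = sender.check(Recipients::All).await.expect("not rate limited");
/// let delivered = checked.send(b"ping", false).await?;
/// ```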
pub struct Sender<P: PublicKey, E: Clock> {
    limited_sender: LimitedSender<E, UnlimitedSender<P>, ConnectedPeerProvider<P, E>>,
}

impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
    fn clone(&self) -> Self {
        Self {
            limited_sender: self.limited_sender.clone(),
        }
    }
}

impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sender").finish_non_exhaustive()
    }
}

impl<P: PublicKey, E: Clock> Sender<P, E> {
    #[allow(clippy::too_many_arguments)]
    fn new(
        context: impl Spawner + Metrics,
        me: P,
        channel: Channel,
        max_size: u32,
        sender: mpsc::UnboundedSender<Task<P>>,
        oracle_mailbox: UnboundedMailbox<ingress::Message<P, E>>,
        clock: E,
        quota: Quota,
    ) -> (Self, Handle<()>) {
        // Listen for messages
        let (high, mut high_receiver) = mpsc::unbounded_channel();
        let (low, mut low_receiver) = mpsc::unbounded_channel();
        let processor = context.with_label("processor").spawn(move |_| async move {
            loop {
                // Wait for task
                let task;
                select! {
                    high_task = high_receiver.recv() => {
                        task = match high_task {
                            Some(task) => task,
                            None => break,
                        };
                    },
                    low_task = low_receiver.recv() => {
                        task = match low_task {
                            Some(task) => task,
                            None => break,
                        };
                    },
                }

                // Send task
                if let Err(err) = sender.send(task) {
                    error!(?err, channel, "failed to send task");
                }
            }
        });

        let unlimited_sender = UnlimitedSender {
            me,
            channel,
            max_size,
            high,
            low,
        };
        let peer_source = ConnectedPeerProvider::new(oracle_mailbox);
        let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);

        (Self { limited_sender }, processor)
    }

    /// Split this [Sender] into a [SplitOrigin::Primary] and [SplitOrigin::Secondary] sender.
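    ///
    /// # Example
    ///
    /// A sketch that gives each half of the split a distinct recipient (`peer_a` and `peer_b`
    /// are placeholder public keys; the pattern mirrors the tests below):
    ///
    /// ```ignore
    /// let (primary, secondary) = sender.split_with(move |origin, _, _| match origin {
    ///     SplitOrigin::Primary => Some(Recipients::One(peer_a.clone())),
    ///     SplitOrigin::Secondary => Some(Recipients::One(peer_b.clone())),
    /// });
    /// ```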
    pub fn split_with<F: SplitForwarder<P>>(
        self,
        forwarder: F,
    ) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
        (
            SplitSender {
                replica: SplitOrigin::Primary,
                inner: self.clone(),
                forwarder: forwarder.clone(),
            },
            SplitSender {
                replica: SplitOrigin::Secondary,
                inner: self,
                forwarder,
            },
        )
    }
}

impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
    type PublicKey = P;
    type Checked<'a>
        = crate::utils::limited::CheckedSender<'a, UnlimitedSender<P>>
    where
        Self: 'a;

    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        self.limited_sender.check(recipients).await
    }
}

/// A sender that routes recipients per message via a user-provided function.
pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    replica: SplitOrigin,
    inner: Sender<P, E>,
    forwarder: F,
}

impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
    fn clone(&self) -> Self {
        Self {
            replica: self.replica,
            inner: self.inner.clone(),
            forwarder: self.forwarder.clone(),
        }
    }
}

impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SplitSender")
            .field("replica", &self.replica)
            .field("inner", &self.inner)
            .finish()
    }
}

impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
    type PublicKey = P;
    type Checked<'a> = SplitCheckedSender<'a, P, E, F>;

    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        Ok(SplitCheckedSender {
            // Perform a rate limit check with the entire set of original recipients although
            // the forwarder may filter these (based on message content) during send.
            checked: self.inner.limited_sender.check(recipients.clone()).await?,
            replica: self.replica,
            forwarder: self.forwarder.clone(),
            recipients,

            _phantom: std::marker::PhantomData,
        })
    }
}

/// A checked sender for [`SplitSender`] that defers the forwarder call to send time.
///
/// This is necessary because [`SplitForwarder`] may examine message content to determine
/// routing, but the message is not available at [`LimitedSender::check`] time.
pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    checked: LimitedCheckedSender<'a, UnlimitedSender<P>>,
    replica: SplitOrigin,
    forwarder: F,
    recipients: Recipients<P>,

    _phantom: std::marker::PhantomData<E>,
}

impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
    for SplitCheckedSender<'a, P, E, F>
{
    type PublicKey = P;
    type Error = Error;

    async fn send(
        self,
        message: impl Into<IoBufMut> + Send,
        priority: bool,
    ) -> Result<Vec<Self::PublicKey>, Self::Error> {
        // Convert to IoBuf here since forwarder needs to inspect the message
        let message = message.into().freeze();

        // Determine the set of recipients that will receive the message
        let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
            return Ok(Vec::new());
        };

        // Extract the inner sender and send directly with the new recipients
        //
        // While SplitForwarder does not enforce any relationship between the original recipients
        // and the new recipients, it is typically some subset of the original recipients. This
        // means we may over-rate limit some recipients (who are never actually sent a message here) but
        // we prefer this to not providing feedback at all (we would have to skip check entirely).
        self.checked
            .into_inner()
            .send(recipients, message, priority)
            .await
    }
}

type MessageReceiver<P> = mpsc::UnboundedReceiver<Message<P>>;

/// Implementation of a [crate::Receiver] for the simulated network.
#[derive(Debug)]
pub struct Receiver<P: PublicKey> {
    receiver: MessageReceiver<P>,
}

impl<P: PublicKey> crate::Receiver for Receiver<P> {
    type Error = Error;
    type PublicKey = P;

    async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
        self.receiver.recv().await.ok_or(Error::NetworkClosed)
    }
}

impl<P: PublicKey> Receiver<P> {
    /// Split this [Receiver] into a [SplitTarget::Primary] and [SplitTarget::Secondary] receiver.
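    ///
    /// # Example
    ///
    /// A sketch that splits a receiver by sender identity (`peer_a` is a placeholder public key
    /// and `context` a [Spawner]; the pattern mirrors the tests below):
    ///
    /// ```ignore
    /// let (primary, secondary) = receiver.split_with(context, move |(sender, _)| {
    ///     if sender == &peer_a {
    ///         SplitTarget::Primary
    ///     } else {
    ///         SplitTarget::Secondary
    ///     }
    /// });
    /// ```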
    pub fn split_with<E: Spawner, R: SplitRouter<P>>(
        mut self,
        context: E,
        router: R,
    ) -> (Self, Self) {
        let (primary_tx, primary_rx) = mpsc::unbounded_channel();
        let (secondary_tx, secondary_rx) = mpsc::unbounded_channel();
        context.spawn(move |_| async move {
            while let Some(message) = self.receiver.recv().await {
                // Route message to the appropriate target
                let direction = router(&message);
                match direction {
                    SplitTarget::None => {}
                    SplitTarget::Primary => {
                        if let Err(err) = primary_tx.send(message) {
                            error!(?err, "failed to send message to primary");
                        }
                    }
                    SplitTarget::Secondary => {
                        if let Err(err) = secondary_tx.send(message) {
                            error!(?err, "failed to send message to secondary");
                        }
                    }
                    SplitTarget::Both => {
                        if let Err(err) = primary_tx.send(message.clone()) {
                            error!(?err, "failed to send message to primary");
                        }
                        if let Err(err) = secondary_tx.send(message) {
                            error!(?err, "failed to send message to secondary");
                        }
                    }
                }

                // Exit if both channels are closed
                if primary_tx.is_closed() && secondary_tx.is_closed() {
                    break;
                }
            }
        });

        (
            Self {
                receiver: primary_rx,
            },
            Self {
                receiver: secondary_rx,
            },
        )
    }
}

/// A peer in the simulated network.
///
/// The peer can register channels, which allows it to receive messages sent to the channel from other peers.
struct Peer<P: PublicKey> {
    // Socket address that the peer is listening on
    socket: SocketAddr,

    // Control to register new channels
    control: mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
}

impl<P: PublicKey> Peer<P> {
    /// Create and return a new peer.
    ///
    /// The peer will listen for incoming connections on the given `socket` address.
    /// `max_size` is the maximum size of a message that can be sent to the peer.
    async fn new<E: Spawner + RNetwork + Metrics + Clock>(
        context: E,
        public_key: P,
        socket: SocketAddr,
        max_size: u32,
    ) -> Self {
        // The control is used to register channels.
        // There is exactly one mailbox created for each channel that the peer is registered for.
        #[allow(clippy::type_complexity)]
        let (control_sender, mut control_receiver): (
            mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
            _,
        ) = mpsc::unbounded_channel();

        // Whenever a message is received from a peer, it is placed in the inbox.
        // The router polls the inbox and forwards the message to the appropriate mailbox.
        let (inbox_sender, mut inbox_receiver) = mpsc::unbounded_channel();

        // Spawn router
        context.with_label("router").spawn(|context| async move {
            // Map of channels to mailboxes (senders to particular channels)
            let mut mailboxes = HashMap::new();

            // Continually listen for control messages and outbound messages
            select_loop! {
                context,
                on_stopped => {},
                // Listen for control messages, which are used to register channels
                Some((channel, sender, result_tx)) = control_receiver.recv() else break => {
                    // Register channel
                    let (receiver_tx, receiver_rx) = mpsc::unbounded_channel();
                    if let Some((_, existing_sender)) =
                        mailboxes.insert(channel, (receiver_tx, sender))
                    {
                        warn!(?public_key, ?channel, "overwriting existing channel");
                        existing_sender.abort();
                    }
                    result_tx.send(receiver_rx).unwrap();
                },

                // Listen for messages from the inbox, which are forwarded to the appropriate mailbox
                Some((channel, message)) = inbox_receiver.recv() else break => {
                    // Send message to mailbox
                    match mailboxes.get_mut(&channel) {
                        Some((receiver_tx, _)) => {
                            if let Err(err) = receiver_tx.send(message) {
                                debug!(?err, "failed to send message to mailbox");
                            }
                        }
                        None => {
                            trace!(
                                recipient = ?public_key,
                                channel,
                                reason = "missing channel",
                                "dropping message",
                            );
                        }
                    }
                },
            }
        });

        // Spawn a task that accepts new connections and spawns a task for each connection
        let (ready_tx, ready_rx) = oneshot::channel();
        context
            .with_label("listener")
            .spawn(move |context| async move {
                // Initialize listener
                let mut listener = context.bind(socket).await.unwrap();
                let _ = ready_tx.send(());

                // Continually accept new connections
                while let Ok((_, _, mut stream)) = listener.accept().await {
                    // New connection accepted. Spawn a task for this connection
                    context.with_label("receiver").spawn({
                        let inbox_sender = inbox_sender.clone();
                        move |_| async move {
                            // Receive dialer's public key as a handshake
                            let dialer = match recv_frame(&mut stream, max_size).await {
                                Ok(data) => data,
                                Err(_) => {
                                    error!("failed to receive public key from dialer");
                                    return;
                                }
                            };
                            let Ok(dialer) = P::decode(dialer.coalesce()) else {
                                error!("received public key is invalid");
                                return;
                            };

                            // Continually receive messages from the dialer and send them to the inbox
                            while let Ok(data) = recv_frame(&mut stream, max_size).await {
                                let data = data.coalesce();
                                let channel = Channel::from_be_bytes(
                                    data.as_ref()[..Channel::SIZE].try_into().unwrap(),
                                );
                                let message = data.slice(Channel::SIZE..);
                                if let Err(err) =
                                    inbox_sender.send((channel, (dialer.clone(), message)))
                                {
                                    debug!(?err, "failed to send message to mailbox");
                                    break;
                                }
                            }
                        }
                    });
                }
            });

        // Wait for listener to start before returning
        let _ = ready_rx.await;

        // Return peer
        Self {
            socket,
            control: control_sender,
        }
    }

    /// Register a channel with the peer.
    ///
    /// This allows the peer to receive messages sent to the channel.
    /// Returns a receiver that can be used to receive messages sent to the channel.
    async fn register(
        &mut self,
        channel: Channel,
        sender: Handle<()>,
    ) -> Result<MessageReceiver<P>, Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.control
            .send((channel, sender, result_tx))
            .map_err(|_| Error::NetworkClosed)?;
        result_rx.await.map_err(|_| Error::NetworkClosed)
    }
}

// A unidirectional link between two peers.
// Messages can be sent over the link with a given latency, jitter, and success rate.
struct Link {
    sampler: Normal<f64>,
    success_rate: f64,
    // Messages with their receive time for ordered delivery
    inbox: mpsc::UnboundedSender<(Channel, IoBuf, SystemTime)>,
}

impl Link {
    #[allow(clippy::too_many_arguments)]
    fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
        context: &mut E,
        dialer: P,
        receiver: P,
        socket: SocketAddr,
        sampler: Normal<f64>,
        success_rate: f64,
        max_size: u32,
        received_messages: Family<metrics::Message, Counter>,
    ) -> Self {
        // Spawn a task that will wait for messages to be sent to the link and then send them
        // over the network.
        let (inbox, mut outbox) = mpsc::unbounded_channel::<(Channel, IoBuf, SystemTime)>();
        context.with_label("link").spawn(move |context| async move {
            // Dial the peer and handshake by sending it the dialer's public key
            let (mut sink, _) = context.dial(socket).await.unwrap();
            if let Err(err) = send_frame(&mut sink, dialer.as_ref().to_vec(), max_size).await {
                error!(?err, "failed to send public key to listener");
                return;
            }

            // Process messages in order, waiting for their receive time
            while let Some((channel, message, receive_complete_at)) = outbox.recv().await {
                // Wait until the message should arrive at receiver
                context.sleep_until(receive_complete_at).await;

                // Send the message
                let channel_bytes = channel.to_be_bytes();
                let mut data = Vec::with_capacity(channel_bytes.len() + message.len());
                data.extend_from_slice(&channel_bytes);
                data.extend_from_slice(message.as_ref());
                let _ = send_frame(&mut sink, data, max_size).await;

                // Bump received messages metric
                received_messages
                    .get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
                    .inc();
            }
        });

        Self {
            sampler,
            success_rate,
            inbox,
        }
    }

    // Send a message over the link with receive timing.
    fn send(
        &mut self,
        channel: Channel,
        message: IoBuf,
        receive_complete_at: SystemTime,
    ) -> Result<(), Error> {
        self.inbox
            .send((channel, message, receive_complete_at))
            .map_err(|_| Error::NetworkClosed)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Manager, Provider, Receiver as _, Recipients, Sender as _};
    use commonware_cryptography::{ed25519, Signer as _};
    use commonware_runtime::{deterministic, Quota, Runner as _};
    use futures::FutureExt;
    use std::num::NonZeroU32;

    const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;

    /// Default rate limit set high enough to not interfere with normal operation
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);

    #[test]
    fn test_register_and_link() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            };
            let network_context = context.with_label("network");
            let (network, oracle) = Network::new(network_context.clone(), cfg);
            network_context.spawn(|_| network.run());

            // Create two public keys
            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();

            // Register the peer set
            let mut manager = oracle.manager();
            manager
                .track(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
                .await;
            let control = oracle.control(pk1.clone());
            control.register(0, TEST_QUOTA).await.unwrap();
            control.register(1, TEST_QUOTA).await.unwrap();
            let control = oracle.control(pk2.clone());
            control.register(0, TEST_QUOTA).await.unwrap();
            control.register(1, TEST_QUOTA).await.unwrap();

            // Overwrite if registering again
            control.register(1, TEST_QUOTA).await.unwrap();

            // Add link
            let link = ingress::Link {
                latency: Duration::from_millis(2),
                jitter: Duration::from_millis(1),
                success_rate: 0.9,
            };
            oracle
                .add_link(pk1.clone(), pk2.clone(), link.clone())
                .await
                .unwrap();

            // Expect error when adding link again
            assert!(matches!(
                oracle.add_link(pk1, pk2, link).await,
                Err(Error::LinkExists)
            ));
        });
    }

    #[test]
    fn test_split_channel_single() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            };
            let network_context = context.with_label("network");
            let (network, oracle) = Network::new(network_context.clone(), cfg);
            network_context.spawn(|_| network.run());

            // Create a "twin" node that will be split, plus two normal peers
            let twin = ed25519::PrivateKey::from_seed(20).public_key();
            let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
            let peer_b = ed25519::PrivateKey::from_seed(22).public_key();

            // Register all peers
            let mut manager = oracle.manager();
            manager
                .track(
                    0,
                    [twin.clone(), peer_a.clone(), peer_b.clone()]
                        .try_into()
                        .unwrap(),
                )
                .await;

            // Register normal peers
            let (mut peer_a_sender, mut peer_a_recv) = oracle
                .control(peer_a.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let (mut peer_b_sender, mut peer_b_recv) = oracle
                .control(peer_b.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();

            // Register and split the twin's channel:
            // - Primary sends only to peer_a
            // - Secondary sends only to peer_b
            // - Messages from peer_a go to primary receiver
            // - Messages from peer_b go to secondary receiver
            let (twin_sender, twin_receiver) = oracle
                .control(twin.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let peer_a_for_router = peer_a.clone();
            let peer_b_for_router = peer_b.clone();
            let (mut twin_primary_sender, mut twin_secondary_sender) =
                twin_sender.split_with(move |origin, _, _| match origin {
                    SplitOrigin::Primary => Some(Recipients::One(peer_a_for_router.clone())),
                    SplitOrigin::Secondary => Some(Recipients::One(peer_b_for_router.clone())),
                });
            let peer_a_for_recv = peer_a.clone();
            let peer_b_for_recv = peer_b.clone();
            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver.split_with(
                context.with_label("split_receiver"),
                move |(sender, _)| {
                    if sender == &peer_a_for_recv {
                        SplitTarget::Primary
                    } else if sender == &peer_b_for_recv {
                        SplitTarget::Secondary
                    } else {
                        panic!("unexpected sender");
                    }
                },
            );

            // Establish bidirectional links
            let link = ingress::Link {
                latency: Duration::from_millis(0),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            oracle
                .add_link(peer_a.clone(), twin.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(twin.clone(), peer_a.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(peer_b.clone(), twin.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(twin.clone(), peer_b.clone(), link.clone())
                .await
                .unwrap();

            // Send messages in both directions
            peer_a_sender
                .send(Recipients::One(twin.clone()), b"from_a", false)
                .await
                .unwrap();
            peer_b_sender
                .send(Recipients::One(twin.clone()), b"from_b", false)
                .await
                .unwrap();
            twin_primary_sender
                .send(Recipients::All, b"primary_out", false)
                .await
                .unwrap();
            twin_secondary_sender
                .send(Recipients::All, b"secondary_out", false)
                .await
                .unwrap();
1467
1468            // Verify routing: peer_a messages go to primary, peer_b to secondary
1469            let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1470            assert_eq!(sender, peer_a);
1471            assert_eq!(payload, b"from_a");
1472            let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1473            assert_eq!(sender, peer_b);
1474            assert_eq!(payload, b"from_b");
1475
1476            // Verify routing: primary sends to peer_a, secondary to peer_b
1477            let (sender, payload) = peer_a_recv.recv().await.unwrap();
1478            assert_eq!(sender, twin);
1479            assert_eq!(payload, b"primary_out");
1480            let (sender, payload) = peer_b_recv.recv().await.unwrap();
1481            assert_eq!(sender, twin);
1482            assert_eq!(payload, b"secondary_out");
1483        });
1484    }
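
    // A note on the shape used above (an observation about this test, not a
    // spec): the sender half is split with a single forwarder closure taking
    // (origin, requested recipients, payload) and returning the recipients to
    // actually use, while the receiver half additionally takes a labeled
    // context, presumably because fanning incoming messages out to the two
    // halves requires a background task.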
1485
1486    #[test]
1487    fn test_split_channel_both() {
1488        let executor = deterministic::Runner::default();
1489        executor.start(|context| async move {
1490            let cfg = Config {
1491                max_size: MAX_MESSAGE_SIZE,
1492                disconnect_on_block: true,
1493                tracked_peer_sets: Some(3),
1494            };
1495            let network_context = context.with_label("network");
1496            let (network, oracle) = Network::new(network_context.clone(), cfg);
1497            network_context.spawn(|_| network.run());
1498
1499            // Create a "twin" node that will be split, plus a regular peer
1500            let twin = ed25519::PrivateKey::from_seed(30).public_key();
1501            let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1502
1503            // Register all peers
1504            let mut manager = oracle.manager();
1505            manager
1506                .track(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
1507                .await;
1508
1509            // Register normal peer
1510            let (mut peer_c_sender, _peer_c_recv) = oracle
1511                .control(peer_c.clone())
1512                .register(0, TEST_QUOTA)
1513                .await
1514                .unwrap();
1515
1516            // Register and split the twin's channel with a router that sends to Both
1517            let (twin_sender, twin_receiver) = oracle
1518                .control(twin.clone())
1519                .register(0, TEST_QUOTA)
1520                .await
1521                .unwrap();
1522            let (_twin_primary_sender, _twin_secondary_sender) =
1523                twin_sender.split_with(|_origin, recipients, _| Some(recipients.clone()));
1524            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1525                .split_with(context.with_label("split_receiver_both"), |_| {
1526                    SplitTarget::Both
1527                });
1528
1529            // Establish bidirectional links
1530            let link = ingress::Link {
1531                latency: Duration::from_millis(0),
1532                jitter: Duration::from_millis(0),
1533                success_rate: 1.0,
1534            };
1535            oracle
1536                .add_link(peer_c.clone(), twin.clone(), link.clone())
1537                .await
1538                .unwrap();
1539            oracle
1540                .add_link(twin.clone(), peer_c.clone(), link)
1541                .await
1542                .unwrap();
1543
1544            // Send a message from peer_c to twin
1545            peer_c_sender
1546                .send(Recipients::One(twin.clone()), b"to_both", false)
1547                .await
1548                .unwrap();
1549
1550            // Verify both receivers get the message
1551            let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1552            assert_eq!(sender, peer_c);
1553            assert_eq!(payload, b"to_both");
1554            let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1555            assert_eq!(sender, peer_c);
1556            assert_eq!(payload, b"to_both");
1557        });
1558    }
1559
1560    #[test]
1561    fn test_split_channel_none() {
1562        let executor = deterministic::Runner::default();
1563        executor.start(|context| async move {
1564            let cfg = Config {
1565                max_size: MAX_MESSAGE_SIZE,
1566                disconnect_on_block: true,
1567                tracked_peer_sets: Some(3),
1568            };
1569            let network_context = context.with_label("network");
1570            let (network, oracle) = Network::new(network_context.clone(), cfg);
1571            network_context.spawn(|_| network.run());
1572
1573            // Create a "twin" node that will be split, plus a regular peer
1574            let twin = ed25519::PrivateKey::from_seed(30).public_key();
1575            let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1576
1577            // Register all peers
1578            let mut manager = oracle.manager();
1579            manager
1580                .track(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
1581                .await;
1582
1583            // Register normal peer
1584            let (mut peer_c_sender, _peer_c_recv) = oracle
1585                .control(peer_c.clone())
1586                .register(0, TEST_QUOTA)
1587                .await
1588                .unwrap();
1589
1590            // Register and split the twin's channel with a forwarder and a router that drop everything
1591            let (twin_sender, twin_receiver) = oracle
1592                .control(twin.clone())
1593                .register(0, TEST_QUOTA)
1594                .await
1595                .unwrap();
1596            let (mut twin_primary_sender, mut twin_secondary_sender) =
1597                twin_sender.split_with(|_origin, _, _| None);
1598            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1599                .split_with(context.with_label("split_receiver_both"), |_| {
1600                    SplitTarget::None
1601                });
1602
1603            // Establish bidirectional links
1604            let link = ingress::Link {
1605                latency: Duration::from_millis(0),
1606                jitter: Duration::from_millis(0),
1607                success_rate: 1.0,
1608            };
1609            oracle
1610                .add_link(peer_c.clone(), twin.clone(), link.clone())
1611                .await
1612                .unwrap();
1613            oracle
1614                .add_link(twin.clone(), peer_c.clone(), link)
1615                .await
1616                .unwrap();
1617
1618            // Send a message from peer_c to twin
1619            let sent = peer_c_sender
1620                .send(Recipients::One(twin.clone()), b"to_both", false)
1621                .await
1622                .unwrap();
1623            assert_eq!(sent.len(), 1);
1624            assert_eq!(sent[0], twin);
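            // Note: `sent` reports delivery at the network layer. The link is
            // lossless, so the message does reach the twin; it is the twin's
            // split router (returning SplitTarget::None) that then discards it.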
1625
1626            // Verify that neither receiver gets the message (the router returns SplitTarget::None)
1627            context.sleep(Duration::from_millis(100)).await;
1628            assert!(twin_primary_recv.recv().now_or_never().is_none());
1629            assert!(twin_secondary_recv.recv().now_or_never().is_none());
1630
1631            // Send from the twin's primary sender: the forwarder returns None, so no recipients are reported
1632            let sent = twin_primary_sender
1633                .send(Recipients::One(peer_c.clone()), b"to_both", false)
1634                .await
1635                .unwrap();
1636            assert_eq!(sent.len(), 0);
1637
1638            // Likewise for the secondary sender
1639            let sent = twin_secondary_sender
1640                .send(Recipients::One(peer_c.clone()), b"to_both", false)
1641                .await
1642                .unwrap();
1643            assert_eq!(sent.len(), 0);
1644        });
1645    }
1646
1647    #[test]
1648    fn test_unordered_peer_sets() {
1649        let executor = deterministic::Runner::default();
1650        executor.start(|context| async move {
1651            let cfg = Config {
1652                max_size: MAX_MESSAGE_SIZE,
1653                disconnect_on_block: true,
1654                tracked_peer_sets: Some(3),
1655            };
1656            let network_context = context.with_label("network");
1657            let (network, oracle) = Network::new(network_context.clone(), cfg);
1658            network_context.spawn(|_| network.run());
1659
1660            // Create two public keys
1661            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1662            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1663
1664            // Subscribe to peer sets
1665            let mut manager = oracle.manager();
1666            let mut subscription = manager.subscribe().await;
1667
1668            // Register initial peer set
1669            manager
1670                .track(10, [pk1.clone(), pk2.clone()].try_into().unwrap())
1671                .await;
1672            let (id, new, all) = subscription.recv().await.unwrap();
1673            assert_eq!(id, 10);
1674            assert_eq!(new.len(), 2);
1675            assert_eq!(all.len(), 2);
1676
1677            // Register an older peer set (ignored)
1678            let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
1679            manager.track(9, [pk3.clone()].try_into().unwrap()).await;
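            // No subscription event is expected here: set 9 is older than the
            // latest tracked set (10), so it is dropped without notifying
            // subscribers, and the next event received below is for set 11.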
1680
1681            // Add new peer set
1682            let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
1683            manager.track(11, [pk4.clone()].try_into().unwrap()).await;
1684            let (id, new, all) = subscription.recv().await.unwrap();
1685            assert_eq!(id, 11);
1686            assert_eq!(new, [pk4.clone()].try_into().unwrap());
1687            assert_eq!(all, [pk1, pk2, pk4].try_into().unwrap());
1688        });
1689    }
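
    // A follow-on sketch (not part of the original suite) for the
    // `tracked_peer_sets: Some(3)` limit: once a fourth set is tracked, the
    // oldest set should be evicted. The assumption made here is that the `all`
    // set reported to subscribers only covers peer sets that are still
    // tracked, which is how the test above reads it.
    #[test]
    fn test_peer_set_eviction_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            };
            let network_context = context.with_label("network");
            let (network, oracle) = Network::new(network_context.clone(), cfg);
            network_context.spawn(|_| network.run());

            let mut manager = oracle.manager();
            let mut subscription = manager.subscribe().await;

            // Set 10 holds two peers; sets 11, 12, and 13 hold one new peer each
            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
            manager
                .track(10, [pk1.clone(), pk2.clone()].try_into().unwrap())
                .await;
            let (id, new, all) = subscription.recv().await.unwrap();
            assert_eq!(id, 10);
            assert_eq!(new.len(), 2);
            assert_eq!(all.len(), 2);

            let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
            manager.track(11, [pk3].try_into().unwrap()).await;
            let (id, _, all) = subscription.recv().await.unwrap();
            assert_eq!(id, 11);
            assert_eq!(all.len(), 3);

            let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
            manager.track(12, [pk4].try_into().unwrap()).await;
            let (id, _, all) = subscription.recv().await.unwrap();
            assert_eq!(id, 12);
            assert_eq!(all.len(), 4);

            // Tracking a fourth set exceeds the limit of three, so set 10 (and
            // its two peers) should drop out of `all`
            let pk5 = ed25519::PrivateKey::from_seed(5).public_key();
            manager.track(13, [pk5].try_into().unwrap()).await;
            let (id, _, all) = subscription.recv().await.unwrap();
            assert_eq!(id, 13);
            assert_eq!(all.len(), 3);
        });
    }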
1690
1691    #[test]
1692    fn test_get_next_socket() {
1693        let cfg = Config {
1694            max_size: MAX_MESSAGE_SIZE,
1695            disconnect_on_block: true,
1696            tracked_peer_sets: None,
1697        };
1698        let runner = deterministic::Runner::default();
1699
1700        runner.start(|context| async move {
1701            type PublicKey = ed25519::PublicKey;
1702            let (mut network, _) =
1703                Network::<deterministic::Context, PublicKey>::new(context.clone(), cfg);
1704
1705            // Test that the next socket address is incremented correctly
1706            let mut original = network.next_addr;
1707            let next = network.get_next_socket();
1708            assert_eq!(next, original);
1709            let next = network.get_next_socket();
1710            original.set_port(1);
1711            assert_eq!(next, original);
1712
1713            // Test that the port number overflows and carries into the IP address
1714            let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
1715            network.next_addr = max_addr;
1716            let next = network.get_next_socket();
1717            assert_eq!(next, max_addr);
1718            let next = network.get_next_socket();
1719            assert_eq!(
1720                next,
1721                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
1722            );
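            // Worked through: 255.0.255.255:65535 is handed out as-is, and the
            // following increment wraps the port to 0 while bumping the IPv4
            // address by one (255.0.255.255 -> 255.1.0.0), so ports and
            // addresses together behave like a single counter.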
1723        });
1724    }
1725
1726    #[test]
1727    fn test_fifo_burst_same_recipient() {
1728        let cfg = Config {
1729            max_size: MAX_MESSAGE_SIZE,
1730            disconnect_on_block: true,
1731            tracked_peer_sets: Some(3),
1732        };
1733        let runner = deterministic::Runner::default();
1734
1735        runner.start(|context| async move {
1736            let (network, oracle) = Network::new(context.with_label("network"), cfg);
1737            let network_handle = network.start();
1738
1739            let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
1740            let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();
1741
1742            let mut manager = oracle.manager();
1743            manager
1744                .track(
1745                    0,
1746                    [sender_pk.clone(), recipient_pk.clone()]
1747                        .try_into()
1748                        .unwrap(),
1749                )
1750                .await;
1751            let (mut sender, _sender_recv) = oracle
1752                .control(sender_pk.clone())
1753                .register(0, TEST_QUOTA)
1754                .await
1755                .unwrap();
1756            let (_sender2, mut receiver) = oracle
1757                .control(recipient_pk.clone())
1758                .register(0, TEST_QUOTA)
1759                .await
1760                .unwrap();
1761
1762            oracle
1763                .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
1764                .await
1765                .unwrap();
1766            oracle
1767                .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
1768                .await
1769                .unwrap();
1770
1771            oracle
1772                .add_link(
1773                    sender_pk.clone(),
1774                    recipient_pk.clone(),
1775                    ingress::Link {
1776                        latency: Duration::from_millis(0),
1777                        jitter: Duration::from_millis(0),
1778                        success_rate: 1.0,
1779                    },
1780                )
1781                .await
1782                .unwrap();
1783
1784            const COUNT: usize = 50;
1785            let mut expected = Vec::with_capacity(COUNT);
1786            for i in 0..COUNT {
1787                let msg = vec![i as u8; 64];
1788                sender
1789                    .send(Recipients::One(recipient_pk.clone()), msg.clone(), false)
1790                    .await
1791                    .unwrap();
1792                expected.push(msg);
1793            }
1794
1795            for expected_msg in expected {
1796                let (_pk, bytes) = receiver.recv().await.unwrap();
1797                assert_eq!(bytes, expected_msg.as_slice());
1798            }
1799
1800            drop(oracle);
1801            drop(sender);
1802            network_handle.abort();
1803        });
1804    }
1805
1806    #[test]
1807    fn test_broadcast_respects_transmit_latency() {
1808        let cfg = Config {
1809            max_size: MAX_MESSAGE_SIZE,
1810            disconnect_on_block: true,
1811            tracked_peer_sets: Some(3),
1812        };
1813        let runner = deterministic::Runner::default();
1814
1815        runner.start(|context| async move {
1816            let (network, oracle) = Network::new(context.with_label("network"), cfg);
1817            let network_handle = network.start();
1818
1819            let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
1820            let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
1821            let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();
1822
1823            let mut manager = oracle.manager();
1824            manager
1825                .track(
1826                    0,
1827                    [sender_pk.clone(), recipient_a.clone(), recipient_b.clone()]
1828                        .try_into()
1829                        .unwrap(),
1830                )
1831                .await;
1832            let (mut sender, _recv_sender) = oracle
1833                .control(sender_pk.clone())
1834                .register(0, TEST_QUOTA)
1835                .await
1836                .unwrap();
1837            let (_sender2, mut recv_a) = oracle
1838                .control(recipient_a.clone())
1839                .register(0, TEST_QUOTA)
1840                .await
1841                .unwrap();
1842            let (_sender3, mut recv_b) = oracle
1843                .control(recipient_b.clone())
1844                .register(0, TEST_QUOTA)
1845                .await
1846                .unwrap();
1847
1848            oracle
1849                .limit_bandwidth(sender_pk.clone(), Some(1_000), None)
1850                .await
1851                .unwrap();
1852            oracle
1853                .limit_bandwidth(recipient_a.clone(), None, Some(1_000))
1854                .await
1855                .unwrap();
1856            oracle
1857                .limit_bandwidth(recipient_b.clone(), None, Some(1_000))
1858                .await
1859                .unwrap();
1860
1861            let link = ingress::Link {
1862                latency: Duration::from_millis(0),
1863                jitter: Duration::from_millis(0),
1864                success_rate: 1.0,
1865            };
1866            oracle
1867                .add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
1868                .await
1869                .unwrap();
1870            oracle
1871                .add_link(sender_pk.clone(), recipient_b.clone(), link)
1872                .await
1873                .unwrap();
1874
1875            let big_msg = vec![7u8; 10_000];
1876            let start = context.current();
1877            sender
1878                .send(Recipients::All, big_msg.clone(), false)
1879                .await
1880                .unwrap();
1881
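            // With the 1,000 B/s egress limit shared across the broadcast and a
            // 1,000 B/s ingress limit per recipient, roughly 20,000 bytes must
            // leave the sender for a 10,000-byte message to reach both peers,
            // so each delivery should complete only after about 20 seconds; the
            // assertions below check that as a lower bound.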
1882            let (_pk, received_a) = recv_a.recv().await.unwrap();
1883            assert_eq!(received_a, big_msg.as_slice());
1884            let elapsed_a = context.current().duration_since(start).unwrap();
1885            assert!(elapsed_a >= Duration::from_secs(20));
1886
1887            let (_pk, received_b) = recv_b.recv().await.unwrap();
1888            assert_eq!(received_b, big_msg.as_slice());
1889            let elapsed_b = context.current().duration_since(start).unwrap();
1890            assert!(elapsed_b >= Duration::from_secs(20));
1891
1892            // Because bandwidth is shared, the two messages should take about the same time
1893            assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));
1894
1895            drop(oracle);
1896            drop(sender);
1897            network_handle.abort();
1898        });
1899    }
1900}