commonware_p2p/simulated/network.rs

1//! Implementation of a simulated p2p network.
2
3use super::{
4    ingress::{self, Oracle},
5    metrics,
6    transmitter::{self, Completion},
7    Error,
8};
9use crate::{
10    utils::limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
11    Channel, Message, Recipients, UnlimitedSender as _,
12};
13use bytes::Bytes;
14use commonware_codec::{DecodeExt, FixedSize};
15use commonware_cryptography::PublicKey;
16use commonware_macros::{select, select_loop};
17use commonware_runtime::{
18    spawn_cell, Clock, ContextCell, Handle, Listener as _, Metrics, Network as RNetwork, Quota,
19    Spawner,
20};
21use commonware_stream::utils::codec::{recv_frame, send_frame};
22use commonware_utils::{channels::ring, ordered::Set, NZUsize, TryCollect};
23use either::Either;
24use futures::{
25    channel::{mpsc, oneshot},
26    future, SinkExt, StreamExt,
27};
28use prometheus_client::metrics::{counter::Counter, family::Family};
29use rand::Rng;
30use rand_distr::{Distribution, Normal};
31use std::{
32    collections::{BTreeMap, HashMap, HashSet},
33    fmt::Debug,
34    net::{IpAddr, Ipv4Addr, SocketAddr},
35    time::{Duration, SystemTime},
36};
37use tracing::{debug, error, trace, warn};
38
39/// Task type representing a message to be sent within the network.
40type Task<P> = (Channel, P, Recipients<P>, Bytes, oneshot::Sender<Vec<P>>);
41
42/// Target for a message in a split receiver.
43#[derive(Clone, Copy, Debug, PartialEq, Eq)]
44#[must_use]
45pub enum SplitTarget {
46    None,
47    Primary,
48    Secondary,
49    Both,
50}
51
52/// Origin of a message in a split sender.
53#[derive(Clone, Copy, Debug, PartialEq, Eq)]
54#[must_use]
55pub enum SplitOrigin {
56    Primary,
57    Secondary,
58}
59
60/// A function that, given a [SplitOrigin], the original [Recipients], and the message bytes, returns the [Recipients] to forward to (or [None] to drop the message).
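///
/// # Example
///
/// A minimal sketch (marked `ignore`, not compiled as a doc-test) of a forwarder closure.
/// The policy shown (primary keeps the original recipients, secondary drops the message)
/// and the key type `MyKey` are illustrative assumptions only.
///
/// ```rust,ignore
/// let forwarder = |origin: SplitOrigin, recipients: &Recipients<MyKey>, _msg: &Bytes| {
///     match origin {
///         // The primary replica forwards to the original recipients unchanged.
///         SplitOrigin::Primary => Some(recipients.clone()),
///         // The secondary replica drops the message entirely.
///         SplitOrigin::Secondary => None,
///     }
/// };
/// ```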
61pub trait SplitForwarder<P: PublicKey>:
62    Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
63{
64}
65
66impl<P: PublicKey, F> SplitForwarder<P> for F where
67    F: Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>>
68        + Send
69        + Sync
70        + Clone
71        + 'static
72{
73}
74
75/// A function that routes incoming [Message]s to a [SplitTarget].
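///
/// # Example
///
/// A minimal sketch (marked `ignore`) of a router closure that inspects the message sender;
/// `peer_a` and the key type `MyKey` are illustrative assumptions only.
///
/// ```rust,ignore
/// let router = move |(sender, _payload): &Message<MyKey>| {
///     if *sender == peer_a {
///         SplitTarget::Primary
///     } else {
///         SplitTarget::Secondary
///     }
/// };
/// ```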
76pub trait SplitRouter<P: PublicKey>:
77    Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
78{
79}
80
81impl<P: PublicKey, F> SplitRouter<P> for F where
82    F: Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
83{
84}
85
86/// Configuration for the simulated network.
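///
/// # Example
///
/// A minimal sketch of a configuration similar to the one used in the tests below; the
/// values are illustrative only.
///
/// ```rust,ignore
/// let cfg = Config {
///     // Allow messages up to 1 MiB.
///     max_size: 1024 * 1024,
///     // Drop traffic between peers that have blocked each other.
///     disconnect_on_block: true,
///     // Keep only the three most recent peer sets.
///     tracked_peer_sets: Some(3),
/// };
/// ```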
87pub struct Config {
88    /// Maximum size of a message that can be sent over the network.
89    pub max_size: u32,
90
91    /// True if peers should disconnect upon being blocked. While production networking would
92    /// typically disconnect, for testing purposes it may be useful to keep peers connected,
93    /// allowing Byzantine actors to continue sending messages.
94    pub disconnect_on_block: bool,
95
96    /// The maximum number of peer sets to track. When a new peer set is registered and this
97    /// limit is exceeded, the oldest peer set is removed. Peers that are no longer in any
98    /// tracked peer set will have their links removed and messages to them will be dropped.
99    ///
100    /// If [None], peer sets are not considered.
101    pub tracked_peer_sets: Option<usize>,
102}
103
104/// Implementation of a simulated network.
105pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
106    context: ContextCell<E>,
107
108    // Maximum size of a message that can be sent over the network
109    max_size: u32,
110
111    // True if peers should disconnect upon being blocked.
112    // While production networking would typically disconnect, for testing purposes it may be useful
113    // to keep peers connected, allowing Byzantine actors to continue sending messages.
114    disconnect_on_block: bool,
115
116    // Next socket address to assign to a new peer
117    // Incremented for each new peer
118    next_addr: SocketAddr,
119
120    // Channel to receive messages from the oracle
121    ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,
122
123    // Sender to the oracle channel (passed to Senders for ConnectedPeerProvider subscriptions)
124    oracle_sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
125
126    // A channel to receive tasks from peers
127    // The sender is cloned and given to each peer
128    // The receiver is polled in the main loop
129    sender: mpsc::UnboundedSender<Task<P>>,
130    receiver: mpsc::UnboundedReceiver<Task<P>>,
131
132    // A map from a pair of public keys (from, to) to a link between the two peers
133    links: HashMap<(P, P), Link>,
134
135    // A map from a public key to a peer
136    peers: BTreeMap<P, Peer<P>>,
137
138    // Peer sets indexed by their ID
139    peer_sets: BTreeMap<u64, Set<P>>,
140
141    // Reference count for each peer (number of peer sets they belong to)
142    peer_refs: BTreeMap<P, usize>,
143
144    // Maximum number of peer sets to track
145    tracked_peer_sets: Option<usize>,
146
147    // A map of peers blocking each other
148    blocks: HashSet<(P, P)>,
149
150    // State of the transmitter
151    transmitter: transmitter::State<P>,
152
153    // Subscribers to peer set updates (used by Manager::subscribe())
154    #[allow(clippy::type_complexity)]
155    subscribers: Vec<mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>>,
156
157    // Subscribers to tracked peer list updates (used by ConnectedPeerProvider for LimitedSender)
158    peer_subscribers: Vec<ring::Sender<Vec<P>>>,
159
160    // Metrics for received and sent messages
161    received_messages: Family<metrics::Message, Counter>,
162    sent_messages: Family<metrics::Message, Counter>,
163}
164
165impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
166    /// Create a new simulated network with a given runtime and configuration.
167    ///
168    /// Returns a tuple containing the network instance and the oracle that can
169    /// be used to modify the state of the network at runtime.
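    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`) mirroring the tests below; `context` is assumed
    /// to be a runtime context implementing the required traits.
    ///
    /// ```rust,ignore
    /// let cfg = Config {
    ///     max_size: 1024 * 1024,
    ///     disconnect_on_block: true,
    ///     tracked_peer_sets: Some(3),
    /// };
    /// let (network, mut oracle) = Network::new(context.with_label("network"), cfg);
    /// // No messages are delivered until the network is started.
    /// let handle = network.start();
    /// ```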
170    pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
171        let (sender, receiver) = mpsc::unbounded();
172        let (oracle_sender, oracle_receiver) = mpsc::unbounded();
173        let sent_messages = Family::<metrics::Message, Counter>::default();
174        let received_messages = Family::<metrics::Message, Counter>::default();
175        context.register("messages_sent", "messages sent", sent_messages.clone());
176        context.register(
177            "messages_received",
178            "messages received",
179            received_messages.clone(),
180        );
181
182        // Start from a pseudo-random IP address when assigning sockets to new peers
183        let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);
184
185        (
186            Self {
187                context: ContextCell::new(context),
188                max_size: cfg.max_size,
189                disconnect_on_block: cfg.disconnect_on_block,
190                tracked_peer_sets: cfg.tracked_peer_sets,
191                next_addr,
192                ingress: oracle_receiver,
193                oracle_sender: oracle_sender.clone(),
194                sender,
195                receiver,
196                links: HashMap::new(),
197                peers: BTreeMap::new(),
198                peer_sets: BTreeMap::new(),
199                peer_refs: BTreeMap::new(),
200                blocks: HashSet::new(),
201                transmitter: transmitter::State::new(),
202                subscribers: Vec::new(),
203                peer_subscribers: Vec::new(),
204                received_messages,
205                sent_messages,
206            },
207            Oracle::new(oracle_sender),
208        )
209    }
210
211    /// Returns (and increments) the next available socket address.
212    ///
213    /// The port number is incremented for each call, and the IP address is incremented if the port
214    /// number overflows.
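    ///
    /// For example (addresses are illustrative; the starting address is chosen
    /// pseudo-randomly):
    ///
    /// ```rust,ignore
    /// // next_addr == 10.0.0.1:65535
    /// let socket = self.get_next_socket(); // returns 10.0.0.1:65535
    /// // next_addr is now 10.0.0.2:0 (the port overflowed, so the IP advanced)
    /// ```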
215    fn get_next_socket(&mut self) -> SocketAddr {
216        let result = self.next_addr;
217
218        // Increment the port number, or the IP address if the port number overflows.
219        // The IP address itself is allowed to wrap on overflow.
220        match self.next_addr.port().checked_add(1) {
221            Some(port) => {
222                self.next_addr.set_port(port);
223            }
224            None => {
225                let ip = match self.next_addr.ip() {
226                    IpAddr::V4(ipv4) => ipv4,
227                    _ => unreachable!(),
228                };
229                let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
230                self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
231            }
232        }
233
234        result
235    }
236
237    /// Handle an ingress message.
238    ///
239    /// This method is called when a message is received from the oracle.
240    async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
241        // It is important that a failed attempt to return a result does not cause us to exit.
242        // This could happen if the caller drops the `Oracle` after updating the network topology.
243        // Thus, we create a helper function to send the result to the oracle and log any errors.
244        fn send_result<T: std::fmt::Debug>(
245            result: oneshot::Sender<Result<T, Error>>,
246            value: Result<T, Error>,
247        ) {
248            let success = value.is_ok();
249            if let Err(e) = result.send(value) {
250                error!(?e, "failed to send result to oracle (ok = {})", success);
251            }
252        }
253
254        match message {
255            ingress::Message::Update { id, peers } => {
256                let Some(tracked_peer_sets) = self.tracked_peer_sets else {
257                    warn!("attempted to register peer set when tracking is disabled");
258                    return;
259                };
260
261                // Check if peer set already exists
262                if self.peer_sets.contains_key(&id) {
263                    warn!(id, "peer set already exists");
264                    return;
265                }
266
267                // Ensure that peer set is monotonically increasing
268                if let Some((last, _)) = self.peer_sets.last_key_value() {
269                    if id <= *last {
270                        warn!(
271                            new_id = id,
272                            old_id = last,
273                            "attempted to register peer set with non-monotonically increasing ID"
274                        );
275                        return;
276                    }
277                }
278
279                // Create and store new peer set
280                for public_key in peers.iter() {
281                    // Create peer if it doesn't exist
282                    self.ensure_peer_exists(public_key).await;
283
284                    // Increment reference count
285                    *self.peer_refs.entry(public_key.clone()).or_insert(0) += 1;
286                }
287                self.peer_sets.insert(id, peers.clone());
288
289                // Remove oldest peer set if we exceed the limit
290                while self.peer_sets.len() > tracked_peer_sets {
291                    let (id, set) = self.peer_sets.pop_first().unwrap();
292                    debug!(id, "removed oldest peer set");
293
294                    // Decrement reference counts and clean up peers/links
295                    for public_key in set.iter() {
296                        let refs = self.peer_refs.get_mut(public_key).unwrap();
297                        *refs = refs.checked_sub(1).expect("reference count underflow");
298
299                        // If the peer is no longer in any tracked set, drop its reference count entry. We explicitly
300                        // keep the peer in `self.peers` so its network stays alive, in case it re-joins a future peer set.
301                        if *refs == 0 {
302                            self.peer_refs.remove(public_key);
303                            debug!(?public_key, "removed peer no longer in any tracked set");
304                        }
305                    }
306                }
307
308                // Notify all subscribers about the new peer set
309                let all = self.all_tracked_peers();
310                let notification = (id, peers, all);
311                self.subscribers
312                    .retain(|subscriber| subscriber.unbounded_send(notification.clone()).is_ok());
313
314                // Broadcast updated peer list to LimitedSender subscribers
315                self.broadcast_peer_list().await;
316            }
317            ingress::Message::Register {
318                channel,
319                public_key,
320                quota,
321                result,
322            } => {
323                // If peer does not exist, then create it.
324                let (_, is_new) = self.ensure_peer_exists(&public_key).await;
325
326                // When not using peer sets, broadcast updated peer list to subscribers
327                if is_new && self.peer_sets.is_empty() {
328                    self.broadcast_peer_list().await;
329                }
330
331                // Get clock for the rate limiter
332                let clock = self
333                    .context
334                    .with_label(&format!("rate_limiter_{channel}_{public_key}"))
335                    .take();
336
337                // Create a sender that allows sending messages to the network for a certain channel
338                let (sender, handle) = Sender::new(
339                    self.context.with_label("sender"),
340                    public_key.clone(),
341                    channel,
342                    self.max_size,
343                    self.sender.clone(),
344                    self.oracle_sender.clone(),
345                    clock,
346                    quota,
347                );
348
349                // Create a receiver that allows receiving messages from the network for a certain channel
350                let peer = self.peers.get_mut(&public_key).unwrap();
351                let receiver = match peer.register(channel, handle).await {
352                    Ok(receiver) => Receiver { receiver },
353                    Err(err) => return send_result(result, Err(err)),
354                };
355
356                send_result(result, Ok((sender, receiver)))
357            }
358            ingress::Message::PeerSet { id, response } => {
359                if self.peer_sets.is_empty() {
360                    // Return all peers if no peer sets are registered.
361                    let _ = response.send(Some(
362                        self.peers
363                            .keys()
364                            .cloned()
365                            .try_collect()
366                            .expect("BTreeMap keys are unique"),
367                    ));
368                } else {
369                    // Return the peer set at the given index
370                    let _ = response.send(self.peer_sets.get(&id).cloned());
371                }
372            }
373            ingress::Message::Subscribe { sender } => {
374                // Send the latest peer set upon subscription
375                if let Some((index, peers)) = self.peer_sets.last_key_value() {
376                    let all = self.all_tracked_peers();
377                    let notification = (*index, peers.clone(), all);
378                    let _ = sender.unbounded_send(notification);
379                }
380                self.subscribers.push(sender);
381            }
382            ingress::Message::SubscribeConnected { response } => {
383                // Create a ring channel for the subscriber
384                let (mut sender, receiver) = ring::channel(NZUsize!(1));
385
386                // Send current peer list immediately
387                let peer_list: Vec<P> = self.all_tracked_peers().into_iter().collect();
388                let _ = sender.send(peer_list).await;
389
390                // Store sender for future broadcasts
391                self.peer_subscribers.push(sender);
392
393                // Return the receiver to the subscriber
394                let _ = response.send(receiver);
395            }
396            ingress::Message::LimitBandwidth {
397                public_key,
398                egress_cap,
399                ingress_cap,
400                result,
401            } => {
402                // If peer does not exist, then create it.
403                let (_, is_new) = self.ensure_peer_exists(&public_key).await;
404
405                // When not using peer sets, broadcast updated peer list to subscribers
406                if is_new && self.peer_sets.is_empty() {
407                    self.broadcast_peer_list().await;
408                }
409
410                // Update bandwidth limits
411                let now = self.context.current();
412                let completions = self
413                    .transmitter
414                    .limit(now, &public_key, egress_cap, ingress_cap);
415                self.process_completions(completions);
416
417                // Notify the caller that the bandwidth update has been applied
418                let _ = result.send(());
419            }
420            ingress::Message::AddLink {
421                sender,
422                receiver,
423                sampler,
424                success_rate,
425                result,
426            } => {
427                // If sender or receiver does not exist, then create it.
428                let (_, sender_is_new) = self.ensure_peer_exists(&sender).await;
429                let (receiver_socket, receiver_is_new) = self.ensure_peer_exists(&receiver).await;
430
431                // When not using peer sets, broadcast updated peer list to subscribers
432                if (sender_is_new || receiver_is_new) && self.peer_sets.is_empty() {
433                    self.broadcast_peer_list().await;
434                }
435
436                // Require link to not already exist
437                let key = (sender.clone(), receiver.clone());
438                if self.links.contains_key(&key) {
439                    return send_result(result, Err(Error::LinkExists));
440                }
441
442                let link = Link::new(
443                    &mut self.context,
444                    sender,
445                    receiver,
446                    receiver_socket,
447                    sampler,
448                    success_rate,
449                    self.max_size,
450                    self.received_messages.clone(),
451                );
452                self.links.insert(key, link);
453                send_result(result, Ok(()))
454            }
455            ingress::Message::RemoveLink {
456                sender,
457                receiver,
458                result,
459            } => {
460                match self.links.remove(&(sender, receiver)) {
461                    Some(_) => (),
462                    None => return send_result(result, Err(Error::LinkMissing)),
463                }
464                send_result(result, Ok(()))
465            }
466            ingress::Message::Block { from, to } => {
467                self.blocks.insert((from, to));
468            }
469            ingress::Message::Blocked { result } => {
470                send_result(result, Ok(self.blocks.iter().cloned().collect()))
471            }
472        }
473    }
474
475    /// Ensure a peer exists, creating it if necessary.
476    ///
477    /// Returns the socket address of the peer and a boolean indicating if a new peer was created.
478    async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
479        if !self.peers.contains_key(public_key) {
480            // Create peer
481            let socket = self.get_next_socket();
482            let peer = Peer::new(
483                self.context.with_label("peer"),
484                public_key.clone(),
485                socket,
486                self.max_size,
487            )
488            .await;
489
490            // Once ready, add to peers
491            self.peers.insert(public_key.clone(), peer);
492
493            (socket, true)
494        } else {
495            (self.peers.get(public_key).unwrap().socket, false)
496        }
497    }
498
499    /// Broadcast updated peer list to all peer subscribers.
500    ///
501    /// This is called when the peer list changes (either from peer set updates
502    /// or from new peers being added when not using peer sets).
503    ///
504    /// Subscribers whose receivers have been dropped are removed to prevent
505    /// memory leaks.
506    async fn broadcast_peer_list(&mut self) {
507        let peer_list = self.all_tracked_peers().into_iter().collect::<Vec<_>>();
508        let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
509        for mut subscriber in self.peer_subscribers.drain(..) {
510            if subscriber.send(peer_list.clone()).await.is_ok() {
511                live_subscribers.push(subscriber);
512            }
513        }
514        self.peer_subscribers = live_subscribers;
515    }
516
517    /// Get all tracked peers as an ordered set.
518    ///
519    /// When peer sets are registered, returns only the peers from those sets.
520    /// Otherwise, returns all registered peers (for compatibility with tests
521    /// that don't use peer sets).
522    fn all_tracked_peers(&self) -> Set<P> {
523        if self.peer_sets.is_empty() && self.tracked_peer_sets.is_none() {
524            self.peers
525                .keys()
526                .cloned()
527                .try_collect()
528                .expect("BTreeMap keys are unique")
529        } else {
530            self.peer_refs
531                .keys()
532                .cloned()
533                .try_collect()
534                .expect("BTreeMap keys are unique")
535        }
536    }
537}
538
539impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
540    /// Process completions from the transmitter.
541    fn process_completions(&mut self, completions: Vec<Completion<P>>) {
542        for completion in completions {
543            // If there is no message to deliver, then skip
544            let Some(deliver_at) = completion.deliver_at else {
545                trace!(
546                    origin = ?completion.origin,
547                    recipient = ?completion.recipient,
548                    "message dropped before delivery",
549                );
550                continue;
551            };
552
553            // Send message to link
554            let key = (completion.origin.clone(), completion.recipient.clone());
555            let Some(link) = self.links.get_mut(&key) else {
556                // This can happen if the link is removed before the message is delivered
557                trace!(
558                    origin = ?completion.origin,
559                    recipient = ?completion.recipient,
560                    "missing link for completion",
561                );
562                continue;
563            };
564            if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
565                error!(?err, "failed to send");
566            }
567        }
568    }
569
570    /// Handle a task.
571    ///
572    /// This method is called when a task is received from the sender, which can come from
573    /// any peer in the network.
574    fn handle_task(&mut self, task: Task<P>) {
575        // If peer set tracking is enabled and the origin is not in any tracked set, drop the message (the origin is disconnected from all peers)
576        let (channel, origin, recipients, message, reply) = task;
577        if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&origin) {
578            warn!(
579                ?origin,
580                reason = "not in tracked peer set",
581                "dropping message"
582            );
583            if let Err(err) = reply.send(Vec::new()) {
584                error!(?err, "failed to send ack");
585            }
586            return;
587        }
588
589        // Collect recipients
590        let recipients = match recipients {
591            Recipients::All => {
592                // If peer sets have been registered, send only to tracked peers
593                // Otherwise, send to all registered peers (for compatibility
594                // with tests that do not register peer sets).
595                if self.peer_sets.is_empty() {
596                    self.peers.keys().cloned().collect()
597                } else {
598                    self.peer_refs.keys().cloned().collect()
599                }
600            }
601            Recipients::Some(keys) => keys,
602            Recipients::One(key) => vec![key],
603        };
604
605        // Send to all recipients
606        let now = self.context.current();
607        let mut sent = Vec::new();
608        for recipient in recipients {
609            // Skip self
610            if recipient == origin {
611                trace!(?recipient, reason = "self", "dropping message");
612                continue;
613            }
614
615            // If tracking peer sets, ensure recipient and sender are in a tracked peer set
616            if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&recipient) {
617                trace!(
618                    ?origin,
619                    ?recipient,
620                    reason = "not in tracked peer set",
621                    "dropping message"
622                );
623                continue;
624            }
625
626            // Determine if the sender or recipient has blocked the other
627            let o_r = (origin.clone(), recipient.clone());
628            let r_o = (recipient.clone(), origin.clone());
629            if self.disconnect_on_block
630                && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
631            {
632                trace!(?origin, ?recipient, reason = "blocked", "dropping message");
633                continue;
634            }
635
636            // Determine if there is a link between the origin and recipient
637            let Some(link) = self.links.get_mut(&o_r) else {
638                trace!(?origin, ?recipient, reason = "no link", "dropping message");
639                continue;
640            };
641
642            // Note: Rate limiting is handled by the Sender before messages reach here.
643            // The Sender filters recipients via LimitedSender::check() or in Sender::send().
644
645            // Record sent message as soon as we determine there is a link with recipient (approximates
646            // having an open connection)
647            self.sent_messages
648                .get_or_create(&metrics::Message::new(&origin, &recipient, channel))
649                .inc();
650
651            // Sample latency
652            let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64);
653
654            // Determine if the message should be delivered
655            let should_deliver = self.context.gen_bool(link.success_rate);
656
657            // Enqueue message for delivery
658            let completions = self.transmitter.enqueue(
659                now,
660                origin.clone(),
661                recipient.clone(),
662                channel,
663                message.clone(),
664                latency,
665                should_deliver,
666            );
667            self.process_completions(completions);
668
669            sent.push(recipient);
670        }
671
672        // Alert application of sent messages
673        if let Err(err) = reply.send(sent) {
674            error!(?err, "failed to send ack");
675        }
676    }
677
678    /// Run the simulated network.
679    ///
680    /// It is not necessary to invoke this method before modifying the network topology; however,
681    /// no messages will be sent until this method is called.
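    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`): the topology may be configured via the [Oracle]
    /// before starting, but delivery only begins once `start` is called.
    ///
    /// ```rust,ignore
    /// let (network, mut oracle) = Network::new(context, cfg);
    /// oracle.add_link(pk1.clone(), pk2.clone(), link).await?;
    /// let handle = network.start();
    /// ```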
682    pub fn start(mut self) -> Handle<()> {
683        spawn_cell!(self.context, self.run().await)
684    }
685
686    async fn run(mut self) {
687        loop {
688            let tick = match self.transmitter.next() {
689                Some(when) => Either::Left(self.context.sleep_until(when)),
690                None => Either::Right(future::pending()),
691            };
692            select! {
693                _ = tick => {
694                    let now = self.context.current();
695                    let completions = self.transmitter.advance(now);
696                    self.process_completions(completions);
697                },
698                message = self.ingress.next() => {
699                    // If ingress is closed, exit
700                    let message = match message {
701                        Some(message) => message,
702                        None => break,
703                    };
704                    self.handle_ingress(message).await;
705                },
706                task = self.receiver.next() => {
707                    // If receiver is closed, exit
708                    let task = match task {
709                        Some(task) => task,
710                        None => break,
711                    };
712                    self.handle_task(task);
713                },
714            }
715        }
716    }
717}
718
719/// Provides online peers from the simulated network.
720///
721/// Implements [`crate::utils::limited::Connected`] to provide peer list updates
722/// to [`crate::utils::limited::LimitedSender`].
723pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
724    sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
725}
726
727impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
728    fn clone(&self) -> Self {
729        Self {
730            sender: self.sender.clone(),
731        }
732    }
733}
734
735impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
736    const fn new(sender: mpsc::UnboundedSender<ingress::Message<P, E>>) -> Self {
737        Self { sender }
738    }
739}
740
741impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
742    type PublicKey = P;
743
744    async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
745        let (response_tx, response_rx) = oneshot::channel();
746        let _ = self
747            .sender
748            .unbounded_send(ingress::Message::SubscribeConnected {
749                response: response_tx,
750            });
751        // If the network is closed, return an empty receiver
752        response_rx.await.unwrap_or_else(|_| {
753            let (_sender, receiver) = ring::channel(NZUsize!(1));
754            receiver
755        })
756    }
757}
758
759/// Implementation of a [crate::Sender] for the simulated network without rate limiting.
760///
761/// This is the inner sender used by [`Sender`] which wraps it with rate limiting.
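///
/// # Example
///
/// A minimal sketch (marked `ignore`) of sending through the unrated inner sender; the
/// `unlimited` handle is an assumption for illustration, since this type is normally
/// wrapped by [`Sender`].
///
/// ```rust,ignore
/// use crate::UnlimitedSender as _;
///
/// // Broadcast a low-priority message and collect the recipients it was routed to.
/// let sent = unlimited
///     .send(Recipients::All, Bytes::from_static(b"ping"), false)
///     .await?;
/// ```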
762#[derive(Clone)]
763pub struct UnlimitedSender<P: PublicKey> {
764    me: P,
765    channel: Channel,
766    max_size: u32,
767    high: mpsc::UnboundedSender<Task<P>>,
768    low: mpsc::UnboundedSender<Task<P>>,
769}
770
771impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
772    type Error = Error;
773    type PublicKey = P;
774
775    async fn send(
776        &mut self,
777        recipients: Recipients<P>,
778        message: Bytes,
779        priority: bool,
780    ) -> Result<Vec<P>, Error> {
781        // Check message size
782        if message.len() > self.max_size as usize {
783            return Err(Error::MessageTooLarge(message.len()));
784        }
785
786        // Send message
787        let (sender, receiver) = oneshot::channel();
788        let channel = if priority { &self.high } else { &self.low };
789        channel
790            .unbounded_send((self.channel, self.me.clone(), recipients, message, sender))
791            .map_err(|_| Error::NetworkClosed)?;
792        receiver.await.map_err(|_| Error::NetworkClosed)
793    }
794}
795
796/// Implementation of a [crate::Sender] for the simulated network.
797///
798/// Also implements [crate::LimitedSender] to support rate-limit checking
799/// before sending messages.
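///
/// # Example
///
/// A minimal sketch (marked `ignore`) of the check-then-send flow; `sender` is assumed to be
/// a [Sender] obtained from channel registration, and `retry_at` is the earliest time the
/// rate limiter will allow sending again.
///
/// ```rust,ignore
/// use crate::{CheckedSender as _, LimitedSender as _};
///
/// match sender.check(Recipients::All).await {
///     Ok(checked) => {
///         // Within the quota: send and learn which recipients were reached.
///         let sent = checked.send(Bytes::from_static(b"ping"), false).await?;
///     }
///     Err(retry_at) => {
///         // Rate limited: wait until `retry_at` before retrying.
///     }
/// }
/// ```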
800pub struct Sender<P: PublicKey, E: Clock> {
801    limited_sender: LimitedSender<E, UnlimitedSender<P>, ConnectedPeerProvider<P, E>>,
802}
803
804impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
805    fn clone(&self) -> Self {
806        Self {
807            limited_sender: self.limited_sender.clone(),
808        }
809    }
810}
811
812impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
813    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
814        f.debug_struct("Sender").finish_non_exhaustive()
815    }
816}
817
818impl<P: PublicKey, E: Clock> Sender<P, E> {
819    #[allow(clippy::too_many_arguments)]
820    fn new(
821        context: impl Spawner + Metrics,
822        me: P,
823        channel: Channel,
824        max_size: u32,
825        mut sender: mpsc::UnboundedSender<Task<P>>,
826        oracle_sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
827        clock: E,
828        quota: Quota,
829    ) -> (Self, Handle<()>) {
830        // Listen for messages
831        let (high, mut high_receiver) = mpsc::unbounded();
832        let (low, mut low_receiver) = mpsc::unbounded();
833        let processor = context.with_label("processor").spawn(move |_| async move {
834            loop {
835                // Wait for task
836                let task;
837                select! {
838                    high_task = high_receiver.next() => {
839                        task = match high_task {
840                            Some(task) => task,
841                            None => break,
842                        };
843                    },
844                    low_task = low_receiver.next() => {
845                        task = match low_task {
846                            Some(task) => task,
847                            None => break,
848                        };
849                    }
850                }
851
852                // Send task
853                if let Err(err) = sender.send(task).await {
854                    error!(?err, channel, "failed to send task");
855                }
856            }
857        });
858
859        let unlimited_sender = UnlimitedSender {
860            me,
861            channel,
862            max_size,
863            high,
864            low,
865        };
866        let peer_source = ConnectedPeerProvider::new(oracle_sender);
867        let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);
868
869        (Self { limited_sender }, processor)
870    }
871
872    /// Split this [Sender] into a [SplitOrigin::Primary] and [SplitOrigin::Secondary] sender.
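    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`) mirroring the twin test below; `peer_a` and
    /// `peer_b` are assumed placeholder keys.
    ///
    /// ```rust,ignore
    /// let (primary, secondary) = sender.split_with(move |origin, _recipients, _msg| {
    ///     match origin {
    ///         SplitOrigin::Primary => Some(Recipients::One(peer_a.clone())),
    ///         SplitOrigin::Secondary => Some(Recipients::One(peer_b.clone())),
    ///     }
    /// });
    /// ```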
873    pub fn split_with<F: SplitForwarder<P>>(
874        self,
875        forwarder: F,
876    ) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
877        (
878            SplitSender {
879                replica: SplitOrigin::Primary,
880                inner: self.clone(),
881                forwarder: forwarder.clone(),
882            },
883            SplitSender {
884                replica: SplitOrigin::Secondary,
885                inner: self,
886                forwarder,
887            },
888        )
889    }
890}
891
892impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
893    type PublicKey = P;
894    type Checked<'a>
895        = crate::utils::limited::CheckedSender<'a, UnlimitedSender<P>>
896    where
897        Self: 'a;
898
899    async fn check(
900        &mut self,
901        recipients: Recipients<Self::PublicKey>,
902    ) -> Result<Self::Checked<'_>, SystemTime> {
903        self.limited_sender.check(recipients).await
904    }
905}
906
907/// A sender that re-routes each message's recipients via a user-provided [SplitForwarder].
908pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
909    replica: SplitOrigin,
910    inner: Sender<P, E>,
911    forwarder: F,
912}
913
914impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
915    fn clone(&self) -> Self {
916        Self {
917            replica: self.replica,
918            inner: self.inner.clone(),
919            forwarder: self.forwarder.clone(),
920        }
921    }
922}
923
924impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
925    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
926        f.debug_struct("SplitSender")
927            .field("replica", &self.replica)
928            .field("inner", &self.inner)
929            .finish()
930    }
931}
932
933impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
934    type PublicKey = P;
935    type Checked<'a> = SplitCheckedSender<'a, P, E, F>;
936
937    async fn check(
938        &mut self,
939        recipients: Recipients<Self::PublicKey>,
940    ) -> Result<Self::Checked<'_>, SystemTime> {
941        Ok(SplitCheckedSender {
942            // Perform a rate limit check with the entire set of original recipients although
943            // the forwarder may filter these (based on message content) during send.
944            checked: self.inner.limited_sender.check(recipients.clone()).await?,
945            replica: self.replica,
946            forwarder: self.forwarder.clone(),
947            recipients,
948
949            _phantom: std::marker::PhantomData,
950        })
951    }
952}
953
954/// A checked sender for [`SplitSender`] that defers the forwarder call to send time.
955///
956/// This is necessary because [`SplitForwarder`] may examine message content to determine
957/// routing, but the message is not available at [`LimitedSender::check`] time.
958pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
959    checked: LimitedCheckedSender<'a, UnlimitedSender<P>>,
960    replica: SplitOrigin,
961    forwarder: F,
962    recipients: Recipients<P>,
963
964    _phantom: std::marker::PhantomData<E>,
965}
966
967impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
968    for SplitCheckedSender<'a, P, E, F>
969{
970    type PublicKey = P;
971    type Error = Error;
972
973    async fn send(
974        self,
975        message: Bytes,
976        priority: bool,
977    ) -> Result<Vec<Self::PublicKey>, Self::Error> {
978        // Determine the set of recipients that will receive the message
979        let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
980            return Ok(Vec::new());
981        };
982
983        // Extract the inner sender and send directly with the new recipients
984        //
985        // While SplitForwarder does not enforce any relationship between the original recipients
986        // and the new recipients, it is typically some subset of the original recipients. This
987        // means we may over-rate limit some recipients (who are never actually sent a message here) but
988        // we prefer this to not providing feedback at all (we would have to skip check entirely).
989        self.checked
990            .into_inner()
991            .send(recipients, message, priority)
992            .await
993    }
994}
995
996type MessageReceiver<P> = mpsc::UnboundedReceiver<Message<P>>;
997
998/// Implementation of a [crate::Receiver] for the simulated network.
999#[derive(Debug)]
1000pub struct Receiver<P: PublicKey> {
1001    receiver: MessageReceiver<P>,
1002}
1003
1004impl<P: PublicKey> crate::Receiver for Receiver<P> {
1005    type Error = Error;
1006    type PublicKey = P;
1007
1008    async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
1009        self.receiver.next().await.ok_or(Error::NetworkClosed)
1010    }
1011}
1012
1013impl<P: PublicKey> Receiver<P> {
1014    /// Split this [Receiver] into a [SplitTarget::Primary] and [SplitTarget::Secondary] receiver.
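    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`) mirroring the twin test below: messages from
    /// `peer_a` (an assumed placeholder key) go to the primary receiver and everything
    /// else goes to the secondary receiver.
    ///
    /// ```rust,ignore
    /// let (primary, secondary) = receiver.split_with(
    ///     context.with_label("split_receiver"),
    ///     move |(sender, _payload)| {
    ///         if sender == &peer_a {
    ///             SplitTarget::Primary
    ///         } else {
    ///             SplitTarget::Secondary
    ///         }
    ///     },
    /// );
    /// ```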
1015    pub fn split_with<E: Spawner, R: SplitRouter<P>>(
1016        mut self,
1017        context: E,
1018        router: R,
1019    ) -> (Self, Self) {
1020        let (mut primary_tx, primary_rx) = mpsc::unbounded();
1021        let (mut secondary_tx, secondary_rx) = mpsc::unbounded();
1022        context.spawn(move |_| async move {
1023            while let Some(message) = self.receiver.next().await {
1024                // Route message to the appropriate target
1025                let direction = router(&message);
1026                match direction {
1027                    SplitTarget::None => {}
1028                    SplitTarget::Primary => {
1029                        if let Err(err) = primary_tx.send(message).await {
1030                            error!(?err, "failed to send message to primary");
1031                        }
1032                    }
1033                    SplitTarget::Secondary => {
1034                        if let Err(err) = secondary_tx.send(message).await {
1035                            error!(?err, "failed to send message to secondary");
1036                        }
1037                    }
1038                    SplitTarget::Both => {
1039                        if let Err(err) = primary_tx.send(message.clone()).await {
1040                            error!(?err, "failed to send message to primary");
1041                        }
1042                        if let Err(err) = secondary_tx.send(message).await {
1043                            error!(?err, "failed to send message to secondary");
1044                        }
1045                    }
1046                }
1047
1048                // Exit if both channels are closed
1049                if primary_tx.is_closed() && secondary_tx.is_closed() {
1050                    break;
1051                }
1052            }
1053        });
1054
1055        (
1056            Self {
1057                receiver: primary_rx,
1058            },
1059            Self {
1060                receiver: secondary_rx,
1061            },
1062        )
1063    }
1064}
1065
1066/// A peer in the simulated network.
1067///
1068/// The peer can register channels, which allows it to receive messages sent to the channel from other peers.
1069struct Peer<P: PublicKey> {
1070    // Socket address that the peer is listening on
1071    socket: SocketAddr,
1072
1073    // Control to register new channels
1074    control: mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
1075}
1076
1077impl<P: PublicKey> Peer<P> {
1078    /// Create and return a new peer.
1079    ///
1080    /// The peer will listen for incoming connections on the given `socket` address.
1081    /// `max_size` is the maximum size of a message that can be sent to the peer.
1082    async fn new<E: Spawner + RNetwork + Metrics + Clock>(
1083        context: E,
1084        public_key: P,
1085        socket: SocketAddr,
1086        max_size: u32,
1087    ) -> Self {
1088        // The control is used to register channels.
1089        // There is exactly one mailbox created for each channel that the peer is registered for.
1090        let (control_sender, mut control_receiver) = mpsc::unbounded();
1091
1092        // Whenever a message is received from a peer, it is placed in the inbox.
1093        // The router polls the inbox and forwards the message to the appropriate mailbox.
1094        let (inbox_sender, mut inbox_receiver) = mpsc::unbounded();
1095
1096        // Spawn router
1097        context.with_label("router").spawn(|context| async move {
1098            // Map of channels to mailboxes (senders to particular channels)
1099            let mut mailboxes = HashMap::new();
1100
1101            // Continually listen for control messages and outbound messages
1102            select_loop! {
1103                context,
1104                on_stopped => {},
1105                // Listen for control messages, which are used to register channels
1106                control = control_receiver.next() => {
1107                    // If control is closed, exit
1108                    let (channel, sender, result_tx): (Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>) = match control {
1109                        Some(control) => control,
1110                        None => break,
1111                    };
1112
1113                    // Register channel
1114                    let (receiver_tx, receiver_rx) = mpsc::unbounded();
1115                    if let Some((_, existing_sender)) = mailboxes.insert(channel, (receiver_tx, sender)) {
1116                        warn!(?public_key, ?channel, "overwriting existing channel");
1117                        existing_sender.abort();
1118                    }
1119                    result_tx.send(receiver_rx).unwrap();
1120                },
1121
1122                // Listen for messages from the inbox, which are forwarded to the appropriate mailbox
1123                inbox = inbox_receiver.next() => {
1124                    // If inbox is closed, exit
1125                    let (channel, message) = match inbox {
1126                        Some(message) => message,
1127                        None => break,
1128                    };
1129
1130                    // Send message to mailbox
1131                    match mailboxes.get_mut(&channel) {
1132                        Some((receiver_tx, _)) => {
1133                            if let Err(err) = receiver_tx.send(message).await {
1134                                debug!(?err, "failed to send message to mailbox");
1135                            }
1136                        }
1137                        None => {
1138                            trace!(
1139                                recipient = ?public_key,
1140                                channel,
1141                                reason = "missing channel",
1142                                "dropping message",
1143                            );
1144                        }
1145                    }
1146                },
1147            }
1148        });
1149
1150        // Spawn a task that accepts new connections and spawns a task for each connection
1151        let (ready_tx, ready_rx) = oneshot::channel();
1152        context
1153            .with_label("listener")
1154            .spawn(move |context| async move {
1155                // Initialize listener
1156                let mut listener = context.bind(socket).await.unwrap();
1157                let _ = ready_tx.send(());
1158
1159                // Continually accept new connections
1160                while let Ok((_, _, mut stream)) = listener.accept().await {
1161                    // New connection accepted. Spawn a task for this connection
1162                    context.with_label("receiver").spawn({
1163                        let mut inbox_sender = inbox_sender.clone();
1164                        move |_| async move {
1165                            // Receive dialer's public key as a handshake
1166                            let dialer = match recv_frame(&mut stream, max_size).await {
1167                                Ok(data) => data,
1168                                Err(_) => {
1169                                    error!("failed to receive public key from dialer");
1170                                    return;
1171                                }
1172                            };
1173                            let Ok(dialer) = P::decode(dialer.as_ref()) else {
1174                                error!("received public key is invalid");
1175                                return;
1176                            };
1177
1178                            // Continually receive messages from the dialer and send them to the inbox
1179                            while let Ok(data) = recv_frame(&mut stream, max_size).await {
1180                                let channel = Channel::from_be_bytes(
1181                                    data[..Channel::SIZE].try_into().unwrap(),
1182                                );
1183                                let message = data.slice(Channel::SIZE..);
1184                                if let Err(err) = inbox_sender
1185                                    .send((channel, (dialer.clone(), message)))
1186                                    .await
1187                                {
1188                                    debug!(?err, "failed to send message to mailbox");
1189                                    break;
1190                                }
1191                            }
1192                        }
1193                    });
1194                }
1195            });
1196
1197        // Wait for listener to start before returning
1198        let _ = ready_rx.await;
1199
1200        // Return peer
1201        Self {
1202            socket,
1203            control: control_sender,
1204        }
1205    }
1206
1207    /// Register a channel with the peer.
1208    ///
1209    /// This allows the peer to receive messages sent to the channel.
1210    /// Returns a receiver that can be used to receive messages sent to the channel.
1211    async fn register(
1212        &mut self,
1213        channel: Channel,
1214        sender: Handle<()>,
1215    ) -> Result<MessageReceiver<P>, Error> {
1216        let (result_tx, result_rx) = oneshot::channel();
1217        self.control
1218            .send((channel, sender, result_tx))
1219            .await
1220            .map_err(|_| Error::NetworkClosed)?;
1221        result_rx.await.map_err(|_| Error::NetworkClosed)
1222    }
1223}
1224
1225/// A unidirectional link between two peers.
1226/// Messages can be sent over the link with a given latency, jitter, and success rate.
1227struct Link {
1228    sampler: Normal<f64>,
1229    success_rate: f64,
1230    // Messages with their receive time for ordered delivery
1231    inbox: mpsc::UnboundedSender<(Channel, Bytes, SystemTime)>,
1232}
1233
1235impl Link {
1236    #[allow(clippy::too_many_arguments)]
1237    fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
1238        context: &mut E,
1239        dialer: P,
1240        receiver: P,
1241        socket: SocketAddr,
1242        sampler: Normal<f64>,
1243        success_rate: f64,
1244        max_size: u32,
1245        received_messages: Family<metrics::Message, Counter>,
1246    ) -> Self {
1247        // Spawn a task that will wait for messages to be sent to the link and then send them
1248        // over the network.
1249        let (inbox, mut outbox) = mpsc::unbounded::<(Channel, Bytes, SystemTime)>();
1250        context.with_label("link").spawn(move |context| async move {
1251            // Dial the peer and handshake by sending it the dialer's public key
1252            let (mut sink, _) = context.dial(socket).await.unwrap();
1253            if let Err(err) = send_frame(&mut sink, &dialer, max_size).await {
1254                error!(?err, "failed to send public key to listener");
1255                return;
1256            }
1257
1258            // Process messages in order, waiting for their receive time
1259            while let Some((channel, message, receive_complete_at)) = outbox.next().await {
1260                // Wait until the message should arrive at receiver
1261                context.sleep_until(receive_complete_at).await;
1262
1263                // Send the message
1264                let mut data = bytes::BytesMut::with_capacity(Channel::SIZE + message.len());
1265                data.extend_from_slice(&channel.to_be_bytes());
1266                data.extend_from_slice(&message);
1267                let data = data.freeze();
1268                let _ = send_frame(&mut sink, &data, max_size).await;
1269
1270                // Bump received messages metric
1271                received_messages
1272                    .get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
1273                    .inc();
1274            }
1275        });
1276
1277        Self {
1278            sampler,
1279            success_rate,
1280            inbox,
1281        }
1282    }
1283
1284    /// Send a message over the link with receive timing.
1285    fn send(
1286        &mut self,
1287        channel: Channel,
1288        message: Bytes,
1289        receive_complete_at: SystemTime,
1290    ) -> Result<(), Error> {
1291        self.inbox
1292            .unbounded_send((channel, message, receive_complete_at))
1293            .map_err(|_| Error::NetworkClosed)?;
1294        Ok(())
1295    }
1296}
1297
1298#[cfg(test)]
1299mod tests {
1300    use super::*;
1301    use crate::{Manager, Receiver as _, Recipients, Sender as _};
1302    use bytes::Bytes;
1303    use commonware_cryptography::{ed25519, Signer as _};
1304    use commonware_runtime::{deterministic, Quota, Runner as _};
1305    use futures::FutureExt;
1306    use std::num::NonZeroU32;
1307
1308    const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;
1309
1310    /// Default rate limit set high enough to not interfere with normal operation
1311    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1312
1313    #[test]
1314    fn test_register_and_link() {
1315        let executor = deterministic::Runner::default();
1316        executor.start(|context| async move {
1317            let cfg = Config {
1318                max_size: MAX_MESSAGE_SIZE,
1319                disconnect_on_block: true,
1320                tracked_peer_sets: Some(3),
1321            };
1322            let network_context = context.with_label("network");
1323            let (network, mut oracle) = Network::new(network_context.clone(), cfg);
1324            network_context.spawn(|_| network.run());
1325
1326            // Create two public keys
1327            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1328            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1329
1330            // Register the peer set
1331            let mut manager = oracle.manager();
1332            manager
1333                .update(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
1334                .await;
1335            let mut control = oracle.control(pk1.clone());
1336            control.register(0, TEST_QUOTA).await.unwrap();
1337            control.register(1, TEST_QUOTA).await.unwrap();
1338            let mut control = oracle.control(pk2.clone());
1339            control.register(0, TEST_QUOTA).await.unwrap();
1340            control.register(1, TEST_QUOTA).await.unwrap();
1341
1342            // Overwrite if registering again
1343            control.register(1, TEST_QUOTA).await.unwrap();
1344
1345            // Add link
1346            let link = ingress::Link {
1347                latency: Duration::from_millis(2),
1348                jitter: Duration::from_millis(1),
1349                success_rate: 0.9,
1350            };
1351            oracle
1352                .add_link(pk1.clone(), pk2.clone(), link.clone())
1353                .await
1354                .unwrap();
1355
1356            // Expect error when adding link again
1357            assert!(matches!(
1358                oracle.add_link(pk1, pk2, link).await,
1359                Err(Error::LinkExists)
1360            ));
1361        });
1362    }
1363
1364    #[test]
1365    fn test_split_channel_single() {
1366        let executor = deterministic::Runner::default();
1367        executor.start(|context| async move {
1368            let cfg = Config {
1369                max_size: MAX_MESSAGE_SIZE,
1370                disconnect_on_block: true,
1371                tracked_peer_sets: Some(3),
1372            };
1373            let network_context = context.with_label("network");
1374            let (network, mut oracle) = Network::new(network_context.clone(), cfg);
1375            network_context.spawn(|_| network.run());
1376
1377            // Create a "twin" node that will be split, plus two normal peers
1378            let twin = ed25519::PrivateKey::from_seed(20).public_key();
1379            let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
1380            let peer_b = ed25519::PrivateKey::from_seed(22).public_key();
1381
1382            // Register all peers
1383            let mut manager = oracle.manager();
1384            manager
1385                .update(
1386                    0,
1387                    [twin.clone(), peer_a.clone(), peer_b.clone()]
1388                        .try_into()
1389                        .unwrap(),
1390                )
1391                .await;
1392
1393            // Register normal peers
1394            let (mut peer_a_sender, mut peer_a_recv) = oracle
1395                .control(peer_a.clone())
1396                .register(0, TEST_QUOTA)
1397                .await
1398                .unwrap();
1399            let (mut peer_b_sender, mut peer_b_recv) = oracle
1400                .control(peer_b.clone())
1401                .register(0, TEST_QUOTA)
1402                .await
1403                .unwrap();
1404
1405            // Register and split the twin's channel:
1406            // - Primary sends only to peer_a
1407            // - Secondary sends only to peer_b
1408            // - Messages from peer_a go to primary receiver
1409            // - Messages from peer_b go to secondary receiver
1410            let (twin_sender, twin_receiver) = oracle
1411                .control(twin.clone())
1412                .register(0, TEST_QUOTA)
1413                .await
1414                .unwrap();
1415            let peer_a_for_router = peer_a.clone();
1416            let peer_b_for_router = peer_b.clone();
1417            let (mut twin_primary_sender, mut twin_secondary_sender) =
1418                twin_sender.split_with(move |origin, _, _| match origin {
1419                    SplitOrigin::Primary => Some(Recipients::One(peer_a_for_router.clone())),
1420                    SplitOrigin::Secondary => Some(Recipients::One(peer_b_for_router.clone())),
1421                });
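                // Each half of the split sender selects its own recipients via the forwarder
                // above, so the `Recipients::All` used when sending below is narrowed to a
                // single peer per origin.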
1422            let peer_a_for_recv = peer_a.clone();
1423            let peer_b_for_recv = peer_b.clone();
1424            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver.split_with(
1425                context.with_label("split_receiver"),
1426                move |(sender, _)| {
1427                    if sender == &peer_a_for_recv {
1428                        SplitTarget::Primary
1429                    } else if sender == &peer_b_for_recv {
1430                        SplitTarget::Secondary
1431                    } else {
1432                        panic!("unexpected sender");
1433                    }
1434                },
1435            );
1436
1437            // Establish bidirectional links
1438            let link = ingress::Link {
1439                latency: Duration::from_millis(0),
1440                jitter: Duration::from_millis(0),
1441                success_rate: 1.0,
1442            };
1443            oracle
1444                .add_link(peer_a.clone(), twin.clone(), link.clone())
1445                .await
1446                .unwrap();
1447            oracle
1448                .add_link(twin.clone(), peer_a.clone(), link.clone())
1449                .await
1450                .unwrap();
1451            oracle
1452                .add_link(peer_b.clone(), twin.clone(), link.clone())
1453                .await
1454                .unwrap();
1455            oracle
1456                .add_link(twin.clone(), peer_b.clone(), link.clone())
1457                .await
1458                .unwrap();
1459
1460            // Send messages in both directions
1461            let msg_a_to_twin = Bytes::from_static(b"from_a");
1462            let msg_b_to_twin = Bytes::from_static(b"from_b");
1463            let msg_primary_out = Bytes::from_static(b"primary_out");
1464            let msg_secondary_out = Bytes::from_static(b"secondary_out");
1465            peer_a_sender
1466                .send(Recipients::One(twin.clone()), msg_a_to_twin.clone(), false)
1467                .await
1468                .unwrap();
1469            peer_b_sender
1470                .send(Recipients::One(twin.clone()), msg_b_to_twin.clone(), false)
1471                .await
1472                .unwrap();
1473            twin_primary_sender
1474                .send(Recipients::All, msg_primary_out.clone(), false)
1475                .await
1476                .unwrap();
1477            twin_secondary_sender
1478                .send(Recipients::All, msg_secondary_out.clone(), false)
1479                .await
1480                .unwrap();
1481
1482            // Verify routing: peer_a messages go to primary, peer_b to secondary
1483            let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1484            assert_eq!(sender, peer_a);
1485            assert_eq!(payload, msg_a_to_twin);
1486            let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1487            assert_eq!(sender, peer_b);
1488            assert_eq!(payload, msg_b_to_twin);
1489
1490            // Verify routing: primary sends to peer_a, secondary to peer_b
1491            let (sender, payload) = peer_a_recv.recv().await.unwrap();
1492            assert_eq!(sender, twin);
1493            assert_eq!(payload, msg_primary_out);
1494            let (sender, payload) = peer_b_recv.recv().await.unwrap();
1495            assert_eq!(sender, twin);
1496            assert_eq!(payload, msg_secondary_out);
1497        });
1498    }
1499
1500    #[test]
1501    fn test_split_channel_both() {
1502        let executor = deterministic::Runner::default();
1503        executor.start(|context| async move {
1504            let cfg = Config {
1505                max_size: MAX_MESSAGE_SIZE,
1506                disconnect_on_block: true,
1507                tracked_peer_sets: Some(3),
1508            };
1509            let network_context = context.with_label("network");
1510            let (network, mut oracle) = Network::new(network_context.clone(), cfg);
1511            network_context.spawn(|_| network.run());
1512
1513            // Create a "twin" node that will be split, plus one normal peer
1514            let twin = ed25519::PrivateKey::from_seed(30).public_key();
1515            let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1516
1517            // Register all peers
1518            let mut manager = oracle.manager();
1519            manager
1520                .update(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
1521                .await;
1522
1523            // Register normal peer
1524            let (mut peer_c_sender, _peer_c_recv) = oracle
1525                .control(peer_c.clone())
1526                .register(0, TEST_QUOTA)
1527                .await
1528                .unwrap();
1529
1530            // Register and split the twin's channel with a pass-through forwarder and a router that targets Both
1531            let (twin_sender, twin_receiver) = oracle
1532                .control(twin.clone())
1533                .register(0, TEST_QUOTA)
1534                .await
1535                .unwrap();
1536            let (_twin_primary_sender, _twin_secondary_sender) =
1537                twin_sender.split_with(|_origin, recipients, _| Some(recipients.clone()));
1538            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1539                .split_with(context.with_label("split_receiver_both"), |_| {
1540                    SplitTarget::Both
1541                });
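                // With SplitTarget::Both, every inbound message is duplicated to both the
                // primary and secondary receivers (asserted below).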
1542
1543            // Establish bidirectional links
1544            let link = ingress::Link {
1545                latency: Duration::from_millis(0),
1546                jitter: Duration::from_millis(0),
1547                success_rate: 1.0,
1548            };
1549            oracle
1550                .add_link(peer_c.clone(), twin.clone(), link.clone())
1551                .await
1552                .unwrap();
1553            oracle
1554                .add_link(twin.clone(), peer_c.clone(), link)
1555                .await
1556                .unwrap();
1557
1558            // Send a message from peer_c to twin
1559            let msg_both = Bytes::from_static(b"to_both");
1560            peer_c_sender
1561                .send(Recipients::One(twin.clone()), msg_both.clone(), false)
1562                .await
1563                .unwrap();
1564
1565            // Verify both receivers get the message
1566            let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1567            assert_eq!(sender, peer_c);
1568            assert_eq!(payload, msg_both);
1569            let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1570            assert_eq!(sender, peer_c);
1571            assert_eq!(payload, msg_both);
1572        });
1573    }
1574
1575    #[test]
1576    fn test_split_channel_none() {
1577        let executor = deterministic::Runner::default();
1578        executor.start(|context| async move {
1579            let cfg = Config {
1580                max_size: MAX_MESSAGE_SIZE,
1581                disconnect_on_block: true,
1582                tracked_peer_sets: Some(3),
1583            };
1584            let network_context = context.with_label("network");
1585            let (network, mut oracle) = Network::new(network_context.clone(), cfg);
1586            network_context.spawn(|_| network.run());
1587
1588            // Create a "twin" node that will be split, plus one normal peer
1589            let twin = ed25519::PrivateKey::from_seed(30).public_key();
1590            let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1591
1592            // Register all peers
1593            let mut manager = oracle.manager();
1594            manager
1595                .update(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
1596                .await;
1597
1598            // Register normal peer
1599            let (mut peer_c_sender, _peer_c_recv) = oracle
1600                .control(peer_c.clone())
1601                .register(0, TEST_QUOTA)
1602                .await
1603                .unwrap();
1604
1605            // Register and split the twin's channel with a forwarder that drops all sends and a router that targets None
1606            let (twin_sender, twin_receiver) = oracle
1607                .control(twin.clone())
1608                .register(0, TEST_QUOTA)
1609                .await
1610                .unwrap();
1611            let (mut twin_primary_sender, mut twin_secondary_sender) =
1612                twin_sender.split_with(|_origin, _, _| None);
1613            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1614                .split_with(context.with_label("split_receiver_none"), |_| {
1615                    SplitTarget::None
1616                });
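                // With a forwarder that returns None and a router that targets SplitTarget::None,
                // the twin's outbound sends report zero recipients and inbound messages to the
                // twin never reach either receiver (asserted below).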
1617
1618            // Establish bidirectional links
1619            let link = ingress::Link {
1620                latency: Duration::from_millis(0),
1621                jitter: Duration::from_millis(0),
1622                success_rate: 1.0,
1623            };
1624            oracle
1625                .add_link(peer_c.clone(), twin.clone(), link.clone())
1626                .await
1627                .unwrap();
1628            oracle
1629                .add_link(twin.clone(), peer_c.clone(), link)
1630                .await
1631                .unwrap();
1632
1633            // Send a message from peer_c to twin
1634            let msg_both = Bytes::from_static(b"to_both");
1635            let sent = peer_c_sender
1636                .send(Recipients::One(twin.clone()), msg_both.clone(), false)
1637                .await
1638                .unwrap();
1639            assert_eq!(sent.len(), 1);
1640            assert_eq!(sent[0], twin);
1641
1642            // Verify neither receiver gets the message: the router returns SplitTarget::None, so inbound deliveries are dropped
1643            context.sleep(Duration::from_millis(100)).await;
1644            assert!(twin_primary_recv.recv().now_or_never().is_none());
1645            assert!(twin_secondary_recv.recv().now_or_never().is_none());
1646
1647            // Send a message from twin to peer_c via the primary sender; the forwarder returns None, so no recipients are reported
1648            let msg_both = Bytes::from_static(b"to_both");
1649            let sent = twin_primary_sender
1650                .send(Recipients::One(peer_c.clone()), msg_both.clone(), false)
1651                .await
1652                .unwrap();
1653            assert_eq!(sent.len(), 0);
1654
1655            // Send a message from twin to peer_c via the secondary sender; it is likewise dropped
1656            let msg_both = Bytes::from_static(b"to_both");
1657            let sent = twin_secondary_sender
1658                .send(Recipients::One(peer_c.clone()), msg_both.clone(), false)
1659                .await
1660                .unwrap();
1661            assert_eq!(sent.len(), 0);
1662        });
1663    }
1664
1665    #[test]
1666    fn test_unordered_peer_sets() {
1667        let executor = deterministic::Runner::default();
1668        executor.start(|context| async move {
1669            let cfg = Config {
1670                max_size: MAX_MESSAGE_SIZE,
1671                disconnect_on_block: true,
1672                tracked_peer_sets: Some(3),
1673            };
1674            let network_context = context.with_label("network");
1675            let (network, oracle) = Network::new(network_context.clone(), cfg);
1676            network_context.spawn(|_| network.run());
1677
1678            // Create two public keys
1679            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1680            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1681
1682            // Subscribe to peer sets
1683            let mut manager = oracle.manager();
1684            let mut subscription = manager.subscribe().await;
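                // Each accepted update is delivered as (set id, newly added peers, union of all tracked peers).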
1685
1686            // Register initial peer set
1687            manager
1688                .update(10, [pk1.clone(), pk2.clone()].try_into().unwrap())
1689                .await;
1690            let (id, new, all) = subscription.next().await.unwrap();
1691            assert_eq!(id, 10);
1692            assert_eq!(new.len(), 2);
1693            assert_eq!(all.len(), 2);
1694
1695            // Register an older peer set (id 9 < latest id 10); it is ignored, so pk3 is never tracked
1696            let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
1697            manager.update(9, [pk3.clone()].try_into().unwrap()).await;
1698
1699            // Add new peer set
1700            let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
1701            manager.update(11, [pk4.clone()].try_into().unwrap()).await;
1702            let (id, new, all) = subscription.next().await.unwrap();
1703            assert_eq!(id, 11);
1704            assert_eq!(new, [pk4.clone()].try_into().unwrap());
1705            assert_eq!(all, [pk1, pk2, pk4].try_into().unwrap());
1706        });
1707    }
1708
1709    #[test]
1710    fn test_get_next_socket() {
1711        let cfg = Config {
1712            max_size: MAX_MESSAGE_SIZE,
1713            disconnect_on_block: true,
1714            tracked_peer_sets: None,
1715        };
1716        let runner = deterministic::Runner::default();
1717
1718        runner.start(|context| async move {
1719            type PublicKey = ed25519::PublicKey;
1720            let (mut network, _) =
1721                Network::<deterministic::Context, PublicKey>::new(context.clone(), cfg);
1722
1723            // Test that the next socket address is incremented correctly
1724            let mut original = network.next_addr;
1725            let next = network.get_next_socket();
1726            assert_eq!(next, original);
1727            let next = network.get_next_socket();
1728            original.set_port(1);
1729            assert_eq!(next, original);
1730
1731            // Test that the port number overflows correctly
1732            let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
1733            network.next_addr = max_addr;
1734            let next = network.get_next_socket();
1735            assert_eq!(next, max_addr);
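                // Once the port space is exhausted, the following allocation wraps the port to 0
                // and carries the increment into the IP address (255.0.255.255:65535 -> 255.1.0.0:0).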
1736            let next = network.get_next_socket();
1737            assert_eq!(
1738                next,
1739                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
1740            );
1741        });
1742    }
1743
1744    #[test]
1745    fn test_fifo_burst_same_recipient() {
1746        let cfg = Config {
1747            max_size: MAX_MESSAGE_SIZE,
1748            disconnect_on_block: true,
1749            tracked_peer_sets: Some(3),
1750        };
1751        let runner = deterministic::Runner::default();
1752
1753        runner.start(|context| async move {
1754            let (network, mut oracle) = Network::new(context.with_label("network"), cfg);
1755            let network_handle = network.start();
1756
1757            let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
1758            let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();
1759
1760            let mut manager = oracle.manager();
1761            manager
1762                .update(
1763                    0,
1764                    [sender_pk.clone(), recipient_pk.clone()]
1765                        .try_into()
1766                        .unwrap(),
1767                )
1768                .await;
1769            let (mut sender, _sender_recv) = oracle
1770                .control(sender_pk.clone())
1771                .register(0, TEST_QUOTA)
1772                .await
1773                .unwrap();
1774            let (_sender2, mut receiver) = oracle
1775                .control(recipient_pk.clone())
1776                .register(0, TEST_QUOTA)
1777                .await
1778                .unwrap();
1779
1780            oracle
1781                .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
1782                .await
1783                .unwrap();
1784            oracle
1785                .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
1786                .await
1787                .unwrap();
1788
1789            oracle
1790                .add_link(
1791                    sender_pk.clone(),
1792                    recipient_pk.clone(),
1793                    ingress::Link {
1794                        latency: Duration::from_millis(0),
1795                        jitter: Duration::from_millis(0),
1796                        success_rate: 1.0,
1797                    },
1798                )
1799                .await
1800                .unwrap();
1801
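                // With egress and ingress bandwidth capped at 5_000 B/s, send a rapid burst of
                // 50 small messages to a single recipient and verify they arrive in FIFO order.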
1802            const COUNT: usize = 50;
1803            let mut expected = Vec::with_capacity(COUNT);
1804            for i in 0..COUNT {
1805                let msg = Bytes::from(vec![i as u8; 64]);
1806                sender
1807                    .send(Recipients::One(recipient_pk.clone()), msg.clone(), false)
1808                    .await
1809                    .unwrap();
1810                expected.push(msg);
1811            }
1812
1813            for expected_msg in expected {
1814                let (_pk, bytes) = receiver.recv().await.unwrap();
1815                assert_eq!(bytes, expected_msg);
1816            }
1817
1818            drop(oracle);
1819            drop(sender);
1820            network_handle.abort();
1821        });
1822    }
1823
1824    #[test]
1825    fn test_broadcast_respects_transmit_latency() {
1826        let cfg = Config {
1827            max_size: MAX_MESSAGE_SIZE,
1828            disconnect_on_block: true,
1829            tracked_peer_sets: Some(3),
1830        };
1831        let runner = deterministic::Runner::default();
1832
1833        runner.start(|context| async move {
1834            let (network, mut oracle) = Network::new(context.with_label("network"), cfg);
1835            let network_handle = network.start();
1836
1837            let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
1838            let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
1839            let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();
1840
1841            let mut manager = oracle.manager();
1842            manager
1843                .update(
1844                    0,
1845                    [sender_pk.clone(), recipient_a.clone(), recipient_b.clone()]
1846                        .try_into()
1847                        .unwrap(),
1848                )
1849                .await;
1850            let (mut sender, _recv_sender) = oracle
1851                .control(sender_pk.clone())
1852                .register(0, TEST_QUOTA)
1853                .await
1854                .unwrap();
1855            let (_sender2, mut recv_a) = oracle
1856                .control(recipient_a.clone())
1857                .register(0, TEST_QUOTA)
1858                .await
1859                .unwrap();
1860            let (_sender3, mut recv_b) = oracle
1861                .control(recipient_b.clone())
1862                .register(0, TEST_QUOTA)
1863                .await
1864                .unwrap();
1865
1866            oracle
1867                .limit_bandwidth(sender_pk.clone(), Some(1_000), None)
1868                .await
1869                .unwrap();
1870            oracle
1871                .limit_bandwidth(recipient_a.clone(), None, Some(1_000))
1872                .await
1873                .unwrap();
1874            oracle
1875                .limit_bandwidth(recipient_b.clone(), None, Some(1_000))
1876                .await
1877                .unwrap();
1878
1879            let link = ingress::Link {
1880                latency: Duration::from_millis(0),
1881                jitter: Duration::from_millis(0),
1882                success_rate: 1.0,
1883            };
1884            oracle
1885                .add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
1886                .await
1887                .unwrap();
1888            oracle
1889                .add_link(sender_pk.clone(), recipient_b.clone(), link)
1890                .await
1891                .unwrap();
1892
1893            let big_msg = Bytes::from(vec![7u8; 10_000]);
1894            let start = context.current();
1895            sender
1896                .send(Recipients::All, big_msg.clone(), false)
1897                .await
1898                .unwrap();
1899
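                // The sender's 1_000 B/s egress cap is shared across both recipients, so delivering
                // the 10_000-byte message to each of them (20_000 bytes in total) should take at
                // least ~20s.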
1900            let (_pk, received_a) = recv_a.recv().await.unwrap();
1901            assert_eq!(received_a, big_msg);
1902            let elapsed_a = context.current().duration_since(start).unwrap();
1903            assert!(elapsed_a >= Duration::from_secs(20));
1904
1905            let (_pk, received_b) = recv_b.recv().await.unwrap();
1906            assert_eq!(received_b, big_msg);
1907            let elapsed_b = context.current().duration_since(start).unwrap();
1908            assert!(elapsed_b >= Duration::from_secs(20));
1909
1910            // Because bandwidth is shared, the two messages should take about the same time
1911            assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));
1912
1913            drop(oracle);
1914            drop(sender);
1915            network_handle.abort();
1916        });
1917    }
1918}