commonware_p2p/simulated/network.rs

//! Implementation of a simulated p2p network.

use super::{
    ingress::{self, Oracle},
    metrics,
    transmitter::{self, Completion},
    Error,
};
use crate::{
    authenticated::UnboundedMailbox,
    utils::limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
    Channel, Message, Recipients, UnlimitedSender as _,
};
use bytes::{Buf, Bytes};
use commonware_codec::{DecodeExt, FixedSize};
use commonware_cryptography::PublicKey;
use commonware_macros::{select, select_loop};
use commonware_runtime::{
    spawn_cell, Clock, ContextCell, Handle, Listener as _, Metrics, Network as RNetwork, Quota,
    Spawner,
};
use commonware_stream::utils::codec::{recv_frame, send_frame};
use commonware_utils::{
    channels::{fallible::FallibleExt, ring},
    ordered::Set,
    NZUsize, TryCollect,
};
use either::Either;
use futures::{
    channel::{mpsc, oneshot},
    future, SinkExt, StreamExt,
};
use prometheus_client::metrics::{counter::Counter, family::Family};
use rand::Rng;
use rand_distr::{Distribution, Normal};
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    fmt::Debug,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    time::{Duration, SystemTime},
};
use tracing::{debug, error, trace, warn};

/// Task type representing a message to be sent within the network.
type Task<P> = (Channel, P, Recipients<P>, Bytes, oneshot::Sender<Vec<P>>);

/// Target for a message in a split receiver.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitTarget {
    None,
    Primary,
    Secondary,
    Both,
}

/// Origin of a message in a split sender.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitOrigin {
    Primary,
    Secondary,
}

/// A function that forwards messages from [SplitOrigin] to [Recipients].
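///
/// # Example
///
/// An illustrative sketch (mirroring the tests below; `peer_a` and `peer_b` are
/// assumed to be peer public keys): primary traffic goes to `peer_a`, secondary
/// traffic goes to `peer_b`, regardless of the original recipients.
///
/// ```ignore
/// let forwarder = move |origin, _recipients: &Recipients<_>, _msg: &Bytes| match origin {
///     SplitOrigin::Primary => Some(Recipients::One(peer_a.clone())),
///     SplitOrigin::Secondary => Some(Recipients::One(peer_b.clone())),
/// };
/// ```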
pub trait SplitForwarder<P: PublicKey>:
    Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
{
}

impl<P: PublicKey, F> SplitForwarder<P> for F where
    F: Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>>
        + Send
        + Sync
        + Clone
        + 'static
{
}

/// A function that routes incoming [Message]s to a [SplitTarget].
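///
/// # Example
///
/// An illustrative sketch (mirroring the tests below; `peer_a` is an assumed
/// peer public key): messages from `peer_a` go to the primary receiver and
/// everything else goes to the secondary receiver.
///
/// ```ignore
/// let router = move |(sender, _): &Message<_>| {
///     if sender == &peer_a {
///         SplitTarget::Primary
///     } else {
///         SplitTarget::Secondary
///     }
/// };
/// ```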
pub trait SplitRouter<P: PublicKey>:
    Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}

impl<P: PublicKey, F> SplitRouter<P> for F where
    F: Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}

/// Configuration for the simulated network.
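///
/// # Example
///
/// A minimal sketch of a test configuration (values mirror the tests below):
///
/// ```ignore
/// let cfg = Config {
///     max_size: 1024 * 1024,
///     disconnect_on_block: true,
///     tracked_peer_sets: Some(3),
/// };
/// ```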
pub struct Config {
    /// Maximum size of a message that can be sent over the network.
    pub max_size: u32,

    /// True if peers should disconnect upon being blocked. While production networking would
    /// typically disconnect, for testing purposes it may be useful to keep peers connected,
    /// allowing byzantine actors to continue sending messages.
    pub disconnect_on_block: bool,

    /// The maximum number of peer sets to track. When a new peer set is registered and this
    /// limit is exceeded, the oldest peer set is removed. Peers that are no longer in any
    /// tracked peer set will have their links removed and messages to them will be dropped.
    ///
    /// If [None], peer sets are not considered.
    pub tracked_peer_sets: Option<usize>,
}

/// Implementation of a simulated network.
pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
    context: ContextCell<E>,

    // Maximum size of a message that can be sent over the network
    max_size: u32,

    // True if peers should disconnect upon being blocked.
    // While production networking would typically disconnect, for testing purposes it may be useful
    // to keep peers connected, allowing byzantine actors to continue sending messages.
    disconnect_on_block: bool,

    // Next socket address to assign to a new peer
    // Incremented for each new peer
    next_addr: SocketAddr,

    // Channel to receive messages from the oracle
    ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,

    // Mailbox for the oracle channel (passed to Senders for PeerSource subscriptions)
    oracle_mailbox: UnboundedMailbox<ingress::Message<P, E>>,

    // A channel to receive tasks from peers
    // The sender is cloned and given to each peer
    // The receiver is polled in the main loop
    sender: mpsc::UnboundedSender<Task<P>>,
    receiver: mpsc::UnboundedReceiver<Task<P>>,

    // A map from a pair of public keys (from, to) to a link between the two peers
    links: HashMap<(P, P), Link>,

    // A map from a public key to a peer
    peers: BTreeMap<P, Peer<P>>,

    // Peer sets indexed by their ID
    peer_sets: BTreeMap<u64, Set<P>>,

    // Reference count for each peer (number of peer sets they belong to)
    peer_refs: BTreeMap<P, usize>,

    // Maximum number of peer sets to track
    tracked_peer_sets: Option<usize>,

    // A set of peer pairs blocking each other
    blocks: BTreeSet<(P, P)>,

    // State of the transmitter
    transmitter: transmitter::State<P>,

    // Subscribers to peer set updates (used by Manager::subscribe())
    #[allow(clippy::type_complexity)]
    subscribers: Vec<mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>>,

    // Subscribers to tracked peer list updates (used by PeerSource for LimitedSender)
    peer_subscribers: Vec<ring::Sender<Vec<P>>>,

    // Metrics for received and sent messages
    received_messages: Family<metrics::Message, Counter>,
    sent_messages: Family<metrics::Message, Counter>,
}

impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
    /// Create a new simulated network with a given runtime and configuration.
    ///
    /// Returns a tuple containing the network instance and the oracle that can
    /// be used to modify the state of the network while it is running.
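    ///
    /// # Example
    ///
    /// An illustrative sketch of wiring the network into a runtime context
    /// (mirrors the tests below; `context`, `cfg`, `pk1`, `pk2`, and `quota`
    /// are assumed to be provided by the caller):
    ///
    /// ```ignore
    /// let (network, oracle) = Network::new(context.with_label("network"), cfg);
    /// network.start();
    /// let mut manager = oracle.manager();
    /// manager.update(0, [pk1.clone(), pk2.clone()].try_into().unwrap()).await;
    /// let (sender, receiver) = oracle.control(pk1).register(0, quota).await.unwrap();
    /// ```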
    pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
        let (sender, receiver) = mpsc::unbounded();
        let (oracle_mailbox, oracle_receiver) = UnboundedMailbox::new();
        let sent_messages = Family::<metrics::Message, Counter>::default();
        let received_messages = Family::<metrics::Message, Counter>::default();
        context.register("messages_sent", "messages sent", sent_messages.clone());
        context.register(
            "messages_received",
            "messages received",
            received_messages.clone(),
        );

        // Start with a pseudo-random IP address to assign sockets to for new peers
        let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);

        (
            Self {
                context: ContextCell::new(context),
                max_size: cfg.max_size,
                disconnect_on_block: cfg.disconnect_on_block,
                tracked_peer_sets: cfg.tracked_peer_sets,
                next_addr,
                ingress: oracle_receiver,
                oracle_mailbox: oracle_mailbox.clone(),
                sender,
                receiver,
                links: HashMap::new(),
                peers: BTreeMap::new(),
                peer_sets: BTreeMap::new(),
                peer_refs: BTreeMap::new(),
                blocks: BTreeSet::new(),
                transmitter: transmitter::State::new(),
                subscribers: Vec::new(),
                peer_subscribers: Vec::new(),
                received_messages,
                sent_messages,
            },
            Oracle::new(oracle_mailbox),
        )
    }

    /// Returns (and increments) the next available socket address.
    ///
    /// The port number is incremented for each call, and the IP address is incremented if the port
    /// number overflows.
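    ///
    /// For example, if the current address is `10.0.0.1:65535`, this call returns it and the
    /// next call returns `10.0.0.2:0` (illustrative addresses; the starting IP is pseudo-random).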
    fn get_next_socket(&mut self) -> SocketAddr {
        let result = self.next_addr;

        // Increment the port number, or the IP address if the port number overflows.
        // Allows the IP address to overflow (wrapping).
        match self.next_addr.port().checked_add(1) {
            Some(port) => {
                self.next_addr.set_port(port);
            }
            None => {
                let ip = match self.next_addr.ip() {
                    IpAddr::V4(ipv4) => ipv4,
                    _ => unreachable!(),
                };
                let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
                self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
            }
        }

        result
    }

    /// Handle an ingress message.
    ///
    /// This method is called when a message is received from the oracle.
    async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
        // It is important to ensure that no failed receipt of a message will cause us to exit.
        // This could happen if the caller drops the `Oracle` after updating the network topology.
        // Thus, we create a helper function to send the result to the oracle and log any errors.
        fn send_result<T: std::fmt::Debug>(
            result: oneshot::Sender<Result<T, Error>>,
            value: Result<T, Error>,
        ) {
            let success = value.is_ok();
            if let Err(e) = result.send(value) {
                error!(?e, "failed to send result to oracle (ok = {})", success);
            }
        }

        match message {
            ingress::Message::Update { id, peers } => {
                let Some(tracked_peer_sets) = self.tracked_peer_sets else {
                    warn!("attempted to register peer set when tracking is disabled");
                    return;
                };

                // Check if peer set already exists
                if self.peer_sets.contains_key(&id) {
                    warn!(id, "peer set already exists");
                    return;
                }

                // Ensure that peer set IDs are monotonically increasing
                if let Some((last, _)) = self.peer_sets.last_key_value() {
                    if id <= *last {
                        warn!(
                            new_id = id,
                            old_id = last,
                            "attempted to register peer set with non-monotonically increasing ID"
                        );
                        return;
                    }
                }

                // Create and store new peer set
                for public_key in peers.iter() {
                    // Create peer if it doesn't exist
                    self.ensure_peer_exists(public_key).await;

                    // Increment reference count
                    *self.peer_refs.entry(public_key.clone()).or_insert(0) += 1;
                }
                self.peer_sets.insert(id, peers.clone());

                // Remove oldest peer set if we exceed the limit
                while self.peer_sets.len() > tracked_peer_sets {
                    let (id, set) = self.peer_sets.pop_first().unwrap();
                    debug!(id, "removed oldest peer set");

                    // Decrement reference counts and clean up peers/links
                    for public_key in set.iter() {
                        let refs = self.peer_refs.get_mut(public_key).unwrap();
                        *refs = refs.checked_sub(1).expect("reference count underflow");

                        // If the peer is no longer in any tracked set, drop its reference entry. We explicitly keep
                        // the peer around in `self.peers` to keep its network alive, in case the peer re-joins in a
                        // future peer set.
                        if *refs == 0 {
                            self.peer_refs.remove(public_key);
                            debug!(?public_key, "removed peer no longer in any tracked set");
                        }
                    }
                }

                // Notify all subscribers about the new peer set
                let all = self.all_tracked_peers();
                let notification = (id, peers, all);
                self.subscribers
                    .retain(|subscriber| subscriber.send_lossy(notification.clone()));

                // Broadcast updated peer list to LimitedSender subscribers
                self.broadcast_peer_list().await;
            }
            ingress::Message::Register {
                channel,
                public_key,
                quota,
                result,
            } => {
                // If peer does not exist, then create it.
                let (_, is_new) = self.ensure_peer_exists(&public_key).await;

                // When not using peer sets, broadcast updated peer list to subscribers
                if is_new && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // Get clock for the rate limiter
                let clock = self
                    .context
                    .with_label(&format!("rate_limiter_{channel}_{public_key}"))
                    .take();

                // Create a sender that allows sending messages to the network for a certain channel
                let (sender, handle) = Sender::new(
                    self.context.with_label("sender"),
                    public_key.clone(),
                    channel,
                    self.max_size,
                    self.sender.clone(),
                    self.oracle_mailbox.clone(),
                    clock,
                    quota,
                );

                // Create a receiver that allows receiving messages from the network for a certain channel
                let peer = self.peers.get_mut(&public_key).unwrap();
                let receiver = match peer.register(channel, handle).await {
                    Ok(receiver) => Receiver { receiver },
                    Err(err) => return send_result(result, Err(err)),
                };

                send_result(result, Ok((sender, receiver)))
            }
            ingress::Message::PeerSet { id, response } => {
                if self.peer_sets.is_empty() {
                    // Return all peers if no peer sets are registered.
                    let _ = response.send(Some(
                        self.peers
                            .keys()
                            .cloned()
                            .try_collect()
                            .expect("BTreeMap keys are unique"),
                    ));
                } else {
                    // Return the peer set at the given index
                    let _ = response.send(self.peer_sets.get(&id).cloned());
                }
            }
            ingress::Message::Subscribe { sender } => {
                // Send the latest peer set upon subscription
                if let Some((index, peers)) = self.peer_sets.last_key_value() {
                    let all = self.all_tracked_peers();
                    let notification = (*index, peers.clone(), all);
                    sender.send_lossy(notification);
                }
                self.subscribers.push(sender);
            }
            ingress::Message::SubscribeConnected { response } => {
                // Create a ring channel for the subscriber
                let (mut sender, receiver) = ring::channel(NZUsize!(1));

                // Send current peer list immediately
                let peer_list: Vec<P> = self.all_tracked_peers().into_iter().collect();
                let _ = sender.send(peer_list).await;

                // Store sender for future broadcasts
                self.peer_subscribers.push(sender);

                // Return the receiver to the subscriber
                let _ = response.send(receiver);
            }
            ingress::Message::LimitBandwidth {
                public_key,
                egress_cap,
                ingress_cap,
                result,
            } => {
                // If peer does not exist, then create it.
                let (_, is_new) = self.ensure_peer_exists(&public_key).await;

                // When not using peer sets, broadcast updated peer list to subscribers
                if is_new && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // Update bandwidth limits
                let now = self.context.current();
                let completions = self
                    .transmitter
                    .limit(now, &public_key, egress_cap, ingress_cap);
                self.process_completions(completions);

                // Notify the caller that the bandwidth update has been applied
                let _ = result.send(());
            }
            ingress::Message::AddLink {
                sender,
                receiver,
                sampler,
                success_rate,
                result,
            } => {
                // If sender or receiver does not exist, then create it.
                let (_, sender_is_new) = self.ensure_peer_exists(&sender).await;
                let (receiver_socket, receiver_is_new) = self.ensure_peer_exists(&receiver).await;

                // When not using peer sets, broadcast updated peer list to subscribers
                if (sender_is_new || receiver_is_new) && self.peer_sets.is_empty() {
                    self.broadcast_peer_list().await;
                }

                // Require link to not already exist
                let key = (sender.clone(), receiver.clone());
                if self.links.contains_key(&key) {
                    return send_result(result, Err(Error::LinkExists));
                }

                let link = Link::new(
                    &mut self.context,
                    sender,
                    receiver,
                    receiver_socket,
                    sampler,
                    success_rate,
                    self.max_size,
                    self.received_messages.clone(),
                );
                self.links.insert(key, link);
                send_result(result, Ok(()))
            }
            ingress::Message::RemoveLink {
                sender,
                receiver,
                result,
            } => {
                match self.links.remove(&(sender, receiver)) {
                    Some(_) => (),
                    None => return send_result(result, Err(Error::LinkMissing)),
                }
                send_result(result, Ok(()))
            }
            ingress::Message::Block { from, to } => {
                self.blocks.insert((from, to));
            }
            ingress::Message::Blocked { result } => {
                send_result(result, Ok(self.blocks.iter().cloned().collect()))
            }
        }
    }

    /// Ensure a peer exists, creating it if necessary.
    ///
    /// Returns the socket address of the peer and a boolean indicating if a new peer was created.
    async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
        if !self.peers.contains_key(public_key) {
            // Create peer
            let socket = self.get_next_socket();
            let peer = Peer::new(
                self.context.with_label("peer"),
                public_key.clone(),
                socket,
                self.max_size,
            )
            .await;

            // Once ready, add to peers
            self.peers.insert(public_key.clone(), peer);

            (socket, true)
        } else {
            (self.peers.get(public_key).unwrap().socket, false)
        }
    }

    /// Broadcast updated peer list to all peer subscribers.
    ///
    /// This is called when the peer list changes (either from peer set updates
    /// or from new peers being added when not using peer sets).
    ///
    /// Subscribers whose receivers have been dropped are removed to prevent
    /// memory leaks.
    async fn broadcast_peer_list(&mut self) {
        let peer_list = self.all_tracked_peers().into_iter().collect::<Vec<_>>();
        let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
        for mut subscriber in self.peer_subscribers.drain(..) {
            if subscriber.send(peer_list.clone()).await.is_ok() {
                live_subscribers.push(subscriber);
            }
        }
        self.peer_subscribers = live_subscribers;
    }

    /// Get all tracked peers as an ordered set.
    ///
    /// When peer sets are registered, returns only the peers from those sets.
    /// Otherwise, returns all registered peers (for compatibility with tests
    /// that don't use peer sets).
    fn all_tracked_peers(&self) -> Set<P> {
        if self.peer_sets.is_empty() && self.tracked_peer_sets.is_none() {
            self.peers
                .keys()
                .cloned()
                .try_collect()
                .expect("BTreeMap keys are unique")
        } else {
            self.peer_refs
                .keys()
                .cloned()
                .try_collect()
                .expect("BTreeMap keys are unique")
        }
    }
}

impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
    /// Process completions from the transmitter.
    fn process_completions(&mut self, completions: Vec<Completion<P>>) {
        for completion in completions {
            // If there is no message to deliver, then skip
            let Some(deliver_at) = completion.deliver_at else {
                trace!(
                    origin = ?completion.origin,
                    recipient = ?completion.recipient,
                    "message dropped before delivery",
                );
                continue;
            };

            // Send message to link
            let key = (completion.origin.clone(), completion.recipient.clone());
            let Some(link) = self.links.get_mut(&key) else {
                // This can happen if the link is removed before the message is delivered
                trace!(
                    origin = ?completion.origin,
                    recipient = ?completion.recipient,
                    "missing link for completion",
                );
                continue;
            };
            if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
                error!(?err, "failed to send");
            }
        }
    }

    /// Handle a task.
    ///
    /// This method is called when a task is received from the sender, which can come from
    /// any peer in the network.
    fn handle_task(&mut self, task: Task<P>) {
        // If peer sets are enabled and we are not in one, ignore the message (we are disconnected from all)
        let (channel, origin, recipients, message, reply) = task;
        if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&origin) {
            warn!(
                ?origin,
                reason = "not in tracked peer set",
                "dropping message"
            );
            if let Err(err) = reply.send(Vec::new()) {
                error!(?err, "failed to send ack");
            }
            return;
        }

        // Collect recipients
        let recipients = match recipients {
            Recipients::All => {
                // If peer sets have been registered, send only to tracked peers
                // Otherwise, send to all registered peers (compatibility
                // with tests that do not register peer sets.)
                if self.peer_sets.is_empty() {
                    self.peers.keys().cloned().collect()
                } else {
                    self.peer_refs.keys().cloned().collect()
                }
            }
            Recipients::Some(keys) => keys,
            Recipients::One(key) => vec![key],
        };

        // Send to all recipients
        let now = self.context.current();
        let mut sent = Vec::new();
        for recipient in recipients {
            // Skip self
            if recipient == origin {
                trace!(?recipient, reason = "self", "dropping message");
                continue;
            }

            // If tracking peer sets, ensure recipient and sender are in a tracked peer set
            if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&recipient) {
                trace!(
                    ?origin,
                    ?recipient,
                    reason = "not in tracked peer set",
                    "dropping message"
                );
                continue;
            }

            // Determine if the sender or recipient has blocked the other
            let o_r = (origin.clone(), recipient.clone());
            let r_o = (recipient.clone(), origin.clone());
            if self.disconnect_on_block
                && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
            {
                trace!(?origin, ?recipient, reason = "blocked", "dropping message");
                continue;
            }

            // Determine if there is a link between the origin and recipient
            let Some(link) = self.links.get_mut(&o_r) else {
                trace!(?origin, ?recipient, reason = "no link", "dropping message");
                continue;
            };

            // Note: Rate limiting is handled by the Sender before messages reach here.
            // The Sender filters recipients via LimitedSender::check() or in Sender::send().

            // Record sent message as soon as we determine there is a link with recipient (approximates
            // having an open connection)
            self.sent_messages
                .get_or_create(&metrics::Message::new(&origin, &recipient, channel))
                .inc();

            // Sample latency
            let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64);

            // Determine if the message should be delivered
            let should_deliver = self.context.gen_bool(link.success_rate);

            // Enqueue message for delivery
            let completions = self.transmitter.enqueue(
                now,
                origin.clone(),
                recipient.clone(),
                channel,
                message.clone(),
                latency,
                should_deliver,
            );
            self.process_completions(completions);

            sent.push(recipient);
        }

        // Alert application of sent messages
        if let Err(err) = reply.send(sent) {
            error!(?err, "failed to send ack");
        }
    }

    /// Run the simulated network.
    ///
    /// It is not necessary to invoke this method before modifying the network topology, however,
    /// no messages will be sent until this method is called.
    pub fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run().await)
    }

    async fn run(mut self) {
        loop {
            let tick = match self.transmitter.next() {
                Some(when) => Either::Left(self.context.sleep_until(when)),
                None => Either::Right(future::pending()),
            };
            select! {
                _ = tick => {
                    let now = self.context.current();
                    let completions = self.transmitter.advance(now);
                    self.process_completions(completions);
                },
                message = self.ingress.next() => {
                    // If ingress is closed, exit
                    let message = match message {
                        Some(message) => message,
                        None => break,
                    };
                    self.handle_ingress(message).await;
                },
                task = self.receiver.next() => {
                    // If receiver is closed, exit
                    let task = match task {
                        Some(task) => task,
                        None => break,
                    };
                    self.handle_task(task);
                },
            }
        }
    }
}

/// Provides online peers from the simulated network.
///
/// Implements [`crate::utils::limited::Connected`] to provide peer list updates
/// to [`crate::utils::limited::LimitedSender`].
pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
    mailbox: UnboundedMailbox<ingress::Message<P, E>>,
}

impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
    fn clone(&self) -> Self {
        Self {
            mailbox: self.mailbox.clone(),
        }
    }
}

impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
    const fn new(mailbox: UnboundedMailbox<ingress::Message<P, E>>) -> Self {
        Self { mailbox }
    }
}

impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
    type PublicKey = P;

    async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
        self.mailbox
            .0
            .request(|response| ingress::Message::SubscribeConnected { response })
            .await
            .unwrap_or_else(|| {
                let (_sender, receiver) = ring::channel(NZUsize!(1));
                receiver
            })
    }
}

/// Implementation of a [crate::Sender] for the simulated network without rate limiting.
///
/// This is the inner sender used by [`Sender`] which wraps it with rate limiting.
#[derive(Clone)]
pub struct UnlimitedSender<P: PublicKey> {
    me: P,
    channel: Channel,
    max_size: u32,
    high: mpsc::UnboundedSender<Task<P>>,
    low: mpsc::UnboundedSender<Task<P>>,
}

impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
    type Error = Error;
    type PublicKey = P;

    async fn send(
        &mut self,
        recipients: Recipients<P>,
        mut message: impl Buf + Send,
        priority: bool,
    ) -> Result<Vec<P>, Error> {
        // Check message size
        if message.remaining() > self.max_size as usize {
            return Err(Error::MessageTooLarge(message.remaining()));
        }

        // Send message
        let (sender, receiver) = oneshot::channel();
        let channel = if priority { &self.high } else { &self.low };
        let message = message.copy_to_bytes(message.remaining());
        if channel
            .unbounded_send((self.channel, self.me.clone(), recipients, message, sender))
            .is_err()
        {
            return Ok(Vec::new());
        }
        Ok(receiver.await.unwrap_or_default())
    }
}

/// Implementation of a [crate::Sender] for the simulated network.
///
/// Also implements [crate::LimitedSender] to support rate-limit checking
/// before sending messages.
pub struct Sender<P: PublicKey, E: Clock> {
    limited_sender: LimitedSender<E, UnlimitedSender<P>, ConnectedPeerProvider<P, E>>,
}

impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
    fn clone(&self) -> Self {
        Self {
            limited_sender: self.limited_sender.clone(),
        }
    }
}

impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sender").finish_non_exhaustive()
    }
}

impl<P: PublicKey, E: Clock> Sender<P, E> {
    #[allow(clippy::too_many_arguments)]
    fn new(
        context: impl Spawner + Metrics,
        me: P,
        channel: Channel,
        max_size: u32,
        mut sender: mpsc::UnboundedSender<Task<P>>,
        oracle_mailbox: UnboundedMailbox<ingress::Message<P, E>>,
        clock: E,
        quota: Quota,
    ) -> (Self, Handle<()>) {
        // Listen for messages
        let (high, mut high_receiver) = mpsc::unbounded();
        let (low, mut low_receiver) = mpsc::unbounded();
        let processor = context.with_label("processor").spawn(move |_| async move {
            loop {
                // Wait for task
                let task;
                select! {
                    high_task = high_receiver.next() => {
                        task = match high_task {
                            Some(task) => task,
                            None => break,
                        };
                    },
                    low_task = low_receiver.next() => {
                        task = match low_task {
                            Some(task) => task,
                            None => break,
                        };
                    }
                }

                // Send task
                if let Err(err) = sender.send(task).await {
                    error!(?err, channel, "failed to send task");
                }
            }
        });

        let unlimited_sender = UnlimitedSender {
            me,
            channel,
            max_size,
            high,
            low,
        };
        let peer_source = ConnectedPeerProvider::new(oracle_mailbox);
        let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);

        (Self { limited_sender }, processor)
    }

    /// Split this [Sender] into a [SplitOrigin::Primary] and [SplitOrigin::Secondary] sender.
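    ///
    /// # Example
    ///
    /// An illustrative sketch (mirrors the tests below; `peer_a` and `peer_b`
    /// are assumed peer public keys):
    ///
    /// ```ignore
    /// let (primary, secondary) = sender.split_with(move |origin, _, _| match origin {
    ///     SplitOrigin::Primary => Some(Recipients::One(peer_a.clone())),
    ///     SplitOrigin::Secondary => Some(Recipients::One(peer_b.clone())),
    /// });
    /// ```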
    pub fn split_with<F: SplitForwarder<P>>(
        self,
        forwarder: F,
    ) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
        (
            SplitSender {
                replica: SplitOrigin::Primary,
                inner: self.clone(),
                forwarder: forwarder.clone(),
            },
            SplitSender {
                replica: SplitOrigin::Secondary,
                inner: self,
                forwarder,
            },
        )
    }
}

impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
    type PublicKey = P;
    type Checked<'a>
        = crate::utils::limited::CheckedSender<'a, UnlimitedSender<P>>
    where
        Self: 'a;

    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        self.limited_sender.check(recipients).await
    }
}

/// A sender that routes recipients per message via a user-provided function.
pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    replica: SplitOrigin,
    inner: Sender<P, E>,
    forwarder: F,
}

impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
    fn clone(&self) -> Self {
        Self {
            replica: self.replica,
            inner: self.inner.clone(),
            forwarder: self.forwarder.clone(),
        }
    }
}

impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SplitSender")
            .field("replica", &self.replica)
            .field("inner", &self.inner)
            .finish()
    }
}

impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
    type PublicKey = P;
    type Checked<'a> = SplitCheckedSender<'a, P, E, F>;

    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        Ok(SplitCheckedSender {
            // Perform a rate limit check with the entire set of original recipients although
            // the forwarder may filter these (based on message content) during send.
            checked: self.inner.limited_sender.check(recipients.clone()).await?,
            replica: self.replica,
            forwarder: self.forwarder.clone(),
            recipients,

            _phantom: std::marker::PhantomData,
        })
    }
}

/// A checked sender for [`SplitSender`] that defers the forwarder call to send time.
///
/// This is necessary because [`SplitForwarder`] may examine message content to determine
/// routing, but the message is not available at [`LimitedSender::check`] time.
pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    checked: LimitedCheckedSender<'a, UnlimitedSender<P>>,
    replica: SplitOrigin,
    forwarder: F,
    recipients: Recipients<P>,

    _phantom: std::marker::PhantomData<E>,
}

impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
    for SplitCheckedSender<'a, P, E, F>
{
    type PublicKey = P;
    type Error = Error;

    async fn send(
        self,
        mut message: impl Buf + Send,
        priority: bool,
    ) -> Result<Vec<Self::PublicKey>, Self::Error> {
        // Convert to Bytes here since forwarder needs to inspect the message
        let message = message.copy_to_bytes(message.remaining());

        // Determine the set of recipients that will receive the message
        let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
            return Ok(Vec::new());
        };

        // Extract the inner sender and send directly with the new recipients
        //
        // While SplitForwarder does not enforce any relationship between the original recipients
        // and the new recipients, it is typically some subset of the original recipients. This
        // means we may over-rate limit some recipients (who are never actually sent a message here) but
        // we prefer this to not providing feedback at all (we would have to skip check entirely).
        self.checked
            .into_inner()
            .send(recipients, message, priority)
            .await
    }
}

type MessageReceiver<P> = mpsc::UnboundedReceiver<Message<P>>;

/// Implementation of a [crate::Receiver] for the simulated network.
#[derive(Debug)]
pub struct Receiver<P: PublicKey> {
    receiver: MessageReceiver<P>,
}

impl<P: PublicKey> crate::Receiver for Receiver<P> {
    type Error = Error;
    type PublicKey = P;

    async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
        self.receiver.next().await.ok_or(Error::NetworkClosed)
    }
}

impl<P: PublicKey> Receiver<P> {
    /// Split this [Receiver] into a [SplitTarget::Primary] and [SplitTarget::Secondary] receiver.
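    ///
    /// # Example
    ///
    /// An illustrative sketch (mirrors the tests below; `context` is an assumed
    /// runtime context and `peer_a` an assumed peer public key):
    ///
    /// ```ignore
    /// let (primary, secondary) = receiver.split_with(context, move |(sender, _)| {
    ///     if sender == &peer_a {
    ///         SplitTarget::Primary
    ///     } else {
    ///         SplitTarget::Secondary
    ///     }
    /// });
    /// ```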
    pub fn split_with<E: Spawner, R: SplitRouter<P>>(
        mut self,
        context: E,
        router: R,
    ) -> (Self, Self) {
        let (mut primary_tx, primary_rx) = mpsc::unbounded();
        let (mut secondary_tx, secondary_rx) = mpsc::unbounded();
        context.spawn(move |_| async move {
            while let Some(message) = self.receiver.next().await {
                // Route message to the appropriate target
                let direction = router(&message);
                match direction {
                    SplitTarget::None => {}
                    SplitTarget::Primary => {
                        if let Err(err) = primary_tx.send(message).await {
                            error!(?err, "failed to send message to primary");
                        }
                    }
                    SplitTarget::Secondary => {
                        if let Err(err) = secondary_tx.send(message).await {
                            error!(?err, "failed to send message to secondary");
                        }
                    }
                    SplitTarget::Both => {
                        if let Err(err) = primary_tx.send(message.clone()).await {
                            error!(?err, "failed to send message to primary");
                        }
                        if let Err(err) = secondary_tx.send(message).await {
                            error!(?err, "failed to send message to secondary");
                        }
                    }
                }

                // Exit if both channels are closed
                if primary_tx.is_closed() && secondary_tx.is_closed() {
                    break;
                }
            }
        });

        (
            Self {
                receiver: primary_rx,
            },
            Self {
                receiver: secondary_rx,
            },
        )
    }
}

/// A peer in the simulated network.
///
/// The peer can register channels, which allows it to receive messages sent to the channel from other peers.
struct Peer<P: PublicKey> {
    // Socket address that the peer is listening on
    socket: SocketAddr,

    // Control to register new channels
    control: mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
}

impl<P: PublicKey> Peer<P> {
    /// Create and return a new peer.
    ///
    /// The peer will listen for incoming connections on the given `socket` address.
    /// `max_size` is the maximum size of a message that can be sent to the peer.
    async fn new<E: Spawner + RNetwork + Metrics + Clock>(
        context: E,
        public_key: P,
        socket: SocketAddr,
        max_size: u32,
    ) -> Self {
        // The control is used to register channels.
        // There is exactly one mailbox created for each channel that the peer is registered for.
        let (control_sender, mut control_receiver) = mpsc::unbounded();

        // Whenever a message is received from a peer, it is placed in the inbox.
        // The router polls the inbox and forwards the message to the appropriate mailbox.
        let (inbox_sender, mut inbox_receiver) = mpsc::unbounded();

        // Spawn router
        context.with_label("router").spawn(|context| async move {
            // Map of channels to mailboxes (senders to particular channels)
            let mut mailboxes = HashMap::new();

            // Continually listen for control messages and inbound messages
            select_loop! {
                context,
                on_stopped => {},
                // Listen for control messages, which are used to register channels
                control = control_receiver.next() => {
                    // If control is closed, exit
                    let (channel, sender, result_tx): (Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>) = match control {
                        Some(control) => control,
                        None => break,
                    };

                    // Register channel
                    let (receiver_tx, receiver_rx) = mpsc::unbounded();
                    if let Some((_, existing_sender)) = mailboxes.insert(channel, (receiver_tx, sender)) {
                        warn!(?public_key, ?channel, "overwriting existing channel");
                        existing_sender.abort();
                    }
                    result_tx.send(receiver_rx).unwrap();
                },

                // Listen for messages from the inbox, which are forwarded to the appropriate mailbox
                inbox = inbox_receiver.next() => {
                    // If inbox is closed, exit
                    let (channel, message) = match inbox {
                        Some(message) => message,
                        None => break,
                    };

                    // Send message to mailbox
                    match mailboxes.get_mut(&channel) {
                        Some((receiver_tx, _)) => {
                            if let Err(err) = receiver_tx.send(message).await {
                                debug!(?err, "failed to send message to mailbox");
                            }
                        }
                        None => {
                            trace!(
                                recipient = ?public_key,
                                channel,
                                reason = "missing channel",
                                "dropping message",
                            );
                        }
                    }
                },
            }
        });

        // Spawn a task that accepts new connections and spawns a task for each connection
        let (ready_tx, ready_rx) = oneshot::channel();
        context
            .with_label("listener")
            .spawn(move |context| async move {
                // Initialize listener
                let mut listener = context.bind(socket).await.unwrap();
                let _ = ready_tx.send(());

                // Continually accept new connections
                while let Ok((_, _, mut stream)) = listener.accept().await {
                    // New connection accepted. Spawn a task for this connection
                    context.with_label("receiver").spawn({
                        let mut inbox_sender = inbox_sender.clone();
                        move |_| async move {
                            // Receive dialer's public key as a handshake
                            let dialer = match recv_frame(&mut stream, max_size).await {
                                Ok(data) => data,
                                Err(_) => {
                                    error!("failed to receive public key from dialer");
                                    return;
                                }
                            };
                            let Ok(dialer) = P::decode(dialer.as_ref()) else {
                                error!("received public key is invalid");
                                return;
                            };

                            // Continually receive messages from the dialer and send them to the inbox
                            while let Ok(data) = recv_frame(&mut stream, max_size).await {
                                let channel = Channel::from_be_bytes(
                                    data[..Channel::SIZE].try_into().unwrap(),
                                );
                                let message = data.slice(Channel::SIZE..);
                                if let Err(err) = inbox_sender
                                    .send((channel, (dialer.clone(), message)))
                                    .await
                                {
                                    debug!(?err, "failed to send message to mailbox");
                                    break;
                                }
                            }
                        }
                    });
                }
            });

        // Wait for listener to start before returning
        let _ = ready_rx.await;

        // Return peer
        Self {
            socket,
            control: control_sender,
        }
    }

    /// Register a channel with the peer.
    ///
    /// This allows the peer to receive messages sent to the channel.
    /// Returns a receiver that can be used to receive messages sent to the channel.
    async fn register(
        &mut self,
        channel: Channel,
        sender: Handle<()>,
    ) -> Result<MessageReceiver<P>, Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.control
            .send((channel, sender, result_tx))
            .await
            .map_err(|_| Error::NetworkClosed)?;
        result_rx.await.map_err(|_| Error::NetworkClosed)
    }
}

// A unidirectional link between two peers.
// Messages can be sent over the link with a given latency, jitter, and success rate.
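//
// Links are created through the oracle; an illustrative sketch (mirroring the
// tests below, where `pk1` and `pk2` are assumed peer public keys):
//
//     let link = ingress::Link {
//         latency: Duration::from_millis(2),
//         jitter: Duration::from_millis(1),
//         success_rate: 0.9,
//     };
//     oracle.add_link(pk1.clone(), pk2.clone(), link).await.unwrap();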
struct Link {
    sampler: Normal<f64>,
    success_rate: f64,
    // Messages with their receive time for ordered delivery
    inbox: mpsc::UnboundedSender<(Channel, Bytes, SystemTime)>,
}

impl Link {
    #[allow(clippy::too_many_arguments)]
    fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
        context: &mut E,
        dialer: P,
        receiver: P,
        socket: SocketAddr,
        sampler: Normal<f64>,
        success_rate: f64,
        max_size: u32,
        received_messages: Family<metrics::Message, Counter>,
    ) -> Self {
        // Spawn a task that will wait for messages to be sent to the link and then send them
        // over the network.
        let (inbox, mut outbox) = mpsc::unbounded::<(Channel, Bytes, SystemTime)>();
        context.with_label("link").spawn(move |context| async move {
            // Dial the peer and handshake by sending it the dialer's public key
            let (mut sink, _) = context.dial(socket).await.unwrap();
            if let Err(err) = send_frame(&mut sink, dialer.as_ref(), max_size).await {
                error!(?err, "failed to send public key to listener");
                return;
            }

            // Process messages in order, waiting for their receive time
            while let Some((channel, message, receive_complete_at)) = outbox.next().await {
                // Wait until the message should arrive at receiver
                context.sleep_until(receive_complete_at).await;

                // Send the message
                let data = Bytes::from_owner(channel.to_be_bytes()).chain(message);
                let _ = send_frame(&mut sink, data, max_size).await;

                // Bump received messages metric
                received_messages
                    .get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
                    .inc();
            }
        });

        Self {
            sampler,
            success_rate,
            inbox,
        }
    }

    // Send a message over the link with receive timing.
    fn send(
        &mut self,
        channel: Channel,
        message: Bytes,
        receive_complete_at: SystemTime,
    ) -> Result<(), Error> {
        self.inbox
            .unbounded_send((channel, message, receive_complete_at))
            .map_err(|_| Error::NetworkClosed)?;
        Ok(())
    }
}
1303
1304#[cfg(test)]
1305mod tests {
1306    use super::*;
1307    use crate::{Manager, Receiver as _, Recipients, Sender as _};
1308    use bytes::Bytes;
1309    use commonware_cryptography::{ed25519, Signer as _};
1310    use commonware_runtime::{deterministic, Quota, Runner as _};
1311    use futures::FutureExt;
1312    use std::num::NonZeroU32;
1313
1314    const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;
1315
1316    /// Default rate limit set high enough to not interfere with normal operation
1317    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1318
1319    #[test]
1320    fn test_register_and_link() {
1321        let executor = deterministic::Runner::default();
1322        executor.start(|context| async move {
1323            let cfg = Config {
1324                max_size: MAX_MESSAGE_SIZE,
1325                disconnect_on_block: true,
1326                tracked_peer_sets: Some(3),
1327            };
1328            let network_context = context.with_label("network");
1329            let (network, oracle) = Network::new(network_context.clone(), cfg);
1330            network_context.spawn(|_| network.run());
1331
1332            // Create two public keys
1333            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1334            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1335
1336            // Register the peer set
1337            let mut manager = oracle.manager();
1338            manager
1339                .update(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
1340                .await;
1341            let control = oracle.control(pk1.clone());
1342            control.register(0, TEST_QUOTA).await.unwrap();
1343            control.register(1, TEST_QUOTA).await.unwrap();
1344            let control = oracle.control(pk2.clone());
1345            control.register(0, TEST_QUOTA).await.unwrap();
1346            control.register(1, TEST_QUOTA).await.unwrap();
1347
1348            // Overwrite if registering again
1349            control.register(1, TEST_QUOTA).await.unwrap();
1350
1351            // Add link
1352            let link = ingress::Link {
1353                latency: Duration::from_millis(2),
1354                jitter: Duration::from_millis(1),
1355                success_rate: 0.9,
1356            };
1357            oracle
1358                .add_link(pk1.clone(), pk2.clone(), link.clone())
1359                .await
1360                .unwrap();
1361
1362            // Expect error when adding link again
1363            assert!(matches!(
1364                oracle.add_link(pk1, pk2, link).await,
1365                Err(Error::LinkExists)
1366            ));
1367        });
1368    }
1369
    #[test]
    fn test_split_channel_single() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            };
            let network_context = context.with_label("network");
            let (network, oracle) = Network::new(network_context.clone(), cfg);
            network_context.spawn(|_| network.run());

            // Create a "twin" node that will be split, plus two normal peers
            let twin = ed25519::PrivateKey::from_seed(20).public_key();
            let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
            let peer_b = ed25519::PrivateKey::from_seed(22).public_key();

            // Register all peers
            let mut manager = oracle.manager();
            manager
                .update(
                    0,
                    [twin.clone(), peer_a.clone(), peer_b.clone()]
                        .try_into()
                        .unwrap(),
                )
                .await;

            // Register normal peers
            let (mut peer_a_sender, mut peer_a_recv) = oracle
                .control(peer_a.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let (mut peer_b_sender, mut peer_b_recv) = oracle
                .control(peer_b.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();

            // Register and split the twin's channel:
            // - Primary sends only to peer_a
            // - Secondary sends only to peer_b
            // - Messages from peer_a go to primary receiver
            // - Messages from peer_b go to secondary receiver
            let (twin_sender, twin_receiver) = oracle
                .control(twin.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let peer_a_for_router = peer_a.clone();
            let peer_b_for_router = peer_b.clone();
            let (mut twin_primary_sender, mut twin_secondary_sender) =
                twin_sender.split_with(move |origin, _, _| match origin {
                    SplitOrigin::Primary => Some(Recipients::One(peer_a_for_router.clone())),
                    SplitOrigin::Secondary => Some(Recipients::One(peer_b_for_router.clone())),
                });
            let peer_a_for_recv = peer_a.clone();
            let peer_b_for_recv = peer_b.clone();
            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver.split_with(
                context.with_label("split_receiver"),
                move |(sender, _)| {
                    if sender == &peer_a_for_recv {
                        SplitTarget::Primary
                    } else if sender == &peer_b_for_recv {
                        SplitTarget::Secondary
                    } else {
                        panic!("unexpected sender");
                    }
                },
            );

            // Establish bidirectional links
            let link = ingress::Link {
                latency: Duration::from_millis(0),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            oracle
                .add_link(peer_a.clone(), twin.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(twin.clone(), peer_a.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(peer_b.clone(), twin.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(twin.clone(), peer_b.clone(), link.clone())
                .await
                .unwrap();

            // Send messages in both directions
            let msg_a_to_twin = Bytes::from_static(b"from_a");
            let msg_b_to_twin = Bytes::from_static(b"from_b");
            let msg_primary_out = Bytes::from_static(b"primary_out");
            let msg_secondary_out = Bytes::from_static(b"secondary_out");
            peer_a_sender
                .send(Recipients::One(twin.clone()), msg_a_to_twin.clone(), false)
                .await
                .unwrap();
            peer_b_sender
                .send(Recipients::One(twin.clone()), msg_b_to_twin.clone(), false)
                .await
                .unwrap();
            twin_primary_sender
                .send(Recipients::All, msg_primary_out.clone(), false)
                .await
                .unwrap();
            twin_secondary_sender
                .send(Recipients::All, msg_secondary_out.clone(), false)
                .await
                .unwrap();

            // Verify routing: peer_a messages go to primary, peer_b to secondary
            let (sender, payload) = twin_primary_recv.recv().await.unwrap();
            assert_eq!(sender, peer_a);
            assert_eq!(payload, msg_a_to_twin);
            let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
            assert_eq!(sender, peer_b);
            assert_eq!(payload, msg_b_to_twin);

            // Verify routing: primary sends to peer_a, secondary to peer_b
            let (sender, payload) = peer_a_recv.recv().await.unwrap();
            assert_eq!(sender, twin);
            assert_eq!(payload, msg_primary_out);
            let (sender, payload) = peer_b_recv.recv().await.unwrap();
            assert_eq!(sender, twin);
            assert_eq!(payload, msg_secondary_out);
        });
    }

    #[test]
    fn test_split_channel_both() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            };
            let network_context = context.with_label("network");
            let (network, oracle) = Network::new(network_context.clone(), cfg);
            network_context.spawn(|_| network.run());

            // Create a "twin" node that will be split, plus one normal peer
            let twin = ed25519::PrivateKey::from_seed(30).public_key();
            let peer_c = ed25519::PrivateKey::from_seed(31).public_key();

            // Register all peers
            let mut manager = oracle.manager();
            manager
                .update(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
                .await;

            // Register normal peer
            let (mut peer_c_sender, _peer_c_recv) = oracle
                .control(peer_c.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();

            // Register and split the twin's channel with a router that delivers incoming messages to both halves
            let (twin_sender, twin_receiver) = oracle
                .control(twin.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let (_twin_primary_sender, _twin_secondary_sender) =
                twin_sender.split_with(|_origin, recipients, _| Some(recipients.clone()));
            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
                .split_with(context.with_label("split_receiver_both"), |_| {
                    SplitTarget::Both
                });

            // Establish bidirectional links
            let link = ingress::Link {
                latency: Duration::from_millis(0),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            oracle
                .add_link(peer_c.clone(), twin.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(twin.clone(), peer_c.clone(), link)
                .await
                .unwrap();

            // Send a message from peer_c to twin
            let msg_both = Bytes::from_static(b"to_both");
            peer_c_sender
                .send(Recipients::One(twin.clone()), msg_both.clone(), false)
                .await
                .unwrap();

            // Verify both receivers get the message
            let (sender, payload) = twin_primary_recv.recv().await.unwrap();
            assert_eq!(sender, peer_c);
            assert_eq!(payload, msg_both);
            let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
            assert_eq!(sender, peer_c);
            assert_eq!(payload, msg_both);
        });
    }

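    // Returning None from the forwarder and SplitTarget::None from the router should drop
    // traffic in both directions: outgoing sends report no recipients, and delivered
    // messages never reach either receiver half.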
    #[test]
    fn test_split_channel_none() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            };
            let network_context = context.with_label("network");
            let (network, oracle) = Network::new(network_context.clone(), cfg);
            network_context.spawn(|_| network.run());

            // Create a "twin" node that will be split, plus one normal peer
            let twin = ed25519::PrivateKey::from_seed(30).public_key();
            let peer_c = ed25519::PrivateKey::from_seed(31).public_key();

            // Register all peers
            let mut manager = oracle.manager();
            manager
                .update(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
                .await;

            // Register normal peer
            let (mut peer_c_sender, _peer_c_recv) = oracle
                .control(peer_c.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();

            // Register and split the twin's channel with a forwarder and a router that both drop everything
            let (twin_sender, twin_receiver) = oracle
                .control(twin.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let (mut twin_primary_sender, mut twin_secondary_sender) =
                twin_sender.split_with(|_origin, _, _| None);
            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
                .split_with(context.with_label("split_receiver_none"), |_| {
                    SplitTarget::None
                });

            // Establish bidirectional links
            let link = ingress::Link {
                latency: Duration::from_millis(0),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            oracle
                .add_link(peer_c.clone(), twin.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(twin.clone(), peer_c.clone(), link)
                .await
                .unwrap();

            // Send a message from peer_c to twin
            let msg_both = Bytes::from_static(b"to_both");
            let sent = peer_c_sender
                .send(Recipients::One(twin.clone()), msg_both.clone(), false)
                .await
                .unwrap();
            assert_eq!(sent.len(), 1);
            assert_eq!(sent[0], twin);
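            // The network still reports the twin as a recipient: delivery succeeds at the
            // network layer, and the message is only discarded by the split router afterwards.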

            // Verify neither receiver observes the message
            context.sleep(Duration::from_millis(100)).await;
            assert!(twin_primary_recv.recv().now_or_never().is_none());
            assert!(twin_secondary_recv.recv().now_or_never().is_none());

            // Send from the twin's primary sender; the forwarder returns None, so no recipients are reported
            let msg_both = Bytes::from_static(b"to_both");
            let sent = twin_primary_sender
                .send(Recipients::One(peer_c.clone()), msg_both.clone(), false)
                .await
                .unwrap();
            assert_eq!(sent.len(), 0);

            // Send from the twin's secondary sender; likewise, no recipients are reported
            let msg_both = Bytes::from_static(b"to_both");
            let sent = twin_secondary_sender
                .send(Recipients::One(peer_c.clone()), msg_both.clone(), false)
                .await
                .unwrap();
            assert_eq!(sent.len(), 0);
        });
    }

    #[test]
    fn test_unordered_peer_sets() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            };
            let network_context = context.with_label("network");
            let (network, oracle) = Network::new(network_context.clone(), cfg);
            network_context.spawn(|_| network.run());

            // Create two public keys
            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();

            // Subscribe to peer sets
            let mut manager = oracle.manager();
            let mut subscription = manager.subscribe().await;

            // Register initial peer set
            manager
                .update(10, [pk1.clone(), pk2.clone()].try_into().unwrap())
                .await;
            let (id, new, all) = subscription.next().await.unwrap();
            assert_eq!(id, 10);
            assert_eq!(new.len(), 2);
            assert_eq!(all.len(), 2);

            // Register an older peer set (ignored)
            let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
            manager.update(9, [pk3.clone()].try_into().unwrap()).await;
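            // The stale update (id 9 < 10) is dropped: no subscription event fires for it,
            // and pk3 never appears in the `all` set asserted below.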

            // Add new peer set
            let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
            manager.update(11, [pk4.clone()].try_into().unwrap()).await;
            let (id, new, all) = subscription.next().await.unwrap();
            assert_eq!(id, 11);
            assert_eq!(new, [pk4.clone()].try_into().unwrap());
            assert_eq!(all, [pk1, pk2, pk4].try_into().unwrap());
        });
    }

    #[test]
    fn test_get_next_socket() {
        let cfg = Config {
            max_size: MAX_MESSAGE_SIZE,
            disconnect_on_block: true,
            tracked_peer_sets: None,
        };
        let runner = deterministic::Runner::default();

        runner.start(|context| async move {
            type PublicKey = ed25519::PublicKey;
            let (mut network, _) =
                Network::<deterministic::Context, PublicKey>::new(context.clone(), cfg);

            // Test that the next socket address is incremented correctly
            let mut original = network.next_addr;
            let next = network.get_next_socket();
            assert_eq!(next, original);
            let next = network.get_next_socket();
            original.set_port(1);
            assert_eq!(next, original);

            // Test that a port overflow carries into the IP address and resets the port
            let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
            network.next_addr = max_addr;
            let next = network.get_next_socket();
            assert_eq!(next, max_addr);
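            // The saturated address is still handed out as-is; the wrap to the next IP
            // applies to the following allocation.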
            let next = network.get_next_socket();
            assert_eq!(
                next,
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
            );
        });
    }

    #[test]
    fn test_fifo_burst_same_recipient() {
        let cfg = Config {
            max_size: MAX_MESSAGE_SIZE,
            disconnect_on_block: true,
            tracked_peer_sets: Some(3),
        };
        let runner = deterministic::Runner::default();

        runner.start(|context| async move {
            let (network, oracle) = Network::new(context.with_label("network"), cfg);
            let network_handle = network.start();

            let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
            let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();

            let mut manager = oracle.manager();
            manager
                .update(
                    0,
                    [sender_pk.clone(), recipient_pk.clone()]
                        .try_into()
                        .unwrap(),
                )
                .await;
            let (mut sender, _sender_recv) = oracle
                .control(sender_pk.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let (_sender2, mut receiver) = oracle
                .control(recipient_pk.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();

            oracle
                .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
                .await
                .unwrap();
            oracle
                .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
                .await
                .unwrap();

            oracle
                .add_link(
                    sender_pk.clone(),
                    recipient_pk.clone(),
                    ingress::Link {
                        latency: Duration::from_millis(0),
                        jitter: Duration::from_millis(0),
                        success_rate: 1.0,
                    },
                )
                .await
                .unwrap();

            const COUNT: usize = 50;
            let mut expected = Vec::with_capacity(COUNT);
            for i in 0..COUNT {
                let msg = Bytes::from(vec![i as u8; 64]);
                sender
                    .send(Recipients::One(recipient_pk.clone()), msg.clone(), false)
                    .await
                    .unwrap();
                expected.push(msg);
            }

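            // Even with egress and ingress capped at 5,000 B/s, messages to a single
            // recipient must be delivered in the order they were sent.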
            for expected_msg in expected {
                let (_pk, bytes) = receiver.recv().await.unwrap();
                assert_eq!(bytes, expected_msg);
            }

            drop(oracle);
            drop(sender);
            network_handle.abort();
        });
    }

    #[test]
    fn test_broadcast_respects_transmit_latency() {
        let cfg = Config {
            max_size: MAX_MESSAGE_SIZE,
            disconnect_on_block: true,
            tracked_peer_sets: Some(3),
        };
        let runner = deterministic::Runner::default();

        runner.start(|context| async move {
            let (network, oracle) = Network::new(context.with_label("network"), cfg);
            let network_handle = network.start();

            let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
            let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
            let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();

            let mut manager = oracle.manager();
            manager
                .update(
                    0,
                    [sender_pk.clone(), recipient_a.clone(), recipient_b.clone()]
                        .try_into()
                        .unwrap(),
                )
                .await;
            let (mut sender, _recv_sender) = oracle
                .control(sender_pk.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let (_sender2, mut recv_a) = oracle
                .control(recipient_a.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            let (_sender3, mut recv_b) = oracle
                .control(recipient_b.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();

            oracle
                .limit_bandwidth(sender_pk.clone(), Some(1_000), None)
                .await
                .unwrap();
            oracle
                .limit_bandwidth(recipient_a.clone(), None, Some(1_000))
                .await
                .unwrap();
            oracle
                .limit_bandwidth(recipient_b.clone(), None, Some(1_000))
                .await
                .unwrap();

            let link = ingress::Link {
                latency: Duration::from_millis(0),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            oracle
                .add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
                .await
                .unwrap();
            oracle
                .add_link(sender_pk.clone(), recipient_b.clone(), link)
                .await
                .unwrap();

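            // With egress capped at 1,000 B/s, broadcasting a 10,000-byte message to two
            // recipients moves ~20,000 bytes in total, so each delivery should take at
            // least ~20 seconds.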
            let big_msg = Bytes::from(vec![7u8; 10_000]);
            let start = context.current();
            sender
                .send(Recipients::All, big_msg.clone(), false)
                .await
                .unwrap();

            let (_pk, received_a) = recv_a.recv().await.unwrap();
            assert_eq!(received_a, big_msg);
            let elapsed_a = context.current().duration_since(start).unwrap();
            assert!(elapsed_a >= Duration::from_secs(20));

            let (_pk, received_b) = recv_b.recv().await.unwrap();
            assert_eq!(received_b, big_msg);
            let elapsed_b = context.current().duration_since(start).unwrap();
            assert!(elapsed_b >= Duration::from_secs(20));

            // Because bandwidth is shared, the two messages should take about the same time
            assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));

            drop(oracle);
            drop(sender);
            network_handle.abort();
        });
    }
}