Skip to main content

commonware_p2p/simulated/
network.rs

1//! Implementation of a simulated p2p network.
2
3use super::{
4    ingress::{self, Oracle},
5    metrics,
6    transmitter::{self, Completion},
7    Error,
8};
9use crate::{
10    utils::{
11        limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
12        PeerSetsAtIndex as PeerSetsAtIndexBase,
13    },
14    Channel, Message as NetworkMessage, PeerSetUpdate, Recipients, TrackedPeers,
15    UnlimitedSender as _,
16};
17use commonware_actor::{Feedback, Unreliable};
18use commonware_codec::{DecodeExt, FixedSize};
19use commonware_cryptography::PublicKey;
20use commonware_macros::select_loop;
21use commonware_runtime::{
22    spawn_cell,
23    telemetry::metrics::{CounterFamily, MetricsExt as _},
24    Clock, ContextCell, Handle, IoBuf, IoBufs, Listener as _, Metrics, Network as RNetwork, Quota,
25    Spawner,
26};
27use commonware_stream::utils::codec::{recv_frame, send_frame};
28use commonware_utils::{
29    channel::{fallible::FallibleExt, mpsc, oneshot, ring},
30    ordered::Set,
31    NZUsize, TryCollect,
32};
33use either::Either;
34use futures::{future, Sink};
35use rand::Rng;
36use rand_distr::{Distribution, Normal};
37use std::{
38    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
39    fmt::Debug,
40    net::{IpAddr, Ipv4Addr, SocketAddr},
41    num::NonZeroUsize,
42    pin::Pin,
43    sync::{
44        atomic::{AtomicBool, Ordering},
45        Arc,
46    },
47    time::{Duration, SystemTime},
48};
49use tracing::{debug, error, trace, warn};
50
/// Primary and secondary [`Set`]s stored for one peer set index.
///
/// Specializes the shared `PeerSetsAtIndexBase` helper so that both roles are held as
/// [`Set`]s of public keys.
type PeerSetsAtIndex<P> = PeerSetsAtIndexBase<Set<P>, Set<P>>;
53
/// Task type representing a message to be sent within the network.
///
/// The tuple is `(channel, origin, recipients, payload)`.
type Task<P> = (Channel, P, Recipients<P>, IoBuf);
56
/// Guard that clears a shared `active` flag when it is dropped.
///
/// NOTE(review): presumably marks a channel registration as live for as long as the guard
/// exists; confirm against the code that creates and stores the guard.
struct RegistrationGuard {
    // Flag shared through the `Arc`; set to `false` when this guard is dropped.
    active: Arc<AtomicBool>,
}

impl Drop for RegistrationGuard {
    /// Clears the shared `active` flag.
    fn drop(&mut self) {
        // `Release` ordering makes the cleared flag visible to readers that load it with
        // `Acquire`.
        self.active.store(false, Ordering::Release);
    }
}
66
/// Target for a message in a split receiver.
///
/// This is the value returned by a [`SplitRouter`] for each incoming message.
///
/// NOTE(review): the routing code is outside this view; the per-variant descriptions below
/// are inferred from the variant names and should be confirmed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitTarget {
    /// Neither the primary nor the secondary receiver.
    None,
    /// The primary receiver only.
    Primary,
    /// The secondary receiver only.
    Secondary,
    /// Both the primary and the secondary receiver.
    Both,
}
76
/// Origin of a message in a split sender.
///
/// Passed as the first argument to a [`SplitForwarder`] so it can tell which side of the
/// split is sending.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitOrigin {
    /// The message was sent through the primary side.
    Primary,
    /// The message was sent through the secondary side.
    Secondary,
}
84
/// A function that forwards messages from [SplitOrigin] to [Recipients].
///
/// It is called with the origin of the send, the recipients that were requested, and the
/// message payload, and returns the recipients to use (or `None`).
///
/// NOTE(review): `None` presumably means the message is not forwarded at all; confirm at the
/// call site, which is outside this view.
pub trait SplitForwarder<P: PublicKey>:
    Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
{
}

// Blanket implementation: any closure or function with the matching signature and bounds is
// a `SplitForwarder`.
impl<P: PublicKey, F> SplitForwarder<P> for F where
    F: Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>>
        + Send
        + Sync
        + Clone
        + 'static
{
}
99
/// A function that routes incoming [NetworkMessage]s to a [SplitTarget].
///
/// It receives a reference to each incoming message and returns the target that should
/// receive it.
pub trait SplitRouter<P: PublicKey>:
    Fn(&NetworkMessage<P>) -> SplitTarget + Send + Sync + 'static
{
}

// Blanket implementation: any closure or function with the matching signature and bounds is
// a `SplitRouter`.
impl<P: PublicKey, F> SplitRouter<P> for F where
    F: Fn(&NetworkMessage<P>) -> SplitTarget + Send + Sync + 'static
{
}
110
/// Reference counts for how many tracked peer sets list a peer as primary vs secondary.
///
/// The default value has both counts at zero. `Debug`, `PartialEq` and `Eq` are derived so the
/// counts can be logged and compared directly in assertions.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct PeerRefCounts {
    /// Number of tracked peer sets that list the peer as primary.
    primary: usize,
    /// Number of tracked peer sets that list the peer as secondary.
    secondary: usize,
}
117
/// Configuration for the simulated network.
///
/// All fields are plain values, so the configuration can be cloned to build several networks
/// and printed for diagnostics.
#[derive(Clone, Debug)]
pub struct Config {
    /// Maximum size of a message that can be sent over the network.
    pub max_size: u32,

    /// True if peers should disconnect upon being blocked. While production networking would
    /// typically disconnect, for testing purposes it may be useful to keep peers connected,
    /// allowing byzantine actors the ability to continue sending messages.
    pub disconnect_on_block: bool,

    /// The maximum number of peer sets to track (`tracked_peer_sets`). When a new peer set is
    /// tracked and this limit is exceeded, the oldest peer set is removed. Peers that are no
    /// longer in any tracked peer set stop being connectable: messages to and from them are
    /// dropped.
    pub tracked_peer_sets: NonZeroUsize,
}
134
/// Implementation of a simulated network.
///
/// Holds the topology (peers, links, blocks), the tracked peer sets with their reference
/// counts, the transmitter state, and the subscriber lists that are notified when tracked
/// membership changes.
pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
    // Runtime context used for spawning, time, randomness and metrics.
    context: ContextCell<E>,

    // Maximum size of a message that can be sent over the network
    max_size: u32,

    // True if peers should disconnect upon being blocked.
    // While production networking would typically disconnect, for testing purposes it may be useful
    // to keep peers connected, allowing byzantine actors the ability to continue sending messages.
    disconnect_on_block: bool,

    // Next socket address to assign to a new peer.
    // Advanced each time an address is handed out (port first, then IP when the port overflows).
    next_addr: SocketAddr,

    // Channel to receive messages from the oracle and peer sources.
    ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,

    // Sender for peer sources to subscribe through the main ingress path.
    ingress_sender: mpsc::UnboundedSender<ingress::Message<P, E>>,

    // A map from a pair of public keys (from, to) to a link between the two peers.
    // Links are directional: (a, b) and (b, a) are separate entries.
    links: HashMap<(P, P), Link>,

    // A map from a public key to a peer
    peers: BTreeMap<P, Peer<P>>,

    // Primary and secondary peer sets indexed by peer set ID.
    peer_sets: BTreeMap<u64, PeerSetsAtIndex<P>>,

    // Per-peer reference counts across tracked peer sets (entry removed when both are zero).
    // A peer is considered connectable exactly when it has an entry here.
    peer_ref_counts: BTreeMap<P, PeerRefCounts>,

    // Maximum number of peer sets to track.
    tracked_peer_sets: NonZeroUsize,

    // Set of (from, to) block entries recorded via the oracle.
    blocks: BTreeSet<(P, P)>,

    // State of the transmitter
    transmitter: transmitter::State<P>,

    // Subscribers to primary peer set updates (used by `Manager::subscribe`).
    subscribers: Vec<mpsc::UnboundedSender<PeerSetUpdate<P>>>,

    // Subscribers to the connectable peer list (used by PeerSource for LimitedSender).
    // Each entry pairs the peer to exclude from the list with the channel to notify.
    peer_subscribers: Vec<(P, ring::Sender<Vec<P>>)>,

    // Metrics for received and sent messages
    received_messages: CounterFamily<metrics::Message>,
    sent_messages: CounterFamily<metrics::Message>,
}
188
189impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
190    /// Create a new simulated network with a given runtime and configuration.
191    ///
192    /// Returns a tuple containing the network instance and the oracle that can
193    /// be used to modify the state of the network during context.
194    pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
195        let (oracle_mailbox, oracle_receiver) = mpsc::unbounded_channel();
196        let sent_messages = context.family("messages_sent", "messages sent");
197        let received_messages = context.family("messages_received", "messages received");
198
199        // Start with a pseudo-random IP address to assign sockets to for new peers
200        let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);
201
202        (
203            Self {
204                context: ContextCell::new(context),
205                max_size: cfg.max_size,
206                disconnect_on_block: cfg.disconnect_on_block,
207                tracked_peer_sets: cfg.tracked_peer_sets,
208                next_addr,
209                ingress: oracle_receiver,
210                ingress_sender: oracle_mailbox.clone(),
211                links: HashMap::new(),
212                peers: BTreeMap::new(),
213                peer_sets: BTreeMap::new(),
214                peer_ref_counts: BTreeMap::new(),
215                blocks: BTreeSet::new(),
216                transmitter: transmitter::State::new(),
217                subscribers: Vec::new(),
218                peer_subscribers: Vec::new(),
219                received_messages,
220                sent_messages,
221            },
222            Oracle::new(oracle_mailbox),
223        )
224    }
225
226    /// Create a new simulated network with an initial primary peer set.
227    ///
228    /// This is a convenience for test setups that would otherwise call
229    /// [`crate::Manager::track`] immediately after construction.
230    pub async fn new_with_peers<I>(context: E, cfg: Config, peers: I) -> (Self, Oracle<P, E>)
231    where
232        I: IntoIterator<Item = P>,
233    {
234        Self::new_with_split_peers(context, cfg, peers, std::iter::empty()).await
235    }
236
237    /// Create a new simulated network with primary and secondary peers split into two sets.
238    ///
239    /// Peers are tracked at peer set ID `0` as [`TrackedPeers`], matching the most common test
240    /// setup.
241    pub async fn new_with_split_peers<I, J>(
242        context: E,
243        cfg: Config,
244        primary: I,
245        secondary: J,
246    ) -> (Self, Oracle<P, E>)
247    where
248        I: IntoIterator<Item = P>,
249        J: IntoIterator<Item = P>,
250    {
251        let (mut network, oracle) = Self::new(context, cfg);
252        network
253            .register_tracked_peer_set(
254                0,
255                TrackedPeers::new(
256                    Set::from_iter_dedup(primary),
257                    Set::from_iter_dedup(secondary),
258                ),
259            )
260            .await;
261        (network, oracle)
262    }
263
264    /// Apply a tracked peer set to network state.
265    async fn register_tracked_peer_set(&mut self, id: u64, peers: TrackedPeers<P>) -> bool {
266        let primary = peers.primary;
267        let secondary = peers.secondary;
268        let tracked_peer_sets = self.tracked_peer_sets;
269
270        // Check if peer set already exists
271        if self.peer_sets.contains_key(&id) {
272            warn!(id, "peer set already exists");
273            return false;
274        }
275
276        // Ensure that peer set is monotonically increasing
277        if let Some((last, _)) = self.peer_sets.last_key_value() {
278            if id <= *last {
279                warn!(
280                    new_id = id,
281                    old_id = last,
282                    "attempted to register peer set with non-monotonically increasing ID"
283                );
284                return false;
285            }
286        }
287
288        // Create and store new primary peer set.
289        for public_key in primary.iter() {
290            self.ensure_peer_exists(public_key).await;
291            self.peer_ref_counts
292                .entry(public_key.clone())
293                .or_default()
294                .primary += 1;
295        }
296
297        // Secondary peers: Peers in both roles count only as primary.
298        let secondary_filtered = Set::from_iter_dedup(
299            secondary
300                .iter()
301                .filter(|s| primary.position(s).is_none())
302                .cloned(),
303        );
304        for public_key in secondary_filtered.iter() {
305            self.ensure_peer_exists(public_key).await;
306            self.peer_ref_counts
307                .entry(public_key.clone())
308                .or_default()
309                .secondary += 1;
310        }
311        self.peer_sets.insert(
312            id,
313            PeerSetsAtIndex {
314                primary: primary.clone(),
315                secondary: secondary_filtered,
316            },
317        );
318
319        // Remove oldest tracked peer sets if we exceed the limit.
320        while self.peer_sets.len() > tracked_peer_sets.get() {
321            let (removed_index, sets) = self.peer_sets.pop_first().unwrap();
322            debug!(index = removed_index, "removed oldest tracked peer sets");
323
324            for public_key in sets.primary.iter() {
325                let counts = self
326                    .peer_ref_counts
327                    .get_mut(public_key)
328                    .expect("reference map out of sync with peer sets");
329                counts.primary = counts
330                    .primary
331                    .checked_sub(1)
332                    .expect("reference count underflow");
333                if counts.primary == 0 && counts.secondary == 0 {
334                    self.peer_ref_counts.remove(public_key);
335                    debug!(
336                        ?public_key,
337                        "removed peer no longer in any tracked peer set"
338                    );
339                }
340            }
341
342            for public_key in sets.secondary.iter() {
343                let counts = self
344                    .peer_ref_counts
345                    .get_mut(public_key)
346                    .expect("reference map out of sync with peer sets");
347                counts.secondary = counts
348                    .secondary
349                    .checked_sub(1)
350                    .expect("reference count underflow");
351                if counts.primary == 0 && counts.secondary == 0 {
352                    self.peer_ref_counts.remove(public_key);
353                    debug!(
354                        ?public_key,
355                        "removed peer no longer in any tracked peer set"
356                    );
357                }
358            }
359        }
360
361        true
362    }
363
364    /// Returns (and increments) the next available socket address.
365    ///
366    /// The port number is incremented for each call, and the IP address is incremented if the port
367    /// number overflows.
368    fn get_next_socket(&mut self) -> SocketAddr {
369        let result = self.next_addr;
370
371        // Increment the port number, or the IP address if the port number overflows.
372        // Allows the ip address to overflow (wrapping).
373        match self.next_addr.port().checked_add(1) {
374            Some(port) => {
375                self.next_addr.set_port(port);
376            }
377            None => {
378                let ip = match self.next_addr.ip() {
379                    IpAddr::V4(ipv4) => ipv4,
380                    _ => unreachable!(),
381                };
382                let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
383                self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
384            }
385        }
386
387        result
388    }
389
390    /// Handle an ingress message.
391    ///
392    /// This method is called when a message is received from the oracle.
393    async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
394        // It is important to ensure that no failed receipt of a message will cause us to exit.
395        // This could happen if the caller drops the `Oracle` after updating the network topology.
396        // Thus, we create a helper function to send the result to the oracle and log any errors.
397        fn send_result<T: std::fmt::Debug>(
398            result: oneshot::Sender<Result<T, Error>>,
399            value: Result<T, Error>,
400        ) {
401            let success = value.is_ok();
402            if let Err(e) = result.send(value) {
403                error!(?e, "failed to send result to oracle (ok = {})", success);
404            }
405        }
406
407        match message {
408            ingress::Message::Send {
409                channel,
410                origin,
411                recipients,
412                message,
413                ..
414            } => {
415                self.handle_task((channel, origin, recipients, message));
416            }
417            ingress::Message::Track { id, peers } => {
418                if !self.register_tracked_peer_set(id, peers).await {
419                    return;
420                }
421
422                // Notify all subscribers about the new peer set.
423                let update = self
424                    .latest_update()
425                    .expect("latest update missing after successful track");
426                self.subscribers
427                    .retain(|subscriber| subscriber.send_lossy(update.clone()));
428
429                // Broadcast updated tracked membership to SubscribeConnected subscribers.
430                self.broadcast_peer_list();
431            }
432            ingress::Message::Register {
433                channel,
434                public_key,
435                quota,
436                result,
437            } => {
438                // If peer does not exist, then create it.
439                let _ = self.ensure_peer_exists(&public_key).await;
440
441                // Get clock for the rate limiter
442                let clock = self
443                    .context
444                    .child("rate_limiter")
445                    .with_attribute("channel", channel)
446                    .with_attribute("peer", &public_key);
447
448                // Create a sender that allows sending messages to the network for a certain channel
449                let (sender, guard) = Sender::new(
450                    public_key.clone(),
451                    channel,
452                    self.max_size,
453                    self.ingress_sender.clone(),
454                    self.connected_peers_for(&public_key),
455                    clock,
456                    quota,
457                );
458
459                // Create a receiver that allows receiving messages from the network for a certain channel
460                let peer = self.peers.get_mut(&public_key).unwrap();
461                let receiver = match peer.register(channel, guard).await {
462                    Ok(receiver) => Receiver { receiver },
463                    Err(err) => return send_result(result, Err(err)),
464                };
465
466                send_result(result, Ok((sender, receiver)))
467            }
468            ingress::Message::PeerSet { id, response } => {
469                let _ = response.send(
470                    self.peer_sets
471                        .get(&id)
472                        .map(|e| TrackedPeers::new(e.primary.clone(), e.secondary.clone())),
473                );
474            }
475            ingress::Message::Subscribe { response } => {
476                // Create a new subscription channel
477                let (sender, receiver) = mpsc::unbounded_channel();
478
479                // Send the latest peer set upon subscription.
480                if let Some(update) = self.latest_update() {
481                    sender.send_lossy(update);
482                }
483                self.subscribers.push(sender);
484
485                // Return the receiver to the caller
486                let _ = response.send(receiver);
487            }
488            ingress::Message::SubscribePeers { exclude, sender } => {
489                self.subscribe_connected(exclude, sender);
490            }
491            ingress::Message::LimitBandwidth {
492                public_key,
493                egress_cap,
494                ingress_cap,
495                result,
496            } => {
497                // If peer does not exist, then create it.
498                let _ = self.ensure_peer_exists(&public_key).await;
499
500                // Update bandwidth limits
501                let now = self.context.current();
502                let completions = self
503                    .transmitter
504                    .limit(now, &public_key, egress_cap, ingress_cap);
505                self.process_completions(completions);
506
507                // Notify the caller that the bandwidth update has been applied
508                let _ = result.send(());
509            }
510            ingress::Message::AddLink {
511                sender,
512                receiver,
513                sampler,
514                success_rate,
515                result,
516            } => {
517                // If sender or receiver does not exist, then create it.
518                let _ = self.ensure_peer_exists(&sender).await;
519                let (receiver_socket, _) = self.ensure_peer_exists(&receiver).await;
520
521                // Require link to not already exist
522                let key = (sender.clone(), receiver.clone());
523                if self.links.contains_key(&key) {
524                    return send_result(result, Err(Error::LinkExists));
525                }
526
527                let link = Link::new(
528                    self.context.as_mut(),
529                    sender,
530                    receiver,
531                    receiver_socket,
532                    sampler,
533                    success_rate,
534                    self.max_size,
535                    self.received_messages.clone(),
536                );
537                self.links.insert(key, link);
538                send_result(result, Ok(()))
539            }
540            ingress::Message::RemoveLink {
541                sender,
542                receiver,
543                result,
544            } => {
545                match self.links.remove(&(sender, receiver)) {
546                    Some(_) => (),
547                    None => return send_result(result, Err(Error::LinkMissing)),
548                }
549                send_result(result, Ok(()))
550            }
551            ingress::Message::Block { from, to } => {
552                self.blocks.insert((from, to));
553            }
554            ingress::Message::Blocked { result } => {
555                send_result(result, Ok(self.blocks.iter().cloned().collect()))
556            }
557        }
558    }
559
560    /// Ensure a peer exists, creating it if necessary.
561    ///
562    /// Returns the socket address of the peer and a boolean indicating if a new peer was created.
563    async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
564        if !self.peers.contains_key(public_key) {
565            // Create peer
566            let socket = self.get_next_socket();
567            let peer = Peer::new(
568                self.context.child("peer"),
569                public_key.clone(),
570                socket,
571                self.max_size,
572            )
573            .await;
574
575            // Once ready, add to peers
576            self.peers.insert(public_key.clone(), peer);
577
578            (socket, true)
579        } else {
580            (self.peers.get(public_key).unwrap().socket, false)
581        }
582    }
583
584    fn subscribe_connected(&mut self, exclude: P, mut sender: ring::Sender<Vec<P>>) {
585        let peers = self.connected_peers_for(&exclude);
586        if Pin::new(&mut sender).start_send(peers).is_ok() {
587            self.peer_subscribers.push((exclude, sender));
588        }
589    }
590
591    /// Broadcast updated peer list to all connected peer subscribers.
592    ///
593    /// This runs when tracked membership changes ([`ingress::Message::Track`]), not when peers
594    /// are first discovered via register, links, or bandwidth limits.
595    ///
596    /// Subscribers whose receivers have been dropped are removed to prevent
597    /// memory leaks.
598    fn broadcast_peer_list(&mut self) {
599        if self.peer_subscribers.is_empty() {
600            return;
601        }
602
603        let peers: Vec<P> = self.peer_ref_counts.keys().cloned().collect();
604        let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
605        for (exclude, mut subscriber) in self.peer_subscribers.drain(..) {
606            let peer_list = if peers.contains(&exclude) {
607                peers
608                    .iter()
609                    .filter(|peer| *peer != &exclude)
610                    .cloned()
611                    .collect()
612            } else {
613                Vec::new()
614            };
615            if Pin::new(&mut subscriber).start_send(peer_list).is_ok() {
616                live_subscribers.push((exclude, subscriber));
617            }
618        }
619        self.peer_subscribers = live_subscribers;
620    }
621
622    /// Primary and secondary peers across all tracked peer sets (reference-counted union).
623    ///
624    /// Primary wins over secondary for the same public key: `secondary` includes only peers whose
625    /// only role across tracked sets is secondary (same as [`crate::Provider::subscribe`] for [`PeerSetUpdate::all`]).
626    fn aggregate_peer_membership(&self) -> TrackedPeers<P> {
627        let primary = self
628            .peer_ref_counts
629            .iter()
630            .filter(|(_, c)| c.primary > 0)
631            .map(|(k, _)| k.clone())
632            .try_collect()
633            .expect("BTreeMap keys are unique");
634        let secondary = Set::from_iter_dedup(
635            self.peer_ref_counts
636                .iter()
637                .filter(|(_, c)| c.secondary > 0 && c.primary == 0)
638                .map(|(k, _)| k.clone()),
639        );
640        TrackedPeers::new(primary, secondary)
641    }
642
643    /// Returns a [`PeerSetUpdate`] for the latest peer set (by id), if any.
644    fn latest_update(&self) -> Option<PeerSetUpdate<P>> {
645        let (index, entry) = self.peer_sets.last_key_value()?;
646        Some(PeerSetUpdate {
647            index: *index,
648            latest: TrackedPeers::new(entry.primary.clone(), entry.secondary.clone()),
649            all: self.aggregate_peer_membership(),
650        })
651    }
652
653    /// Peers used when expanding [`Recipients::All`] for a sender.
654    fn connected_peers_for(&self, sender: &P) -> Vec<P> {
655        if !self.peer_ref_counts.contains_key(sender) {
656            return Vec::new();
657        }
658        self.peer_ref_counts
659            .keys()
660            .filter(|peer| *peer != sender)
661            .cloned()
662            .collect()
663    }
664
    /// Returns whether the peer is currently allowed to use the network.
    ///
    /// A peer is connectable exactly when it has an entry in `peer_ref_counts`, i.e. it is
    /// referenced by at least one tracked peer set.
    fn is_connectable(&self, peer: &P) -> bool {
        self.peer_ref_counts.contains_key(peer)
    }
669}
670
671impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
672    /// Process completions from the transmitter.
673    fn process_completions(&mut self, completions: Vec<Completion<P>>) {
674        for completion in completions {
675            // If there is no message to deliver, then skip
676            let Some(deliver_at) = completion.deliver_at else {
677                trace!(
678                    origin = ?completion.origin,
679                    recipient = ?completion.recipient,
680                    "message dropped before delivery",
681                );
682                continue;
683            };
684
685            // Send message to link
686            let key = (completion.origin.clone(), completion.recipient.clone());
687            let Some(link) = self.links.get_mut(&key) else {
688                // This can happen if the link is removed before the message is delivered
689                trace!(
690                    origin = ?completion.origin,
691                    recipient = ?completion.recipient,
692                    "missing link for completion",
693                );
694                continue;
695            };
696            if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
697                error!(?err, "failed to send");
698            }
699        }
700    }
701
702    /// Handle a task.
703    ///
704    /// This method is called when a task is received from the sender, which can come from
705    /// any peer in the network.
706    fn handle_task(&mut self, task: Task<P>) {
707        let (channel, origin, recipients, message) = task;
708
709        // If tracking peer sets, ensure recipient and sender are in a tracked peer set
710        if !self.is_connectable(&origin) {
711            warn!(
712                ?origin,
713                reason = "not primary or secondary",
714                "dropping message"
715            );
716            return;
717        }
718
719        // Collect recipients
720        let recipients = match recipients {
721            Recipients::All => self.connected_peers_for(&origin),
722            Recipients::Some(keys) => keys,
723            Recipients::One(key) => vec![key],
724        };
725
726        // Send to all recipients
727        let now = self.context.current();
728        for recipient in recipients {
729            // Skip self
730            if recipient == origin {
731                trace!(?recipient, reason = "self", "dropping message");
732                continue;
733            }
734
735            if !self.is_connectable(&recipient) {
736                trace!(
737                    ?origin,
738                    ?recipient,
739                    reason = "not primary or secondary",
740                    "dropping message"
741                );
742                continue;
743            }
744
745            // Determine if the sender or recipient has blocked the other
746            let o_r = (origin.clone(), recipient.clone());
747            let r_o = (recipient.clone(), origin.clone());
748            if self.disconnect_on_block
749                && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
750            {
751                trace!(?origin, ?recipient, reason = "blocked", "dropping message");
752                continue;
753            }
754
755            // Determine if there is a link between the origin and recipient
756            let Some(link) = self.links.get_mut(&o_r) else {
757                trace!(?origin, ?recipient, reason = "no link", "dropping message");
758                continue;
759            };
760
761            // Note: Rate limiting is handled by the Sender before messages reach here.
762            // The Sender filters recipients via LimitedSender::check() or in Sender::send().
763
764            // Record sent message as soon as we determine there is a link with recipient (approximates
765            // having an open connection)
766            self.sent_messages
767                .get_or_create(&metrics::Message::new(&origin, &recipient, channel))
768                .inc();
769
770            // Sample latency
771            let latency = Duration::from_millis(link.sampler.sample(self.context.as_mut()) as u64);
772
773            // Determine if the message should be delivered
774            let should_deliver = self.context.gen_bool(link.success_rate);
775
776            // Enqueue message for delivery
777            let completions = self.transmitter.enqueue(
778                now,
779                origin.clone(),
780                recipient.clone(),
781                channel,
782                message.clone(),
783                latency,
784                should_deliver,
785            );
786            self.process_completions(completions);
787        }
788    }
789
790    fn queue_task(
791        high: &mut VecDeque<Task<P>>,
792        low: &mut VecDeque<Task<P>>,
793        task: Task<P>,
794        priority: bool,
795    ) {
796        if priority {
797            high.push_back(task);
798        } else {
799            low.push_back(task);
800        }
801    }
802
803    fn handle_tasks(&mut self, high: &mut VecDeque<Task<P>>, low: &mut VecDeque<Task<P>>) {
804        while let Some(task) = high.pop_front() {
805            self.handle_task(task);
806        }
807        while let Some(task) = low.pop_front() {
808            self.handle_task(task);
809        }
810    }
811
812    async fn handle_ordered_ingress(
813        &mut self,
814        mut message: ingress::Message<P, E>,
815        high: &mut VecDeque<Task<P>>,
816        low: &mut VecDeque<Task<P>>,
817    ) {
818        loop {
819            match message {
820                ingress::Message::Send {
821                    channel,
822                    origin,
823                    recipients,
824                    message,
825                    priority,
826                } => {
827                    Self::queue_task(high, low, (channel, origin, recipients, message), priority);
828                }
829                message => {
830                    self.handle_tasks(high, low);
831                    self.handle_ingress(message).await;
832                    return;
833                }
834            }
835
836            message = match self.ingress.try_recv() {
837                Ok(message) => message,
838                Err(_) => {
839                    self.handle_tasks(high, low);
840                    return;
841                }
842            };
843        }
844    }
845
    /// Run the simulated network.
    ///
    /// Consumes the network and returns the [`Handle`] produced by `spawn_cell!` for the
    /// `run` future.
    ///
    /// It is not necessary to invoke this method before modifying the network topology; however,
    /// no messages will be sent until this method is called.
    pub fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run())
    }
853
    /// Main loop of the simulated network.
    ///
    /// Reacts to two event sources: the transmitter's next scheduled time (`tick`) and messages
    /// arriving on `self.ingress`. The loop breaks when the ingress channel yields `None`.
    // NOTE(review): presumably the loop also ends when the context signals a stop
    // (`on_stopped`); confirm against the `select_loop!` documentation.
    async fn run(mut self) {
        // Task queues handed by reference to `handle_ordered_ingress` for every ingress message.
        let mut high = VecDeque::new();
        let mut low = VecDeque::new();
        select_loop! {
            self.context,
            on_start => {
                // Sleep until the transmitter's next scheduled time, or use a future that never
                // completes when the transmitter reports nothing scheduled.
                let tick = match self.transmitter.next() {
                    Some(when) => Either::Left(self.context.sleep_until(when)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {},
            _ = tick => {
                // Advance the transmitter to the current time and process what completed.
                let now = self.context.current();
                let completions = self.transmitter.advance(now);
                self.process_completions(completions);
            },
            Some(message) = self.ingress.recv() else break => {
                self.handle_ordered_ingress(message, &mut high, &mut low)
                    .await;
            },
        }
    }
877}
878
/// Provides known peers from the simulated network.
///
/// This is derived from tracked peer sets and is not a liveness signal.
pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
    /// Identity of the owning peer; sent as `exclude` when subscribing to peer updates.
    me: P,
    /// Handle to the network's ingress queue, used to submit `SubscribePeers` requests.
    ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
    /// Peer list supplied at construction; `peers()` returns a clone of it.
    peers: Vec<P>,
    /// Marker tying the provider to the clock type `E` without storing a value of it.
    _clock: std::marker::PhantomData<E>,
}
888
889impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
890    fn clone(&self) -> Self {
891        Self {
892            me: self.me.clone(),
893            ingress: self.ingress.clone(),
894            peers: self.peers.clone(),
895            _clock: std::marker::PhantomData,
896        }
897    }
898}
899
900impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
901    const fn new(
902        me: P,
903        ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
904        peers: Vec<P>,
905    ) -> Self {
906        Self {
907            me,
908            ingress,
909            peers,
910            _clock: std::marker::PhantomData,
911        }
912    }
913}
914
915impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
916    type PublicKey = P;
917
918    fn peers(&self) -> Vec<Self::PublicKey> {
919        self.peers.clone()
920    }
921
922    fn subscribe(&self) -> ring::Receiver<Vec<Self::PublicKey>> {
923        let (sender, receiver) = ring::channel(NZUsize!(1));
924        let _ = self.ingress.send_lossy(ingress::Message::SubscribePeers {
925            exclude: self.me.clone(),
926            sender,
927        });
928        receiver
929    }
930}
931
/// Implementation of a [crate::Sender] for the simulated network without rate limiting.
///
/// This is the inner sender used by [`Sender`] which wraps it with rate limiting.
pub struct UnlimitedSender<P: PublicKey, E: Clock> {
    /// Identity recorded as the `origin` of every submitted message.
    me: P,
    /// Channel that every submitted message is tagged with.
    channel: Channel,
    /// Maximum allowed message length in bytes; `send` asserts against it.
    max_size: u32,
    /// Handle to the network's ingress queue.
    sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
    /// Registration liveness flag; once it reads `false`, `send` reports `Feedback::Closed`.
    active: Arc<AtomicBool>,
}
942
943impl<P: PublicKey, E: Clock> Clone for UnlimitedSender<P, E> {
944    fn clone(&self) -> Self {
945        Self {
946            me: self.me.clone(),
947            channel: self.channel,
948            max_size: self.max_size,
949            sender: self.sender.clone(),
950            active: self.active.clone(),
951        }
952    }
953}
954
955impl<P: PublicKey, E: Clock> crate::UnlimitedSender for UnlimitedSender<P, E> {
956    type PublicKey = P;
957
958    fn send(
959        &mut self,
960        recipients: Recipients<P>,
961        message: impl Into<IoBufs> + Send,
962        priority: bool,
963    ) -> Unreliable<Feedback> {
964        let message = message.into().coalesce();
965        assert!(
966            message.len() <= self.max_size as usize,
967            "message too large: {} > {}",
968            message.len(),
969            self.max_size
970        );
971
972        if !self.active.load(Ordering::Acquire) || self.sender.is_closed() {
973            return Unreliable::new(Feedback::Closed);
974        }
975
976        // The simulated network handles send submissions and topology updates
977        // through the same ingress queue so callers can mutate links immediately
978        // after `send` without racing the actor.
979        if self.sender.send_lossy(ingress::Message::Send {
980            channel: self.channel,
981            origin: self.me.clone(),
982            recipients,
983            message,
984            priority,
985        }) {
986            Unreliable::new(Feedback::Ok)
987        } else {
988            Unreliable::new(Feedback::Closed)
989        }
990    }
991}
992
/// Implementation of a [crate::Sender] for the simulated network.
///
/// Also implements [crate::LimitedSender] to support rate-limit checking
/// before sending messages.
pub struct Sender<P: PublicKey, E: Clock> {
    /// Wrapper around the raw [`UnlimitedSender`] that `check` calls are delegated to.
    limited_sender: LimitedSender<E, UnlimitedSender<P, E>, ConnectedPeerProvider<P, E>>,
}
1000
1001impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
1002    fn clone(&self) -> Self {
1003        Self {
1004            limited_sender: self.limited_sender.clone(),
1005        }
1006    }
1007}
1008
1009impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
1010    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1011        f.debug_struct("Sender").finish_non_exhaustive()
1012    }
1013}
1014
1015impl<P: PublicKey, E: Clock> Sender<P, E> {
1016    #[allow(clippy::too_many_arguments)]
1017    fn new(
1018        me: P,
1019        channel: Channel,
1020        max_size: u32,
1021        ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
1022        connected_peers: Vec<P>,
1023        clock: E,
1024        quota: Quota,
1025    ) -> (Self, RegistrationGuard) {
1026        let active = Arc::new(AtomicBool::new(true));
1027        let unlimited_sender = UnlimitedSender {
1028            me: me.clone(),
1029            channel,
1030            max_size,
1031            sender: ingress.clone(),
1032            active: active.clone(),
1033        };
1034        let peer_source = ConnectedPeerProvider::new(me, ingress, connected_peers);
1035        let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);
1036
1037        (Self { limited_sender }, RegistrationGuard { active })
1038    }
1039
1040    /// Split this [Sender] into a [SplitOrigin::Primary] and [SplitOrigin::Secondary] sender.
1041    pub fn split_with<F: SplitForwarder<P>>(
1042        self,
1043        forwarder: F,
1044    ) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
1045        (
1046            SplitSender {
1047                replica: SplitOrigin::Primary,
1048                inner: self.clone(),
1049                forwarder: forwarder.clone(),
1050            },
1051            SplitSender {
1052                replica: SplitOrigin::Secondary,
1053                inner: self,
1054                forwarder,
1055            },
1056        )
1057    }
1058}
1059
1060impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
1061    type PublicKey = P;
1062    type Checked<'a>
1063        = crate::utils::limited::CheckedSender<'a, UnlimitedSender<P, E>>
1064    where
1065        Self: 'a;
1066
1067    fn check(
1068        &mut self,
1069        recipients: Recipients<Self::PublicKey>,
1070    ) -> Result<Self::Checked<'_>, SystemTime> {
1071        self.limited_sender.check(recipients)
1072    }
1073}
1074
/// A sender that routes recipients per message via a user-provided function.
pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    /// Which half of the split this sender represents; passed to the forwarder on every send.
    replica: SplitOrigin,
    /// Underlying sender used for the rate-limit check and the actual send.
    inner: Sender<P, E>,
    /// User-provided function that chooses the final recipients for each message.
    forwarder: F,
}
1081
1082impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
1083    fn clone(&self) -> Self {
1084        Self {
1085            replica: self.replica,
1086            inner: self.inner.clone(),
1087            forwarder: self.forwarder.clone(),
1088        }
1089    }
1090}
1091
1092impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
1093    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1094        f.debug_struct("SplitSender")
1095            .field("replica", &self.replica)
1096            .field("inner", &self.inner)
1097            .finish()
1098    }
1099}
1100
1101impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
1102    type PublicKey = P;
1103    type Checked<'a> = SplitCheckedSender<'a, P, E, F>;
1104
1105    fn check(
1106        &mut self,
1107        recipients: Recipients<Self::PublicKey>,
1108    ) -> Result<Self::Checked<'_>, SystemTime> {
1109        Ok(SplitCheckedSender {
1110            // Perform a rate limit check with the entire set of original recipients although
1111            // the forwarder may filter these (based on message content) during send.
1112            checked: self.inner.limited_sender.check(recipients.clone())?,
1113            replica: self.replica,
1114            forwarder: self.forwarder.clone(),
1115            recipients,
1116
1117            _phantom: std::marker::PhantomData,
1118        })
1119    }
1120}
1121
/// A checked sender for [`SplitSender`] that defers the forwarder call to send time.
///
/// This is necessary because [`SplitForwarder`] may examine message content to determine
/// routing, but the message is not available at [`LimitedSender::check`] time.
pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    /// Result of the rate-limit check performed on the original recipients.
    checked: LimitedCheckedSender<'a, UnlimitedSender<P, E>>,
    /// Which half of the split issued the check; passed to the forwarder.
    replica: SplitOrigin,
    /// Function that selects the final recipients once the message is known.
    forwarder: F,
    /// Recipients originally requested by the caller; passed to the forwarder.
    recipients: Recipients<P>,

    /// Marker for the clock type `E`; no value of it is stored.
    _phantom: std::marker::PhantomData<E>,
}
1134
1135impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
1136    for SplitCheckedSender<'a, P, E, F>
1137{
1138    type PublicKey = P;
1139
1140    fn recipients(&self) -> Vec<Self::PublicKey> {
1141        crate::CheckedSender::recipients(&self.checked)
1142    }
1143
1144    fn send(self, message: impl Into<IoBufs> + Send, priority: bool) -> Unreliable<Feedback> {
1145        // Convert to IoBuf here since forwarder needs to inspect the message
1146        let message = message.into().coalesce();
1147
1148        // Determine the set of recipients that will receive the message
1149        let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
1150            return Unreliable::Rejected;
1151        };
1152
1153        // Extract the inner sender and send directly with the new recipients
1154        //
1155        // While SplitForwarder does not enforce any relationship between the original recipients
1156        // and the new recipients, it is typically some subset of the original recipients. This
1157        // means we may over-rate limit some recipients (who are never actually sent a message here) but
1158        // we prefer this to not providing feedback at all (we would have to skip check entirely).
1159        self.checked
1160            .into_inner()
1161            .send(recipients, message, priority)
1162    }
1163}
1164
/// Receiving half of a channel mailbox, yielding [`NetworkMessage`]s.
type MessageReceiver<P> = mpsc::UnboundedReceiver<NetworkMessage<P>>;
/// Request to register a channel with a peer: the channel, the guard kept alongside the
/// channel's mailbox, and a oneshot used to hand back the channel's [`MessageReceiver`].
type ChannelRegistration<P> = (
    Channel,
    RegistrationGuard,
    oneshot::Sender<MessageReceiver<P>>,
);
1171
/// Implementation of a [crate::Receiver] for the simulated network.
#[derive(Debug)]
pub struct Receiver<P: PublicKey> {
    /// Mailbox from which messages are read.
    receiver: MessageReceiver<P>,
}
1177
1178impl<P: PublicKey> crate::Receiver for Receiver<P> {
1179    type Error = Error;
1180    type PublicKey = P;
1181
1182    async fn recv(&mut self) -> Result<NetworkMessage<Self::PublicKey>, Error> {
1183        self.receiver.recv().await.ok_or(Error::NetworkClosed)
1184    }
1185}
1186
1187impl<P: PublicKey> Receiver<P> {
1188    /// Split this [Receiver] into a [SplitTarget::Primary] and [SplitTarget::Secondary] receiver.
1189    pub fn split_with<E: Spawner, R: SplitRouter<P>>(
1190        mut self,
1191        context: E,
1192        router: R,
1193    ) -> (Self, Self) {
1194        let (primary_tx, primary_rx) = mpsc::unbounded_channel();
1195        let (secondary_tx, secondary_rx) = mpsc::unbounded_channel();
1196        context.spawn(move |_| async move {
1197            while let Some(message) = self.receiver.recv().await {
1198                // Route message to the appropriate target
1199                let direction = router(&message);
1200                match direction {
1201                    SplitTarget::None => {}
1202                    SplitTarget::Primary => {
1203                        if let Err(err) = primary_tx.send(message) {
1204                            error!(?err, "failed to send message to primary");
1205                        }
1206                    }
1207                    SplitTarget::Secondary => {
1208                        if let Err(err) = secondary_tx.send(message) {
1209                            error!(?err, "failed to send message to secondary");
1210                        }
1211                    }
1212                    SplitTarget::Both => {
1213                        if let Err(err) = primary_tx.send(message.clone()) {
1214                            error!(?err, "failed to send message to primary");
1215                        }
1216                        if let Err(err) = secondary_tx.send(message) {
1217                            error!(?err, "failed to send message to secondary");
1218                        }
1219                    }
1220                }
1221
1222                // Exit if both channels are closed
1223                if primary_tx.is_closed() && secondary_tx.is_closed() {
1224                    break;
1225                }
1226            }
1227        });
1228
1229        (
1230            Self {
1231                receiver: primary_rx,
1232            },
1233            Self {
1234                receiver: secondary_rx,
1235            },
1236        )
1237    }
1238}
1239
/// A peer in the simulated network.
///
/// The peer can register channels, which allows it to receive messages sent to the channel from other peers.
struct Peer<P: PublicKey> {
    /// Socket address that the peer is listening on.
    socket: SocketAddr,

    /// Control used to register new channels with the peer's router task.
    control: mpsc::UnboundedSender<ChannelRegistration<P>>,
}
1250
1251impl<P: PublicKey> Peer<P> {
1252    /// Create and return a new peer.
1253    ///
1254    /// The peer will listen for incoming connections on the given `socket` address.
1255    /// `max_size` is the maximum size of a message that can be sent to the peer.
1256    async fn new<E: Spawner + RNetwork + Metrics + Clock>(
1257        context: E,
1258        public_key: P,
1259        socket: SocketAddr,
1260        max_size: u32,
1261    ) -> Self {
1262        // The control is used to register channels.
1263        let (control_sender, mut control_receiver): (
1264            mpsc::UnboundedSender<ChannelRegistration<P>>,
1265            _,
1266        ) = mpsc::unbounded_channel();
1267
1268        // Whenever a message is received from a peer, it is placed in the inbox.
1269        // The router polls the inbox and forwards the message to the appropriate mailbox.
1270        let (inbox_sender, mut inbox_receiver) = mpsc::unbounded_channel();
1271
1272        // Spawn router
1273        context.child("router").spawn(|context| async move {
1274            // Map of channels to mailboxes (senders to particular channels)
1275            let mut mailboxes = HashMap::new();
1276
1277            // Continually listen for control messages and outbound messages
1278            select_loop! {
1279                context,
1280                on_stopped => {},
1281                // Listen for control messages, which are used to register channels
1282                Some((channel, guard, result_tx)) = control_receiver.recv() else break => {
1283                    // Register channel
1284                    let (receiver_tx, receiver_rx) = mpsc::unbounded_channel();
1285                    if mailboxes.insert(channel, (receiver_tx, guard)).is_some() {
1286                        warn!(?public_key, ?channel, "overwriting existing channel");
1287                    }
1288                    result_tx.send(receiver_rx).unwrap();
1289                },
1290
1291                // Listen for messages from the inbox, which are forwarded to the appropriate mailbox
1292                Some((channel, message)) = inbox_receiver.recv() else break => {
1293                    // Send message to mailbox
1294                    match mailboxes.get_mut(&channel) {
1295                        Some((receiver_tx, _)) => {
1296                            if let Err(err) = receiver_tx.send(message) {
1297                                debug!(?err, "failed to send message to mailbox");
1298                            }
1299                        }
1300                        None => {
1301                            trace!(
1302                                recipient = ?public_key,
1303                                channel,
1304                                reason = "missing channel",
1305                                "dropping message",
1306                            );
1307                        }
1308                    }
1309                },
1310            }
1311        });
1312
1313        // Spawn a task that accepts new connections and spawns a task for each connection
1314        let (ready_tx, ready_rx) = oneshot::channel();
1315        context.child("listener").spawn(move |context| async move {
1316            // Initialize listener
1317            let mut listener = context.bind(socket).await.unwrap();
1318            let _ = ready_tx.send(());
1319
1320            // Continually accept new connections
1321            while let Ok((_, _, mut stream)) = listener.accept().await {
1322                // New connection accepted. Spawn a task for this connection
1323                context.child("receiver").spawn({
1324                    let inbox_sender = inbox_sender.clone();
1325                    move |_| async move {
1326                        // Receive dialer's public key as a handshake
1327                        let dialer = match recv_frame(&mut stream, max_size).await {
1328                            Ok(data) => data,
1329                            Err(_) => {
1330                                error!("failed to receive public key from dialer");
1331                                return;
1332                            }
1333                        };
1334                        let Ok(dialer) = P::decode(dialer.coalesce()) else {
1335                            error!("received public key is invalid");
1336                            return;
1337                        };
1338
1339                        // Continually receive messages from the dialer and send them to the inbox
1340                        while let Ok(data) = recv_frame(&mut stream, max_size).await {
1341                            let data = data.coalesce();
1342                            let channel = Channel::from_be_bytes(
1343                                data.as_ref()[..Channel::SIZE].try_into().unwrap(),
1344                            );
1345                            let message = data.slice(Channel::SIZE..);
1346                            if let Err(err) =
1347                                inbox_sender.send((channel, (dialer.clone(), message)))
1348                            {
1349                                debug!(?err, "failed to send message to mailbox");
1350                                break;
1351                            }
1352                        }
1353                    }
1354                });
1355            }
1356        });
1357
1358        // Wait for listener to start before returning
1359        let _ = ready_rx.await;
1360
1361        // Return peer
1362        Self {
1363            socket,
1364            control: control_sender,
1365        }
1366    }
1367
1368    /// Register a channel with the peer.
1369    ///
1370    /// This allows the peer to receive messages sent to the channel.
1371    /// Returns a receiver that can be used to receive messages sent to the channel.
1372    async fn register(
1373        &mut self,
1374        channel: Channel,
1375        guard: RegistrationGuard,
1376    ) -> Result<MessageReceiver<P>, Error> {
1377        let (result_tx, result_rx) = oneshot::channel();
1378        self.control
1379            .send((channel, guard, result_tx))
1380            .map_err(|_| Error::NetworkClosed)?;
1381        result_rx.await.map_err(|_| Error::NetworkClosed)
1382    }
1383}
1384
/// A unidirectional link between two peers.
///
/// Messages can be sent over the link with a given latency, jitter, and success rate.
struct Link {
    /// Distribution sampled once per message; the sample is used as a latency in milliseconds.
    sampler: Normal<f64>,
    /// Probability that a message sent over the link should be delivered.
    success_rate: f64,
    /// Messages with their receive time for ordered delivery.
    inbox: mpsc::UnboundedSender<(Channel, IoBuf, SystemTime)>,
}
1393
1394/// Buffered payload waiting for earlier messages on the same link to complete.
1395impl Link {
1396    #[allow(clippy::too_many_arguments)]
1397    fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
1398        context: &mut E,
1399        dialer: P,
1400        receiver: P,
1401        socket: SocketAddr,
1402        sampler: Normal<f64>,
1403        success_rate: f64,
1404        max_size: u32,
1405        received_messages: CounterFamily<metrics::Message>,
1406    ) -> Self {
1407        // Spawn a task that will wait for messages to be sent to the link and then send them
1408        // over the network.
1409        let (inbox, mut outbox) = mpsc::unbounded_channel::<(Channel, IoBuf, SystemTime)>();
1410        context.child("link").spawn(move |context| async move {
1411            // Dial the peer and handshake by sending it the dialer's public key
1412            let (mut sink, _) = context.dial(socket).await.unwrap();
1413            if let Err(err) = send_frame(&mut sink, dialer.as_ref().to_vec(), max_size).await {
1414                error!(?err, "failed to send public key to listener");
1415                return;
1416            }
1417
1418            // Process messages in order, waiting for their receive time
1419            while let Some((channel, message, receive_complete_at)) = outbox.recv().await {
1420                // Wait until the message should arrive at receiver
1421                context.sleep_until(receive_complete_at).await;
1422
1423                // Send the message
1424                let channel_bytes = channel.to_be_bytes();
1425                let mut data = Vec::with_capacity(channel_bytes.len() + message.len());
1426                data.extend_from_slice(&channel_bytes);
1427                data.extend_from_slice(message.as_ref());
1428                let _ = send_frame(&mut sink, data, max_size).await;
1429
1430                // Bump received messages metric
1431                received_messages
1432                    .get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
1433                    .inc();
1434            }
1435        });
1436
1437        Self {
1438            sampler,
1439            success_rate,
1440            inbox,
1441        }
1442    }
1443
1444    // Send a message over the link with receive timing.
1445    fn send(
1446        &mut self,
1447        channel: Channel,
1448        message: IoBuf,
1449        receive_complete_at: SystemTime,
1450    ) -> Result<(), Error> {
1451        self.inbox
1452            .send((channel, message, receive_complete_at))
1453            .map_err(|_| Error::NetworkClosed)?;
1454        Ok(())
1455    }
1456}
1457
1458#[cfg(test)]
1459mod tests {
1460    use super::*;
1461    use crate::{
1462        CheckedSender as _, LimitedSender as _, Manager as _, Provider, Receiver as _, Recipients,
1463        Sender as _, TrackedPeers,
1464    };
1465    use commonware_cryptography::{ed25519, Signer as _};
1466    use commonware_runtime::{deterministic, Quota, Runner as _, Supervisor as _};
1467    use commonware_utils::{ordered::Set, NZUsize};
1468    use futures::FutureExt;
1469    use std::num::NonZeroU32;
1470
1471    const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;
1472
1473    /// Default rate limit set high enough to not interfere with normal operation
1474    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1475
1476    async fn send_when_ready(
1477        context: &deterministic::Context,
1478        sender: &mut Sender<ed25519::PublicKey, deterministic::Context>,
1479        recipients: Recipients<ed25519::PublicKey>,
1480        expected_recipients: usize,
1481        message: Vec<u8>,
1482        priority: bool,
1483    ) -> SystemTime {
1484        loop {
1485            let checked = sender.check(recipients.clone()).unwrap();
1486            if checked.recipients().len() == expected_recipients {
1487                checked.send(message, priority);
1488                return context.current();
1489            }
1490            context.sleep(Duration::from_millis(1)).await;
1491        }
1492    }
1493
    /// [`Network::new_with_peers`] seeds peers; controls can register channels and add a link once;
    /// a duplicate link between the same pair returns [`Error::LinkExists`].
    #[test]
    fn test_register_and_link() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            };
            // Create two public keys
            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
            let peers = [pk1.clone(), pk2.clone()];

            // Seed the network with both peers and start it
            let (network, oracle) =
                Network::new_with_peers(context.child("network"), cfg, peers).await;
            network.start();

            // Register channels 0 and 1 for each peer
            let control = oracle.control(pk1.clone());
            control.register(0, TEST_QUOTA).await.unwrap();
            control.register(1, TEST_QUOTA).await.unwrap();
            let control = oracle.control(pk2.clone());
            control.register(0, TEST_QUOTA).await.unwrap();
            control.register(1, TEST_QUOTA).await.unwrap();

            // Overwrite if registering again
            control.register(1, TEST_QUOTA).await.unwrap();

            // Add link
            let link = ingress::Link {
                latency: Duration::from_millis(2),
                jitter: Duration::from_millis(1),
                success_rate: 0.9,
            };
            oracle
                .add_link(pk1.clone(), pk2.clone(), link.clone())
                .await
                .unwrap();

            // Expect error when adding link again
            assert!(matches!(
                oracle.add_link(pk1, pk2, link).await,
                Err(Error::LinkExists)
            ));
        });
    }
1542
    /// [`Network::new_with_split_peers`] registers id `0` with separate primary and secondary sets,
    /// exposes the same split from [`Manager::peer_set`], and emits a matching [`PeerSetUpdate`] on subscribe.
    #[test]
    fn test_new_with_split_peers_seeds_initial_update() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            };
            // One key for the primary set and one for the secondary set
            let primary = ed25519::PrivateKey::from_seed(11).public_key();
            let secondary = ed25519::PrivateKey::from_seed(12).public_key();

            let (network, oracle) = Network::new_with_split_peers(
                context.child("network"),
                cfg,
                [primary.clone()],
                [secondary.clone()],
            )
            .await;
            network.start();

            // The seeded peer set is available at index 0 with the same split
            let mut manager = oracle.manager();
            let peer_set = manager.peer_set(0).await.unwrap();
            assert_eq!(peer_set.primary, Set::try_from([primary.clone()]).unwrap());
            assert_eq!(
                peer_set.secondary,
                Set::try_from([secondary.clone()]).unwrap()
            );

            // The first update received after subscribing reflects the seeded peer set
            let mut updates = manager.subscribe().await;
            let update = updates.recv().await.unwrap();
            assert_eq!(update.index, 0);
            assert_eq!(
                update.latest.primary,
                Set::try_from([primary.clone()]).unwrap()
            );
            assert_eq!(
                update.latest.secondary,
                Set::try_from([secondary.clone()]).unwrap()
            );
            assert_eq!(update.all.primary, Set::try_from([primary]).unwrap());
            assert_eq!(update.all.secondary, Set::try_from([secondary]).unwrap());
        });
    }
1589
1590    /// Split sender/receiver routes each half to a different neighbor: primary out goes only to `peer_a`,
1591    /// secondary out only to `peer_b`, and inbound mail is demuxed by sender id.
1592    #[test]
1593    fn test_split_channel_single() {
1594        let executor = deterministic::Runner::default();
1595        executor.start(|context| async move {
1596            let cfg = Config {
1597                max_size: MAX_MESSAGE_SIZE,
1598                disconnect_on_block: true,
1599                tracked_peer_sets: NZUsize!(3),
1600            };
1601            let (network, oracle) = Network::new(context.child("network"), cfg);
1602            network.start();
1603
1604            // Create a "twin" node that will be split, plus two normal peers
1605            let twin = ed25519::PrivateKey::from_seed(20).public_key();
1606            let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
1607            let peer_b = ed25519::PrivateKey::from_seed(22).public_key();
1608
1609            // Register all peers
1610            let mut manager = oracle.manager();
1611            manager.track(
1612                0,
1613                Set::try_from([twin.clone(), peer_a.clone(), peer_b.clone()]).unwrap(),
1614            );
1615
1616            // Register normal peers
1617            let (mut peer_a_sender, mut peer_a_recv) = oracle
1618                .control(peer_a.clone())
1619                .register(0, TEST_QUOTA)
1620                .await
1621                .unwrap();
1622            let (mut peer_b_sender, mut peer_b_recv) = oracle
1623                .control(peer_b.clone())
1624                .register(0, TEST_QUOTA)
1625                .await
1626                .unwrap();
1627
1628            // Register and split the twin's channel:
1629            // - Primary sends only to peer_a
1630            // - Secondary sends only to peer_b
1631            // - Messages from peer_a go to primary receiver
1632            // - Messages from peer_b go to secondary receiver
1633            let (twin_sender, twin_receiver) = oracle
1634                .control(twin.clone())
1635                .register(0, TEST_QUOTA)
1636                .await
1637                .unwrap();
1638            let peer_a_for_router = peer_a.clone();
1639            let peer_b_for_router = peer_b.clone();
1640            let (mut twin_primary_sender, mut twin_secondary_sender) =
1641                twin_sender.split_with(move |origin, _, _| match origin {
1642                    SplitOrigin::Primary => Some(Recipients::One(peer_a_for_router.clone())),
1643                    SplitOrigin::Secondary => Some(Recipients::One(peer_b_for_router.clone())),
1644                });
1645            let peer_a_for_recv = peer_a.clone();
1646            let peer_b_for_recv = peer_b.clone();
1647            let (mut twin_primary_recv, mut twin_secondary_recv) =
1648                twin_receiver.split_with(context.child("split_receiver"), move |(sender, _)| {
1649                    if sender == &peer_a_for_recv {
1650                        SplitTarget::Primary
1651                    } else if sender == &peer_b_for_recv {
1652                        SplitTarget::Secondary
1653                    } else {
1654                        panic!("unexpected sender");
1655                    }
1656                });
1657
1658            // Establish bidirectional links
1659            let link = ingress::Link {
1660                latency: Duration::from_millis(0),
1661                jitter: Duration::from_millis(0),
1662                success_rate: 1.0,
1663            };
1664            oracle
1665                .add_link(peer_a.clone(), twin.clone(), link.clone())
1666                .await
1667                .unwrap();
1668            oracle
1669                .add_link(twin.clone(), peer_a.clone(), link.clone())
1670                .await
1671                .unwrap();
1672            oracle
1673                .add_link(peer_b.clone(), twin.clone(), link.clone())
1674                .await
1675                .unwrap();
1676            oracle
1677                .add_link(twin.clone(), peer_b.clone(), link.clone())
1678                .await
1679                .unwrap();
1680
1681            // Send messages in both directions
1682            peer_a_sender.send(Recipients::One(twin.clone()), b"from_a", false);
1683            peer_b_sender.send(Recipients::One(twin.clone()), b"from_b", false);
1684            twin_primary_sender.send(Recipients::All, b"primary_out", false);
1685            twin_secondary_sender.send(Recipients::All, b"secondary_out", false);
1686
1687            // Verify routing: peer_a messages go to primary, peer_b to secondary
1688            let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1689            assert_eq!(sender, peer_a);
1690            assert_eq!(payload, b"from_a");
1691            let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1692            assert_eq!(sender, peer_b);
1693            assert_eq!(payload, b"from_b");
1694
1695            // Verify routing: primary sends to peer_a, secondary to peer_b
1696            let (sender, payload) = peer_a_recv.recv().await.unwrap();
1697            assert_eq!(sender, twin);
1698            assert_eq!(payload, b"primary_out");
1699            let (sender, payload) = peer_b_recv.recv().await.unwrap();
1700            assert_eq!(sender, twin);
1701            assert_eq!(payload, b"secondary_out");
1702        });
1703    }
1704
1705    /// When both split halves use [`SplitTarget::Both`], a single inbound message is delivered to primary and secondary receivers.
1706    #[test]
1707    fn test_split_channel_both() {
1708        let executor = deterministic::Runner::default();
1709        executor.start(|context| async move {
1710            let cfg = Config {
1711                max_size: MAX_MESSAGE_SIZE,
1712                disconnect_on_block: true,
1713                tracked_peer_sets: NZUsize!(3),
1714            };
1715            let (network, oracle) = Network::new(context.child("network"), cfg);
1716            network.start();
1717
1718            // Create a "twin" node that will be split, plus a third peer
1719            let twin = ed25519::PrivateKey::from_seed(30).public_key();
1720            let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1721
1722            // Register all peers
1723            let mut manager = oracle.manager();
1724            manager.track(0, Set::try_from([twin.clone(), peer_c.clone()]).unwrap());
1725
1726            // Register normal peer
1727            let (mut peer_c_sender, _peer_c_recv) = oracle
1728                .control(peer_c.clone())
1729                .register(0, TEST_QUOTA)
1730                .await
1731                .unwrap();
1732
1733            // Register and split the twin's channel with a router that sends to Both
1734            let (twin_sender, twin_receiver) = oracle
1735                .control(twin.clone())
1736                .register(0, TEST_QUOTA)
1737                .await
1738                .unwrap();
1739            let (_twin_primary_sender, _twin_secondary_sender) =
1740                twin_sender.split_with(|_origin, recipients, _| Some(recipients.clone()));
1741            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1742                .split_with(context.child("split_receiver_both"), |_| SplitTarget::Both);
1743
1744            // Establish bidirectional links
1745            let link = ingress::Link {
1746                latency: Duration::from_millis(0),
1747                jitter: Duration::from_millis(0),
1748                success_rate: 1.0,
1749            };
1750            oracle
1751                .add_link(peer_c.clone(), twin.clone(), link.clone())
1752                .await
1753                .unwrap();
1754            oracle
1755                .add_link(twin.clone(), peer_c.clone(), link)
1756                .await
1757                .unwrap();
1758
1759            // Send a message from peer_c to twin
1760            peer_c_sender.send(Recipients::One(twin.clone()), b"to_both", false);
1761
1762            // Verify both receivers get the message
1763            let (sender, payload) = twin_primary_recv.recv().await.unwrap();
1764            assert_eq!(sender, peer_c);
1765            assert_eq!(payload, b"to_both");
1766            let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
1767            assert_eq!(sender, peer_c);
1768            assert_eq!(payload, b"to_both");
1769        });
1770    }
1771
1772    /// [`SplitTarget::None`] and a send router returning `None` drop traffic: inbound is not
1773    /// delivered to either half, and outbound sends report no recipients after the split forwarder
1774    /// drops them.
1775    #[test]
1776    fn test_split_channel_none() {
1777        let executor = deterministic::Runner::default();
1778        executor.start(|context| async move {
1779            let cfg = Config {
1780                max_size: MAX_MESSAGE_SIZE,
1781                disconnect_on_block: true,
1782                tracked_peer_sets: NZUsize!(3),
1783            };
1784            let (network, oracle) = Network::new(context.child("network"), cfg);
1785            network.start();
1786
1787            // Create a "twin" node that will be split, plus a third peer
1788            let twin = ed25519::PrivateKey::from_seed(30).public_key();
1789            let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
1790
1791            // Register all peers
1792            let mut manager = oracle.manager();
1793            manager.track(0, Set::try_from([twin.clone(), peer_c.clone()]).unwrap());
1794
1795            // Register normal peer
1796            let (mut peer_c_sender, _peer_c_recv) = oracle
1797                .control(peer_c.clone())
1798                .register(0, TEST_QUOTA)
1799                .await
1800                .unwrap();
1801
1802            // Register and split the twin's channel with a router that sends to Both
1803            let (twin_sender, twin_receiver) = oracle
1804                .control(twin.clone())
1805                .register(0, TEST_QUOTA)
1806                .await
1807                .unwrap();
1808            let (mut twin_primary_sender, mut twin_secondary_sender) =
1809                twin_sender.split_with(|_origin, _, _| None);
1810            let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
1811                .split_with(context.child("split_receiver_both"), |_| SplitTarget::None);
1812
1813            // Establish bidirectional links
1814            let link = ingress::Link {
1815                latency: Duration::from_millis(0),
1816                jitter: Duration::from_millis(0),
1817                success_rate: 1.0,
1818            };
1819            oracle
1820                .add_link(peer_c.clone(), twin.clone(), link.clone())
1821                .await
1822                .unwrap();
1823            oracle
1824                .add_link(twin.clone(), peer_c.clone(), link)
1825                .await
1826                .unwrap();
1827
1828            // Send a message from peer_c to twin
1829            let sent = peer_c_sender.send(Recipients::One(twin.clone()), b"to_both", false);
1830            assert_eq!(sent.len(), 1);
1831            assert_eq!(sent[0], twin);
1832
1833            // Verify both receivers get the message
1834            context.sleep(Duration::from_millis(100)).await;
1835            assert!(twin_primary_recv.recv().now_or_never().is_none());
1836            assert!(twin_secondary_recv.recv().now_or_never().is_none());
1837
1838            // Send a message from twin to peer_c
1839            let sent = twin_primary_sender.send(Recipients::One(peer_c.clone()), b"to_both", false);
1840            assert!(sent.is_empty());
1841
1842            // Send a message from twin to peer_c
1843            let sent =
1844                twin_secondary_sender.send(Recipients::One(peer_c.clone()), b"to_both", false);
1845            assert!(sent.is_empty());
1846        });
1847    }
1848
1849    /// [`Manager::track`] indices may arrive out of order: older indices are ignored; subscribers see updates in commit order
1850    /// and [`PeerSetUpdate::all`] accumulates primaries across applied sets.
1851    #[test]
1852    fn test_unordered_peer_sets() {
1853        let executor = deterministic::Runner::default();
1854        executor.start(|context| async move {
1855            let cfg = Config {
1856                max_size: MAX_MESSAGE_SIZE,
1857                disconnect_on_block: true,
1858                tracked_peer_sets: NZUsize!(3),
1859            };
1860            let (network, oracle) = Network::new(context.child("network"), cfg);
1861            network.start();
1862
1863            // Create two public keys
1864            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
1865            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
1866
1867            // Subscribe to peer sets
1868            let mut manager = oracle.manager();
1869            let mut subscription = manager.subscribe().await;
1870
1871            // Register initial peer set
1872            manager.track(10, Set::try_from([pk1.clone(), pk2.clone()]).unwrap());
1873            let update = subscription.recv().await.unwrap();
1874            assert_eq!(update.index, 10);
1875            assert_eq!(update.latest.primary.len(), 2);
1876            assert!(update.latest.secondary.is_empty());
1877            assert_eq!(update.all.primary.len(), 2);
1878            assert!(update.all.secondary.is_empty());
1879
1880            // Register old peer sets (ignored)
1881            let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
1882            manager.track(9, Set::try_from([pk3.clone()]).unwrap());
1883
1884            // Add new peer set
1885            let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
1886            manager.track(11, Set::try_from([pk4.clone()]).unwrap());
1887            let update = subscription.recv().await.unwrap();
1888            assert_eq!(update.index, 11);
1889            assert_eq!(update.latest.primary, Set::try_from([pk4.clone()]).unwrap());
1890            assert!(update.latest.secondary.is_empty());
1891            assert_eq!(update.all.primary, Set::try_from([pk1, pk2, pk4]).unwrap());
1892            assert!(update.all.secondary.is_empty());
1893        });
1894    }
1895
1896    /// [`PeerSetUpdate::all`] uses primary-wins across *tracked* indices: a peer who is primary in one
1897    /// peer set and secondary in another is listed only under `all.primary` (not in `all.secondary`).
1898    #[test]
1899    fn test_peer_set_update_all_cross_index_primary_wins() {
1900        let executor = deterministic::Runner::default();
1901        executor.start(|context| async move {
1902            let cfg = Config {
1903                max_size: MAX_MESSAGE_SIZE,
1904                disconnect_on_block: true,
1905                tracked_peer_sets: NZUsize!(3),
1906            };
1907            let (network, oracle) = Network::new(context.child("network"), cfg);
1908            network.start();
1909
1910            let pk_a = ed25519::PrivateKey::from_seed(21).public_key();
1911            let pk_b = ed25519::PrivateKey::from_seed(22).public_key();
1912            // Appears as primary in set 10 and (redundantly) as secondary in set 11.
1913            let pk_overlap = ed25519::PrivateKey::from_seed(23).public_key();
1914            // Secondary-only in set 11; should still appear under aggregate secondary.
1915            let pk_sec = ed25519::PrivateKey::from_seed(24).public_key();
1916
1917            let mut manager = oracle.manager();
1918            let mut subscription = manager.subscribe().await;
1919
1920            manager.track(
1921                10,
1922                TrackedPeers::new(
1923                    Set::try_from([pk_a.clone(), pk_overlap.clone()]).unwrap(),
1924                    Set::default(),
1925                ),
1926            );
1927            let _ = subscription.recv().await.unwrap();
1928
1929            manager.track(
1930                11,
1931                TrackedPeers::new(
1932                    Set::try_from([pk_b.clone()]).unwrap(),
1933                    Set::try_from([pk_overlap.clone(), pk_sec.clone()]).unwrap(),
1934                ),
1935            );
1936            let update = subscription.recv().await.unwrap();
1937            assert_eq!(update.index, 11);
1938
1939            assert_eq!(
1940                update.latest.primary,
1941                Set::try_from([pk_b.clone()]).unwrap()
1942            );
1943            // At index 11 alone, pk_overlap is secondary-only (primary at 11 is pk_b).
1944            assert!(update.latest.secondary.position(&pk_overlap).is_some());
1945            assert!(update.latest.secondary.position(&pk_sec).is_some());
1946
1947            // Across tracked sets: pk_overlap is primary in set 10 -> aggregate lists them only under primary.
1948            assert!(update.all.primary.position(&pk_a).is_some());
1949            assert!(update.all.primary.position(&pk_b).is_some());
1950            assert!(update.all.primary.position(&pk_overlap).is_some());
1951            assert!(
1952                update.all.secondary.position(&pk_overlap).is_none(),
1953                "aggregate secondary must omit peers who have any primary membership"
1954            );
1955            assert!(update.all.secondary.position(&pk_sec).is_some());
1956        });
1957    }
1958
1959    /// [`Network::get_next_socket`] hands out the current address then advances port, wrapping IPv4 and port at boundaries.
1960    #[test]
1961    fn test_get_next_socket() {
1962        let cfg = Config {
1963            max_size: MAX_MESSAGE_SIZE,
1964            disconnect_on_block: true,
1965            tracked_peer_sets: NZUsize!(1),
1966        };
1967        let runner = deterministic::Runner::default();
1968
1969        runner.start(|context| async move {
1970            type PublicKey = ed25519::PublicKey;
1971            let (mut network, _) =
1972                Network::<deterministic::Context, PublicKey>::new(context.child("network"), cfg);
1973
1974            // Test that the next socket address is incremented correctly
1975            let mut original = network.next_addr;
1976            let next = network.get_next_socket();
1977            assert_eq!(next, original);
1978            let next = network.get_next_socket();
1979            original.set_port(1);
1980            assert_eq!(next, original);
1981
1982            // Test that the port number overflows correctly
1983            let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
1984            network.next_addr = max_addr;
1985            let next = network.get_next_socket();
1986            assert_eq!(next, max_addr);
1987            let next = network.get_next_socket();
1988            assert_eq!(
1989                next,
1990                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
1991            );
1992        });
1993    }
1994
1995    /// Many sequential sends to one recipient arrive in order when symmetric per-link bandwidth limits apply.
1996    #[test]
1997    fn test_fifo_burst_same_recipient() {
1998        let cfg = Config {
1999            max_size: MAX_MESSAGE_SIZE,
2000            disconnect_on_block: true,
2001            tracked_peer_sets: NZUsize!(3),
2002        };
2003        let runner = deterministic::Runner::default();
2004
2005        runner.start(|context| async move {
2006            let (network, oracle) = Network::new(context.child("network"), cfg);
2007            let network_handle = network.start();
2008
2009            let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
2010            let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();
2011
2012            let mut manager = oracle.manager();
2013            manager.track(
2014                0,
2015                Set::try_from([sender_pk.clone(), recipient_pk.clone()]).unwrap(),
2016            );
2017            let (mut sender, _sender_recv) = oracle
2018                .control(sender_pk.clone())
2019                .register(0, TEST_QUOTA)
2020                .await
2021                .unwrap();
2022            let (_sender2, mut receiver) = oracle
2023                .control(recipient_pk.clone())
2024                .register(0, TEST_QUOTA)
2025                .await
2026                .unwrap();
2027
2028            oracle
2029                .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
2030                .await
2031                .unwrap();
2032            oracle
2033                .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
2034                .await
2035                .unwrap();
2036
2037            oracle
2038                .add_link(
2039                    sender_pk.clone(),
2040                    recipient_pk.clone(),
2041                    ingress::Link {
2042                        latency: Duration::from_millis(0),
2043                        jitter: Duration::from_millis(0),
2044                        success_rate: 1.0,
2045                    },
2046                )
2047                .await
2048                .unwrap();
2049
2050            const COUNT: usize = 50;
2051            let mut expected = Vec::with_capacity(COUNT);
2052            for i in 0..COUNT {
2053                let msg = vec![i as u8; 64];
2054                sender
2055                    .check(Recipients::One(recipient_pk.clone()))
2056                    .unwrap()
2057                    .send(msg.clone(), false);
2058                expected.push(msg);
2059            }
2060
2061            for expected_msg in expected {
2062                let (_pk, bytes) = receiver.recv().await.unwrap();
2063                assert_eq!(bytes, expected_msg.as_slice());
2064            }
2065
2066            drop(oracle);
2067            drop(sender);
2068            network_handle.abort();
2069        });
2070    }
2071
2072    /// [`Recipients::All`] to two links shares the sender cap: both deliveries are delayed in line with the shared bandwidth model,
2073    /// not delivered back-to-back.
2074    #[test]
2075    fn test_broadcast_respects_transmit_latency() {
2076        let cfg = Config {
2077            max_size: MAX_MESSAGE_SIZE,
2078            disconnect_on_block: true,
2079            tracked_peer_sets: NZUsize!(3),
2080        };
2081        let runner = deterministic::Runner::default();
2082
2083        runner.start(|context| async move {
2084            let (network, oracle) = Network::new(context.child("network"), cfg);
2085            let network_handle = network.start();
2086
2087            let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
2088            let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
2089            let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();
2090
2091            let mut manager = oracle.manager();
2092            manager.track(
2093                0,
2094                Set::try_from([sender_pk.clone(), recipient_a.clone(), recipient_b.clone()])
2095                    .unwrap(),
2096            );
2097            let (mut sender, _recv_sender) = oracle
2098                .control(sender_pk.clone())
2099                .register(0, TEST_QUOTA)
2100                .await
2101                .unwrap();
2102            let (_sender2, mut recv_a) = oracle
2103                .control(recipient_a.clone())
2104                .register(0, TEST_QUOTA)
2105                .await
2106                .unwrap();
2107            let (_sender3, mut recv_b) = oracle
2108                .control(recipient_b.clone())
2109                .register(0, TEST_QUOTA)
2110                .await
2111                .unwrap();
2112
2113            oracle
2114                .limit_bandwidth(sender_pk.clone(), Some(1_000), None)
2115                .await
2116                .unwrap();
2117            oracle
2118                .limit_bandwidth(recipient_a.clone(), None, Some(1_000))
2119                .await
2120                .unwrap();
2121            oracle
2122                .limit_bandwidth(recipient_b.clone(), None, Some(1_000))
2123                .await
2124                .unwrap();
2125
2126            let link = ingress::Link {
2127                latency: Duration::from_millis(0),
2128                jitter: Duration::from_millis(0),
2129                success_rate: 1.0,
2130            };
2131            oracle
2132                .add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
2133                .await
2134                .unwrap();
2135            oracle
2136                .add_link(sender_pk.clone(), recipient_b.clone(), link)
2137                .await
2138                .unwrap();
2139
2140            let big_msg = vec![7u8; 10_000];
2141            let start = send_when_ready(
2142                &context,
2143                &mut sender,
2144                Recipients::All,
2145                2,
2146                big_msg.clone(),
2147                false,
2148            )
2149            .await;
2150
2151            let (_pk, received_a) = recv_a.recv().await.unwrap();
2152            assert_eq!(received_a, big_msg.as_slice());
2153            let elapsed_a = context.current().duration_since(start).unwrap();
2154            assert!(elapsed_a >= Duration::from_secs(20));
2155
2156            let (_pk, received_b) = recv_b.recv().await.unwrap();
2157            assert_eq!(received_b, big_msg.as_slice());
2158            let elapsed_b = context.current().duration_since(start).unwrap();
2159            assert!(elapsed_b >= Duration::from_secs(20));
2160
2161            // Because bandwidth is shared, the two messages should take about the same time
2162            assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));
2163
2164            drop(oracle);
2165            drop(sender);
2166            network_handle.abort();
2167        });
2168    }
2169
2170    /// A peer listed in both primary and secondary appears only in [`PeerSetUpdate::latest`] primary; aggregate secondary omits
2171    /// primary keys. [`Recipients::All`] from another peer lists the overlap peer once and still reaches secondary-only peers.
2172    #[test]
2173    fn test_overlapping_primary_secondary_no_duplicate_recipients() {
2174        let executor = deterministic::Runner::default();
2175        executor.start(|context| async move {
2176            let cfg = Config {
2177                max_size: MAX_MESSAGE_SIZE,
2178                disconnect_on_block: true,
2179                tracked_peer_sets: NZUsize!(3),
2180            };
2181            let (network, oracle) = Network::new(context.child("network"), cfg);
2182            network.start();
2183
2184            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
2185            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
2186            let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
2187
2188            let mut manager = oracle.manager();
2189            manager.track(
2190                0,
2191                TrackedPeers::new(
2192                    Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
2193                    Set::try_from([pk2.clone(), pk3.clone()]).unwrap(),
2194                ),
2195            );
2196
2197            let mut updates = manager.subscribe().await;
2198            let update = updates.recv().await.unwrap();
2199            assert_eq!(update.index, 0);
2200            assert!(update.latest.primary.position(&pk2).is_some());
2201            assert!(
2202                update.latest.secondary.position(&pk2).is_none(),
2203                "overlap peer must not appear in latest.secondary"
2204            );
2205            assert!(update.latest.secondary.position(&pk3).is_some());
2206            assert!(update.all.primary.position(&pk2).is_some());
2207            assert!(
2208                update.all.secondary.position(&pk2).is_none(),
2209                "aggregate secondary must not list peers who are primary"
2210            );
2211            assert!(update.all.secondary.position(&pk3).is_some());
2212
2213            let link = ingress::Link {
2214                latency: Duration::from_millis(1),
2215                jitter: Duration::ZERO,
2216                success_rate: 1.0,
2217            };
2218            for (a, b) in [(&pk1, &pk2), (&pk1, &pk3), (&pk2, &pk3)] {
2219                oracle
2220                    .add_link(a.clone(), b.clone(), link.clone())
2221                    .await
2222                    .unwrap();
2223            }
2224
2225            let (mut sender1, _) = oracle
2226                .control(pk1.clone())
2227                .register(0, TEST_QUOTA)
2228                .await
2229                .unwrap();
2230            let (_, mut recv2) = oracle
2231                .control(pk2.clone())
2232                .register(0, TEST_QUOTA)
2233                .await
2234                .unwrap();
2235            let (_, mut recv3) = oracle
2236                .control(pk3.clone())
2237                .register(0, TEST_QUOTA)
2238                .await
2239                .unwrap();
2240
2241            let msg = vec![42u8; 10];
2242            let checked = sender1.check(Recipients::All).unwrap();
2243            let sent_to = crate::CheckedSender::recipients(&checked);
2244            checked.send(msg.clone(), true);
2245
2246            let pk2_count = sent_to.iter().filter(|pk| *pk == &pk2).count();
2247            assert_eq!(pk2_count, 1, "pk2 received duplicate sends");
2248            assert!(sent_to.iter().any(|pk| pk == &pk3));
2249
2250            context.sleep(Duration::from_millis(10)).await;
2251            let (from2, data2) = recv2.recv().await.unwrap();
2252            assert_eq!(from2, pk1);
2253            assert_eq!(data2, msg.as_slice());
2254            let (from3, data3) = recv3.recv().await.unwrap();
2255            assert_eq!(from3, pk1);
2256            assert_eq!(data3, msg.as_slice());
2257            assert!(recv2.recv().now_or_never().is_none());
2258        });
2259    }
2260
2261    /// A peer can be demoted from primary to secondary across tracked peer set indices.
2262    /// After the old primary-containing set is evicted, the peer is purely secondary.
2263    #[test]
2264    fn test_demotion_from_primary_to_secondary() {
2265        let executor = deterministic::Runner::default();
2266        executor.start(|context| async move {
2267            let cfg = Config {
2268                max_size: 1024,
2269                disconnect_on_block: true,
2270                tracked_peer_sets: NZUsize!(2),
2271            };
2272            let (network, oracle) = Network::new(context.child("network"), cfg);
2273            network.start();
2274
2275            let pk_x = ed25519::PrivateKey::from_seed(1).public_key();
2276            let pk_y = ed25519::PrivateKey::from_seed(2).public_key();
2277
2278            let mut manager = oracle.manager();
2279            let mut sub = manager.subscribe().await;
2280
2281            // Index 0: X is primary, Y is secondary.
2282            manager.track(
2283                0,
2284                TrackedPeers::new(
2285                    Set::try_from([pk_x.clone()]).unwrap(),
2286                    Set::try_from([pk_y.clone()]).unwrap(),
2287                ),
2288            );
2289
2290            let update = sub.recv().await.unwrap();
2291            assert!(update.all.primary.position(&pk_x).is_some());
2292            assert!(update.all.secondary.position(&pk_y).is_some());
2293
2294            // Index 1: X is demoted to secondary, Y is promoted to primary.
2295            manager.track(
2296                1,
2297                TrackedPeers::new(
2298                    Set::try_from([pk_y.clone()]).unwrap(),
2299                    Set::try_from([pk_x.clone()]).unwrap(),
2300                ),
2301            );
2302
2303            // Both indices retained: both peers are primary somewhere -> aggregate primary.
2304            let update = sub.recv().await.unwrap();
2305            assert!(update.all.primary.position(&pk_x).is_some());
2306            assert!(update.all.primary.position(&pk_y).is_some());
2307            assert!(update.all.secondary.is_empty());
2308
2309            // Index 2: same as index 1. Evicts index 0.
2310            manager.track(
2311                2,
2312                TrackedPeers::new(
2313                    Set::try_from([pk_y.clone()]).unwrap(),
2314                    Set::try_from([pk_x.clone()]).unwrap(),
2315                ),
2316            );
2317
2318            // Index 0 evicted. X is now purely secondary.
2319            let update = sub.recv().await.unwrap();
2320            assert!(update.all.primary.position(&pk_y).is_some());
2321            assert!(update.all.secondary.position(&pk_x).is_some());
2322            assert!(update.all.primary.position(&pk_x).is_none());
2323        });
2324    }
2325
2326    /// After advancing tracked peer sets, secondaries from an older snapshot remain addressable until evicted from history:
2327    /// a new primary can still reach them, while a newer-only primary does not receive messages intended for that tracked secondary view.
2328    #[test]
2329    fn test_secondary_sets_remain_until_eviction() {
2330        let executor = deterministic::Runner::default();
2331        executor.start(|context| async move {
2332            let cfg = Config {
2333                max_size: MAX_MESSAGE_SIZE,
2334                disconnect_on_block: true,
2335                tracked_peer_sets: NZUsize!(2),
2336            };
2337            let (network, oracle) = Network::new(context.child("network"), cfg);
2338            network.start();
2339
2340            let primary_0 = ed25519::PrivateKey::from_seed(1).public_key();
2341            let primary_1 = ed25519::PrivateKey::from_seed(2).public_key();
2342            let primary_2 = ed25519::PrivateKey::from_seed(3).public_key();
2343            let secondary_0 = ed25519::PrivateKey::from_seed(4).public_key();
2344            let secondary_1 = ed25519::PrivateKey::from_seed(5).public_key();
2345
2346            let mut manager = oracle.manager();
2347            manager.track(
2348                0,
2349                TrackedPeers::new(
2350                    Set::try_from([primary_0.clone()]).unwrap(),
2351                    Set::try_from([secondary_0.clone()]).unwrap(),
2352                ),
2353            );
2354            manager.track(
2355                1,
2356                TrackedPeers::new(
2357                    Set::try_from([primary_1.clone()]).unwrap(),
2358                    Set::try_from([secondary_1.clone()]).unwrap(),
2359                ),
2360            );
2361
2362            let link = ingress::Link {
2363                latency: Duration::from_millis(1),
2364                jitter: Duration::ZERO,
2365                success_rate: 1.0,
2366            };
2367            oracle
2368                .add_link(primary_1.clone(), secondary_0.clone(), link.clone())
2369                .await
2370                .unwrap();
2371            oracle
2372                .add_link(primary_1.clone(), secondary_1.clone(), link.clone())
2373                .await
2374                .unwrap();
2375
2376            let (mut sender_1, _) = oracle
2377                .control(primary_1.clone())
2378                .register(0, TEST_QUOTA)
2379                .await
2380                .unwrap();
2381            let (_, mut receiver_0) = oracle
2382                .control(secondary_0.clone())
2383                .register(0, TEST_QUOTA)
2384                .await
2385                .unwrap();
2386            let (_, mut receiver_1) = oracle
2387                .control(secondary_1.clone())
2388                .register(0, TEST_QUOTA)
2389                .await
2390                .unwrap();
2391
2392            let msg_1 = vec![1u8; 8];
2393            sender_1
2394                .check(Recipients::Some(vec![
2395                    secondary_0.clone(),
2396                    secondary_1.clone(),
2397                ]))
2398                .unwrap()
2399                .send(msg_1.clone(), true);
2400            assert_eq!(receiver_0.recv().await.unwrap().1, msg_1.as_slice());
2401            assert_eq!(receiver_1.recv().await.unwrap().1, msg_1.as_slice());
2402
2403            crate::Manager::track(
2404                &mut manager,
2405                2,
2406                TrackedPeers::primary([primary_2.clone()].try_into().unwrap()),
2407            );
2408            oracle
2409                .add_link(primary_2.clone(), secondary_0.clone(), link.clone())
2410                .await
2411                .unwrap();
2412            oracle
2413                .add_link(primary_2.clone(), secondary_1.clone(), link)
2414                .await
2415                .unwrap();
2416
2417            let (mut sender_2, _) = oracle
2418                .control(primary_2)
2419                .register(0, TEST_QUOTA)
2420                .await
2421                .unwrap();
2422
2423            let msg_2 = vec![2u8; 8];
2424            sender_2
2425                .check(Recipients::Some(vec![
2426                    secondary_0.clone(),
2427                    secondary_1.clone(),
2428                ]))
2429                .unwrap()
2430                .send(msg_2.clone(), true);
2431            assert!(receiver_0.recv().now_or_never().is_none());
2432            assert_eq!(receiver_1.recv().await.unwrap().1, msg_2.as_slice());
2433        });
2434    }
2435}