//! Networking for the `iroh-gossip` protocol

use std::{
    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use bytes::Bytes;
use futures_concurrency::stream::{stream_group, StreamGroup};
use futures_util::FutureExt as _;
use iroh::{
    endpoint::Connection,
    protocol::{AcceptError, ProtocolHandler},
    Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
};
use irpc::WithChannels;
use n0_error::{e, stack_error};
use n0_future::{
    task::{self, AbortOnDropHandle, JoinSet},
    time::Instant,
    Stream, StreamExt as _,
};
use rand::{rngs::StdRng, SeedableRng};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, trace, warn, Instrument};

use self::{
    address_lookup::GossipAddressLookup,
    util::{RecvLoop, SendLoop, Timers},
};
use crate::{
    api::{self, Command, Event, GossipApi, RpcMessage},
    metrics::Metrics,
    proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
};

mod address_lookup;
mod util;

/// ALPN protocol name
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Channel capacity for the send queue (one per connection)
const SEND_QUEUE_CAP: usize = 64;
/// Channel capacity for the ToActor message queue (single)
const TO_ACTOR_CAP: usize = 64;
/// Channel capacity for the InEvent message queue (single)
const IN_EVENT_CAP: usize = 1024;
/// Channel capacity for broadcast subscriber event queue (one per topic)
const TOPIC_EVENT_CAP: usize = 256;

/// Events emitted from the gossip protocol
pub type ProtoEvent = proto::Event<PublicKey>;
/// Commands for the gossip protocol
pub type ProtoCommand = proto::Command<PublicKey>;

type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;

/// Publish and subscribe on gossiping topics.
///
/// Each topic is a separate broadcast tree with separate memberships.
///
/// A topic has to be joined before you can publish or subscribe on the topic.
/// To join the swarm for a topic, you have to know the [`PublicKey`] of at least one peer that also joined the topic.
///
/// Messages published on the swarm will be delivered to all peers that joined the swarm for that
/// topic. You will also be relaying (gossiping) messages published by other peers.
///
/// With the default settings, the protocol will maintain up to 5 peer connections per topic.
///
/// Even though the [`Gossip`] is created from an [`Endpoint`], it does not accept connections
/// itself. You should run an accept loop on the [`Endpoint`] yourself, check the ALPN protocol of incoming
/// connections, and if the ALPN protocol equals [`GOSSIP_ALPN`], forward the connection to the
/// gossip actor through [`Self::handle_connection`].
///
/// The gossip actor will, however, initiate new connections to other peers by itself.
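///
/// # Example
///
/// A minimal sketch, assuming an iroh [`Router`](iroh::protocol::Router) runs the accept
/// loop for us; the topic id and bootstrap peer are placeholders, and exact API details
/// may differ between iroh versions:
///
/// ```ignore
/// use iroh::{protocol::Router, Endpoint};
/// use iroh_gossip::{net::{Gossip, GOSSIP_ALPN}, proto::TopicId};
///
/// let endpoint: Endpoint = todo!("bind an endpoint");
/// let gossip = Gossip::builder().spawn(endpoint.clone());
/// // The router accepts incoming connections and forwards gossip ones to us.
/// let router = Router::builder(endpoint).accept(GOSSIP_ALPN, gossip.clone()).spawn();
///
/// let topic: TopicId = blake3::hash(b"my-topic").into();
/// let sub = gossip.subscribe_and_join(topic, vec![bootstrap_peer_id]).await?;
/// let (sink, mut stream) = sub.split();
/// sink.broadcast(b"hello".to_vec().into()).await?;
/// while let Some(event) = stream.try_next().await? {
///     // Handle `Event::Received`, `Event::NeighborUp`, `Event::Lagged`, ...
/// }
/// ```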
#[derive(Debug, Clone)]
pub struct Gossip {
    pub(crate) inner: Arc<Inner>,
}

impl std::ops::Deref for Gossip {
    type Target = GossipApi;
    fn deref(&self) -> &Self::Target {
        &self.inner.api
    }
}

#[derive(Debug)]
enum LocalActorMessage {
    HandleConnection(Connection),
    Shutdown { reply: oneshot::Sender<()> },
}

#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    ActorDropped {},
}

impl<T> From<mpsc::error::SendError<T>> for Error {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(Error::ActorDropped)
    }
}
impl From<oneshot::error::RecvError> for Error {
    fn from(_value: oneshot::error::RecvError) -> Self {
        e!(Error::ActorDropped)
    }
}

#[derive(Debug)]
pub(crate) struct Inner {
    api: GossipApi,
    local_tx: mpsc::Sender<LocalActorMessage>,
    _actor_handle: AbortOnDropHandle<()>,
    max_message_size: usize,
    metrics: Arc<Metrics>,
}

impl ProtocolHandler for Gossip {
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        self.handle_connection(connection)
            .await
            .map_err(AcceptError::from_err)?;
        Ok(())
    }

    async fn shutdown(&self) {
        if let Err(err) = self.shutdown().await {
            warn!("error while shutting down gossip: {err:#}");
        }
    }
}

/// Builder to configure and construct [`Gossip`].
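///
/// # Example
///
/// A sketch of a customized instance; the values are illustrative, and `endpoint` is
/// assumed to be an already-bound [`Endpoint`]:
///
/// ```ignore
/// let gossip = Gossip::builder()
///     // Allow larger messages than the 4096-byte default.
///     .max_message_size(16 * 1024)
///     // Use an application-specific ALPN instead of `GOSSIP_ALPN`.
///     .alpn(b"/my-app/gossip/0")
///     .spawn(endpoint);
/// ```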
#[derive(Debug, Clone)]
pub struct Builder {
    config: proto::Config,
    alpn: Option<Bytes>,
}

impl Builder {
    /// Sets the maximum message size in bytes.
    /// By default this is `4096` bytes.
    pub fn max_message_size(mut self, size: usize) -> Self {
        self.config.max_message_size = size;
        self
    }

    /// Set the membership configuration.
    pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
        self.config.membership = config;
        self
    }

    /// Set the broadcast configuration.
    pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
        self.config.broadcast = config;
        self
    }

    /// Set the ALPN this gossip instance uses.
    ///
    /// It has to be the same for all peers in the network. If you set a custom ALPN,
    /// you have to use the same ALPN when registering the [`Gossip`] on an iroh
    /// router with [`RouterBuilder::accept`].
    ///
    /// [`RouterBuilder::accept`]: iroh::protocol::RouterBuilder::accept
    pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
        self.alpn = Some(alpn.as_ref().to_vec().into());
        self
    }

    /// Spawn a gossip actor and get a handle for it.
    pub fn spawn(self, endpoint: Endpoint) -> Gossip {
        let metrics = Arc::new(Metrics::default());
        let address_lookup = GossipAddressLookup::default();

        // `Endpoint::address_lookup` returns `Err` when the endpoint is closed.
        // In that case, the gossip actor will close soon as well for other reasons,
        // so it's fine if we only add our `GossipAddressLookup` for the non-closed
        // case. The alternative would be to return a `Result` from `spawn`,
        // but as long as this is the only direct error case, it seems unwarranted.
        if let Ok(endpoint_addr_lookup) = endpoint.address_lookup().as_ref() {
            endpoint_addr_lookup.add(address_lookup.clone());
        }
        let (actor, rpc_tx, local_tx) = Actor::new(
            endpoint,
            self.config,
            metrics.clone(),
            self.alpn,
            address_lookup,
        );
        let me = actor.endpoint.id().fmt_short();
        let max_message_size = actor.state.max_message_size();

        let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));

        let api = GossipApi::local(rpc_tx);

        Gossip {
            inner: Inner {
                api,
                local_tx,
                _actor_handle: AbortOnDropHandle::new(actor_handle),
                max_message_size,
                metrics,
            }
            .into(),
        }
    }
}

impl Gossip {
    /// Creates a `Builder` with default configuration.
    ///
    /// The [`Endpoint`] is supplied later, when calling [`Builder::spawn`].
    pub fn builder() -> Builder {
        Builder {
            config: Default::default(),
            alpn: None,
        }
    }

    /// Listen on a `noq` endpoint for incoming RPC connections.
    #[cfg(feature = "rpc")]
    pub async fn listen(self, endpoint: noq::Endpoint) {
        self.inner.api.listen(endpoint).await
    }

    /// Get the maximum message size configured for this gossip actor.
    pub fn max_message_size(&self) -> usize {
        self.inner.max_message_size
    }

    /// Handle an incoming [`Connection`].
    ///
    /// Make sure to check the ALPN protocol yourself before passing the connection.
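    ///
    /// A sketch of a manual accept loop (only needed when the [`Gossip`] is not registered
    /// on an iroh router); this assumes the negotiated ALPN is available via
    /// `Connection::alpn`, and error handling is elided:
    ///
    /// ```ignore
    /// while let Some(incoming) = endpoint.accept().await {
    ///     let conn = incoming.accept()?.await?;
    ///     if conn.alpn().as_deref() == Some(GOSSIP_ALPN) {
    ///         gossip.handle_connection(conn).await?;
    ///     }
    /// }
    /// ```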
    pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
        self.inner
            .local_tx
            .send(LocalActorMessage::HandleConnection(conn))
            .await?;
        Ok(())
    }

    /// Shutdown the gossip instance.
    ///
    /// This leaves all topics, sending `Disconnect` messages to peers, and then
    /// stops the gossip actor loop and drops all state and connections.
    pub async fn shutdown(&self) -> Result<(), Error> {
        let (reply, reply_rx) = oneshot::channel();
        self.inner
            .local_tx
            .send(LocalActorMessage::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }

    /// Returns the metrics tracked for this gossip instance.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.inner.metrics
    }
}

/// Actor that routes messages between the connection loops and the main protocol state.
struct Actor {
    alpn: Bytes,
    /// Protocol state
    state: proto::State<PublicKey, StdRng>,
    /// The endpoint through which we dial peers
    endpoint: Endpoint,
    /// Dial machine to connect to peers
    dialer: Dialer,
    /// Input messages to the actor
    rpc_rx: mpsc::Receiver<RpcMessage>,
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Sender for the state input (cloned into the connection loops)
    in_event_tx: mpsc::Sender<InEvent>,
    /// Input events to the state (emitted from the connection loops)
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Queued timers
    timers: Timers<Timer>,
    /// Map of topics to their state.
    topics: HashMap<TopicId, TopicState>,
    /// Map of peers to their state.
    peers: HashMap<EndpointId, PeerState>,
    /// Stream of commands from topic handles.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Internal queue of topics to close because all their handles were dropped.
    quit_queue: VecDeque<TopicId>,
    /// Tasks for the connection loops, to keep track of panics.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    metrics: Arc<Metrics>,
    topic_event_forwarders: JoinSet<TopicId>,
    address_lookup: GossipAddressLookup,
}

impl Actor {
    fn new(
        endpoint: Endpoint,
        config: proto::Config,
        metrics: Arc<Metrics>,
        alpn: Option<Bytes>,
        address_lookup: GossipAddressLookup,
    ) -> (
        Self,
        mpsc::Sender<RpcMessage>,
        mpsc::Sender<LocalActorMessage>,
    ) {
        let peer_id = endpoint.id();
        let dialer = Dialer::new(endpoint.clone());
        let state = proto::State::new(
            peer_id,
            Default::default(),
            config,
            rand::rngs::StdRng::from_rng(&mut rand::rng()),
        );
        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
        let (local_tx, local_rx) = mpsc::channel(16);
        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);

        let actor = Actor {
            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
            endpoint,
            state,
            dialer,
            rpc_rx,
            in_event_rx,
            in_event_tx,
            timers: Timers::new(),
            command_rx: StreamGroup::new().keyed(),
            peers: Default::default(),
            topics: Default::default(),
            quit_queue: Default::default(),
            connection_tasks: Default::default(),
            metrics,
            local_rx,
            topic_event_forwarders: Default::default(),
            address_lookup,
        };

        (actor, rpc_tx, local_tx)
    }

    pub async fn run(mut self) {
        let mut addr_update_stream = self.setup().await;

        let mut i = 0;
        while self.event_loop(&mut addr_update_stream, i).await {
            i += 1;
        }
    }

    /// Performs the initial actor setup to run the [`Actor::event_loop`].
    ///
    /// This publishes our current address to the protocol state and returns a stream
    /// of subsequent address updates.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }

    /// One event loop processing step.
    ///
    /// Returns `false` when no further processing should be performed.
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            biased;
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    None => {
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }

    async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
        let peer_data = encode_peer_data(&endpoint_addr.into());
        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
            .await
    }

    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            // TODO: unreachable?
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }

    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        let queue = match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        // Spawn a task for this connection
        self.connection_tasks.spawn(
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }

    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
    async fn handle_connection_task_finished(
        &mut self,
        peer_id: EndpointId,
        conn: Connection,
        task_result: Result<(), ConnectionLoopError>,
    ) {
        if conn.close_reason().is_none() {
            conn.close(0u32.into(), b"close from disconnect");
        }
        let reason = conn.close_reason().expect("just closed");
        let error = task_result.err();
        debug!(%reason, ?error, "connection closed");
        if let Some(PeerState::Active {
            active_conn_id,
            other_conns,
            ..
        }) = self.peers.get_mut(&peer_id)
        {
            if conn.stable_id() == *active_conn_id {
                debug!("active send connection closed, mark peer as disconnected");
                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                    .await;
            } else {
                other_conns.retain(|x| *x != conn.stable_id());
                debug!("{} connections remaining (including the active one)", other_conns.len() + 1);
            }
        } else {
            debug!("peer already marked as disconnected");
        }
    }

    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    // TODO(frando): make use of span?
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }

    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }

    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }

    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                // Removing the peer is handled by the in_event PeerDisconnected sent
                                // in [`Self::handle_connection_task_finished`].
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        // TODO: unreachable?
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    // signal disconnection by dropping the senders to the connection
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.address_lookup.add(endpoint_addr);
                    }
                },
            }
        }
    }
}

type ConnId = usize;

#[derive(Debug)]
enum PeerState {
    Pending {
        queue: Vec<ProtoMessage>,
    },
    Active {
        active_send_tx: mpsc::Sender<ProtoMessage>,
        active_conn_id: ConnId,
        other_conns: Vec<ConnId>,
    },
}

impl PeerState {
    fn accept_conn(
        &mut self,
        send_tx: mpsc::Sender<ProtoMessage>,
        conn_id: ConnId,
    ) -> Vec<ProtoMessage> {
        match self {
            PeerState::Pending { queue } => {
                let queue = std::mem::take(queue);
                *self = PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                };
                queue
            }
            PeerState::Active {
                active_send_tx,
                active_conn_id,
                other_conns,
            } => {
                // We already have an active connection. We keep the old connection intact,
                // but only use the new connection for sending from now on.
                // By dropping the `send_tx` of the old connection, the send loop part of
                // the `connection_loop` of the old connection will terminate, which will also
                // notify the peer that the old connection may be dropped.
                other_conns.push(*active_conn_id);
                *active_send_tx = send_tx;
                *active_conn_id = conn_id;
                Vec::new()
            }
        }
    }
}

impl Default for PeerState {
    fn default() -> Self {
        PeerState::Pending { queue: Vec::new() }
    }
}
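
// A small behavioral sketch of `PeerState::accept_conn`: the first connection for a
// pending peer takes over its queued messages, and a later connection replaces the
// active sender while the old connection id is remembered in `other_conns`.
#[cfg(test)]
mod peer_state_tests {
    use super::*;

    #[test]
    fn accept_conn_transitions() {
        let mut state = PeerState::default();

        // First connection: Pending -> Active; the (empty) queue is handed over.
        let (tx1, _rx1) = mpsc::channel(1);
        let queued = state.accept_conn(tx1, 1);
        assert!(queued.is_empty());
        assert!(matches!(
            &state,
            PeerState::Active { active_conn_id: 1, .. }
        ));

        // Second connection: becomes the active sender; the old conn id is retained.
        let (tx2, _rx2) = mpsc::channel(1);
        let queued = state.accept_conn(tx2, 2);
        assert!(queued.is_empty());
        match &state {
            PeerState::Active {
                active_conn_id,
                other_conns,
                ..
            } => {
                assert_eq!(*active_conn_id, 2);
                assert_eq!(other_conns, &vec![1]);
            }
            _ => panic!("expected active state"),
        }
    }
}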

#[derive(Debug)]
struct TopicState {
    neighbors: BTreeSet<EndpointId>,
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys identifying command receivers in [`Actor::command_rx`].
    ///
    /// This represents the receiver side of gossip's public publish API.
    command_rx_keys: HashSet<stream_group::Key>,
}

impl Default for TopicState {
    fn default() -> Self {
        let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
        Self {
            neighbors: Default::default(),
            command_rx_keys: Default::default(),
            event_sender,
        }
    }
}

impl TopicState {
    /// Check if the topic still has any publisher or subscriber.
    fn still_needed(&self) -> bool {
        // Keep the topic alive if either senders or receivers exist.
        // Using `||` prevents closing the topic when all senders are dropped
        // while receivers are still listening.
        !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
    }

    #[cfg(test)]
    fn joined(&self) -> bool {
        !self.neighbors.is_empty()
    }
}

/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    Accept,
    Dial,
}

#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    ActorDropped {},
}

impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(ConnectionLoopError::ActorDropped)
    }
}

async fn connection_loop(
    from: PublicKey,
    conn: Connection,
    origin: ConnOrigin,
    send_rx: mpsc::Receiver<ProtoMessage>,
    in_event_tx: mpsc::Sender<InEvent>,
    max_message_size: usize,
    queue: Vec<ProtoMessage>,
) -> Result<(), ConnectionLoopError> {
    debug!(?origin, "connection established");

    let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
    let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);

    let send_fut = send_loop.run(queue).instrument(error_span!("send"));
    let recv_fut = recv_loop.run().instrument(error_span!("recv"));

    let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
    send_res?;
    recv_res?;
    Ok(())
}

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    relay_url: Option<RelayUrl>,
    direct_addresses: BTreeSet<SocketAddr>,
}

impl From<EndpointAddr> for AddrInfo {
    fn from(endpoint_addr: EndpointAddr) -> Self {
        Self {
            relay_url: endpoint_addr.relay_urls().next().cloned(),
            direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
        }
    }
}

fn encode_peer_data(info: &AddrInfo) -> PeerData {
    let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo cannot fail");
    PeerData::new(bytes)
}

fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
    let bytes = peer_data.as_bytes();
    if bytes.is_empty() {
        return Ok(AddrInfo::default());
    }
    let info = postcard::from_bytes(bytes)?;
    Ok(info)
}
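
// A minimal round-trip sketch for the peer-data wire encoding above. The relay URL and
// socket address are arbitrary placeholder values; this assumes `RelayUrl` can be
// parsed from a URL string.
#[cfg(test)]
mod peer_data_tests {
    use super::*;

    #[test]
    fn addr_info_roundtrip() {
        let info = AddrInfo {
            relay_url: Some("https://relay.example".parse().expect("valid url")),
            direct_addresses: ["127.0.0.1:4242".parse().expect("valid addr")]
                .into_iter()
                .collect(),
        };
        // Encoding to `PeerData` and decoding again preserves the info.
        let decoded = decode_peer_data(&encode_peer_data(&info)).expect("valid encoding");
        assert_eq!(decoded.relay_url, info.relay_url);
        assert_eq!(decoded.direct_addresses, info.direct_addresses);
    }
}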

async fn topic_subscriber_loop(
    sender: irpc::channel::mpsc::Sender<Event>,
    mut topic_events: broadcast::Receiver<ProtoEvent>,
) {
    loop {
        tokio::select! {
            biased;
            msg = topic_events.recv() => {
                let event = match msg {
                    Err(broadcast::error::RecvError::Closed) => break,
                    Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
                    Ok(event) => event.into(),
                };
                if sender.send(event).await.is_err() {
                    break;
                }
            }
            _ = sender.closed() => break,
        }
    }
}

/// A stream of commands for a gossip subscription.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;

#[derive(derive_more::Debug)]
struct TopicCommandStream {
    topic_id: TopicId,
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    closed: bool,
}

impl TopicCommandStream {
    fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
        Self {
            topic_id,
            stream,
            closed: false,
        }
    }
}

impl Stream for TopicCommandStream {
    type Item = (TopicId, Option<Command>);
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.closed {
            return Poll::Ready(None);
        }
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
            Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
                self.closed = true;
                Poll::Ready(Some((self.topic_id, None)))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    pending: JoinSet<(
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    pending_dials: HashMap<EndpointId, CancellationToken>,
}

impl Dialer {
    /// Create a new dialer for an [`Endpoint`].
    fn new(endpoint: Endpoint) -> Self {
        Self {
            endpoint,
            pending: Default::default(),
            pending_dials: Default::default(),
        }
    }

    /// Starts to dial an endpoint by [`EndpointId`].
    fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
        if self.is_pending(endpoint_id) {
            return;
        }
        let cancel = CancellationToken::new();
        self.pending_dials.insert(endpoint_id, cancel.clone());
        let endpoint = self.endpoint.clone();
        self.pending.spawn(
            async move {
                let res = tokio::select! {
                    biased;
                    _ = cancel.cancelled() => None,
                    res = endpoint.connect(endpoint_id, &alpn) => Some(res),
                };
                (endpoint_id, res)
            }
            .instrument(tracing::Span::current()),
        );
    }

    /// Checks if an endpoint is currently being dialed.
    fn is_pending(&self, endpoint: EndpointId) -> bool {
        self.pending_dials.contains_key(&endpoint)
    }

    /// Waits for the next dial operation to complete.
    ///
    /// `None` means the dial was cancelled.
    async fn next_conn(
        &mut self,
    ) -> (
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    ) {
        match self.pending_dials.is_empty() {
            false => {
                let (endpoint_id, res) = loop {
                    match self.pending.join_next().await {
                        Some(Ok((endpoint_id, res))) => {
                            self.pending_dials.remove(&endpoint_id);
                            break (endpoint_id, res);
                        }
                        Some(Err(e)) => {
                            error!("next conn error: {:?}", e);
                        }
                        None => {
                            error!("no more pending conns available");
                            std::future::pending().await
                        }
                    }
                };

                (endpoint_id, res)
            }
            true => std::future::pending().await,
        }
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::time::Duration;

    use bytes::Bytes;
    use futures_concurrency::future::TryJoin;
    use iroh::{
        address_lookup::memory::MemoryLookup,
        endpoint::{presets, BindError},
        protocol::Router,
        tls::CaRootsConfig,
        RelayMap, RelayMode, SecretKey,
    };
    use n0_error::{AnyError, Result, StdResultExt};
    use n0_tracing_test::traced_test;
    use rand::{CryptoRng, RngExt};
    use tokio::{spawn, time::timeout};
    use tokio_util::sync::CancellationToken;
    use tracing::{info, instrument};

    use super::*;
    use crate::api::{ApiError, GossipReceiver, GossipSender};

    struct ManualActorLoop {
        actor: Actor,
        step: usize,
    }

    impl std::ops::Deref for ManualActorLoop {
        type Target = Actor;

        fn deref(&self) -> &Self::Target {
            &self.actor
        }
    }

    impl std::ops::DerefMut for ManualActorLoop {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.actor
        }
    }

    type EndpointHandle = tokio::task::JoinHandle<Result<()>>;

    impl ManualActorLoop {
        #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
        async fn new(mut actor: Actor) -> Self {
            let _ = actor.setup().await;
            Self { actor, step: 0 }
        }

        #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
        async fn step(&mut self) -> bool {
            let ManualActorLoop { actor, step } = self;
            *step += 1;
            // Ignore updates that change our published address. This gives us better
            // control over events, since the endpoint is no longer emitting changes.
            let addr_update_stream = &mut futures_lite::stream::pending();
            actor.event_loop(addr_update_stream, *step).await
        }

        async fn steps(&mut self, n: usize) {
            for _ in 0..n {
                self.step().await;
            }
        }

        async fn finish(mut self) {
            while self.step().await {}
        }
    }

    impl Gossip {
        /// Creates a testing gossip instance and its actor without spawning it.
        ///
        /// This creates the endpoint and spawns the endpoint loop as well. The handle for the
        /// endpoint task is returned along with the gossip instance and actor. Since the actor
        /// is not actually spawned as [`Builder::spawn`] would do, the gossip instance will have
        /// a handle to a dummy task instead.
        async fn t_new_with_actor(
            rng: &mut rand::rngs::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Actor, EndpointHandle), BindError> {
            let endpoint = create_endpoint(rng, relay_map, None).await?;
            let metrics = Arc::new(Metrics::default());
            let address_lookup = GossipAddressLookup::default();
            endpoint
                .address_lookup()
                .expect("endpoint is not closed")
                .add(address_lookup.clone());

            let (actor, to_actor_tx, conn_tx) =
                Actor::new(endpoint, config, metrics.clone(), None, address_lookup);
            let max_message_size = actor.state.max_message_size();

            let _actor_handle =
                AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
            let gossip = Self {
                inner: Inner {
                    api: GossipApi::local(to_actor_tx),
                    local_tx: conn_tx,
                    _actor_handle,
                    max_message_size,
                    metrics,
                }
                .into(),
            };

            let endpoint_task = task::spawn(endpoint_loop(
                actor.endpoint.clone(),
                gossip.clone(),
                cancel.child_token(),
            ));

            Ok((gossip, actor, endpoint_task))
        }

        /// Creates a new testing gossip instance with the normal actor loop.
        async fn t_new(
            rng: &mut rand::rngs::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
            let (g, actor, ep_handle) =
                Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
            let ep = actor.endpoint.clone();
            let me = ep.id().fmt_short();
            let actor_handle =
                task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
            Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
        }
    }

    pub(crate) async fn create_endpoint(
        rng: &mut rand::rngs::ChaCha12Rng,
        relay_map: RelayMap,
        memory_lookup: Option<MemoryLookup>,
    ) -> Result<Endpoint, BindError> {
        let ep = Endpoint::builder(presets::Minimal)
            .relay_mode(RelayMode::Custom(relay_map))
            .secret_key(SecretKey::from_bytes(&rng.random()))
            .alpns(vec![GOSSIP_ALPN.to_vec()])
            .ca_roots_config(CaRootsConfig::insecure_skip_verify())
            .bind()
            .await?;

        if let Some(memory_lookup) = memory_lookup {
            ep.address_lookup()
                .expect("endpoint is not closed")
                .add(memory_lookup);
        }
        ep.online().await;
        Ok(ep)
    }

    async fn endpoint_loop(
        endpoint: Endpoint,
        gossip: Gossip,
        cancel: CancellationToken,
    ) -> Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break,
                incoming = endpoint.accept() => match incoming {
                    None => break,
                    Some(incoming) => {
                        let connecting = match incoming.accept() {
                            Ok(connecting) => connecting,
                            Err(err) => {
                                warn!("incoming connection failed: {err:#}");
                                // we can carry on in these cases:
                                // this can be caused by retransmitted datagrams
                                continue;
                            }
                        };
                        let connection = connecting
                            .await
                            .std_context("await incoming connection")?;
                        gossip.handle_connection(connection).await?
                    }
                }
            }
        }
        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn gossip_net_smoke() {
        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let memory_lookup = MemoryLookup::new();

        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();

        let go1 = Gossip::builder().spawn(ep1.clone());
        let go2 = Gossip::builder().spawn(ep2.clone());
        let go3 = Gossip::builder().spawn(ep3.clone());
        debug!("peer1 {:?}", ep1.id());
        debug!("peer2 {:?}", ep2.id());
        debug!("peer3 {:?}", ep3.id());
        let pi1 = ep1.id();
        let pi2 = ep2.id();

        let cancel = CancellationToken::new();
        let tasks = [
            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
        ];

        debug!("----- adding peers ----- ");
        let topic: TopicId = blake3::hash(b"foobar").into();

        let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
        let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr1.clone());
        memory_lookup.add_endpoint_info(addr2.clone());

        debug!("----- joining ----- ");
        // join the topics and wait for the connection to succeed
        let [sub1, mut sub2, mut sub3] = [
            go1.subscribe_and_join(topic, vec![]),
            go2.subscribe_and_join(topic, vec![pi1]),
            go3.subscribe_and_join(topic, vec![pi2]),
        ]
        .try_join()
        .await
        .unwrap();

        let (sink1, _stream1) = sub1.split();

        let len = 2;

        // publish messages on endpoint1
        let pub1 = spawn(async move {
            for i in 0..len {
                let message = format!("hi{i}");
                info!("go1 broadcast: {message:?}");
                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
        });

        // wait for messages on endpoint2
        let sub2 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub2.next().await.unwrap().unwrap();
                info!("go2 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        // wait for messages on endpoint3
        let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub3.next().await.unwrap().unwrap();
                info!("go3 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        timeout(Duration::from_secs(10), pub1)
            .await
            .unwrap()
            .unwrap();
        let recv2 = timeout(Duration::from_secs(10), sub2)
            .await
            .unwrap()
            .unwrap();
        let recv3 = timeout(Duration::from_secs(10), sub3)
            .await
            .unwrap()
            .unwrap();

        // We assert the received messages, but not their order.
        // While commonly they will be received in order, for go3 it may happen
        // that the second message arrives before the first one, because it managed to
        // forward-join go1 before the second message was published.
        let expected: HashSet<Bytes> = (0..len)
            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
            .collect();
        assert_eq!(HashSet::from_iter(recv2), expected);
        assert_eq!(HashSet::from_iter(recv3), expected);

        cancel.cancel();
        for t in tasks {
            timeout(Duration::from_secs(10), t)
                .await
                .unwrap()
                .unwrap()
                .unwrap();
        }
    }
1389
1390    /// Test that when a gossip topic is no longer needed it's actually unsubscribed.
1391    ///
1392    /// This test will:
1393    /// - Create two endpoints, the first using manual event loop.
1394    /// - Subscribe both endpoints to the same topic. The first endpoint will subscribe twice and connect
1395    ///   to the second endpoint. The second endpoint will subscribe without bootstrap.
1396    /// - Ensure that the first endpoint removes the subscription iff all topic handles have been
1397    ///   dropped
1398    // NOTE: this is a regression test.
    #[tokio::test]
    #[traced_test]
    async fn subscription_cleanup() -> Result {
        let rng = &mut rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        // create the first endpoint with a manual actor loop
        let (go1, actor, ep1_handle) =
            Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
        let mut actor = ManualActorLoop::new(actor).await;

        // create the second endpoint with the usual actor loop
        let (go2, ep2, ep2_handle, _test_actor_handle) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = actor.endpoint.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
        tracing::info!(%topic, "joining");

        // create the tasks for each gossip instance:
        // - second endpoint subscribes once without bootstrap and listens to events
        // - first endpoint subscribes twice with the second endpoint as bootstrap. This is done on command
        //   from the main task (this)

        // second endpoint
        let ct2 = ct.clone();
        let go2_task = async move {
            let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();

            let subscribe_fut = async {
                while let Some(ev) = sub_rx.try_next().await? {
                    match ev {
                        Event::Lagged => tracing::debug!("missed some messages :("),
                        Event::Received(_) => unreachable!("test does not send messages"),
                        other => tracing::debug!(?other, "gs event"),
                    }
                }

                tracing::debug!("subscribe stream ended");
                Ok::<_, AnyError>(())
            };

            tokio::select! {
                _ = ct2.cancelled() => Ok(()),
                res = subscribe_fut => res,
            }
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        // first endpoint
        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr2);
        actor.endpoint.address_lookup()?.add(memory_lookup);
        // we use a channel to signal advancing steps to the task
        let (tx, mut rx) = mpsc::channel::<()>(1);
        let ct1 = ct.clone();
        let go1_task = async move {
            // first subscribe is done immediately
            tracing::info!("subscribing the first time");
            let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;

            // wait for signal to subscribe a second time
            rx.recv().await.expect("signal for second subscribe");
            tracing::info!("subscribing a second time");
            let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
            drop(sub_1a);

            // wait for signal to drop the second handle as well
            rx.recv().await.expect("signal to drop handles");
            tracing::info!("dropping all handles");
            drop(sub_1b);

            // wait for cancellation
            ct1.cancelled().await;
            drop(go1);

            Ok::<_, AnyError>(())
        }
        .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
        let go1_handle = task::spawn(go1_task);

        // advance and check that the topic is now subscribed
        actor.steps(3).await; // handle our subscribe;
                              // get peer connection;
                              // receive the other peer's information for a NeighborUp
        let state = actor.topics.get(&topic).expect("get registered topic");
        assert!(state.joined());

        // signal the second subscribe, we should remain subscribed
        tx.send(())
            .await
            .std_context("signal additional subscribe")?;
        actor.steps(3).await; // subscribe; first receiver gone; first sender gone
        let state = actor.topics.get(&topic).expect("get registered topic");
        assert!(state.joined());

        // signal to drop the second handle, the topic should no longer be subscribed
        tx.send(()).await.std_context("signal drop handles")?;
        actor.steps(2).await; // second receiver gone; second sender gone
        assert!(!actor.topics.contains_key(&topic));

        // cleanup and ensure everything went as expected
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go1_handle)
            .await
            .std_context("wait gossip1 task")?
            .std_context("join gossip1 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;
        timeout(wait, actor.finish())
            .await
            .std_context("wait actor finish")?;

        Ok(())
    }

    /// Test that endpoints can reconnect to each other.
    ///
    /// This test will create two endpoints subscribed to the same topic. The second endpoint will
    /// unsubscribe and then resubscribe; the connection between the endpoints should succeed both
    /// times.
    // NOTE: This is a regression test.
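    //
    // The sequence under test, as a minimal sketch (illustrative; `gossip` and
    // `peer` mirror the setup below):
    //
    //     let mut sub = gossip.subscribe(topic, vec![peer]).await?;
    //     sub.joined().await?;
    //     drop(sub);                 // unsubscribe
    //     let mut sub = gossip.subscribe(topic, vec![peer]).await?;
    //     sub.joined().await?;       // rejoining must succeed as well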
    #[tokio::test]
    #[traced_test]
    async fn can_reconnect() -> Result {
        let rng = &mut rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let (go1, ep1, ep1_handle, _test_actor_handle1) =
            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;

        let (go2, ep2, ep2_handle, _test_actor_handle2) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = ep1.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"can_reconnect").into();
        tracing::info!(%topic, "joining");

        let ct2 = ct.child_token();
        // channel used to signal the second gossip instance to advance the test
        let (tx, mut rx) = mpsc::channel::<()>(1);
        let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        ep2.address_lookup()?.add(memory_lookup.clone());
        let go2_task = async move {
            let mut sub = go2.subscribe(topic, Vec::new()).await?;
            sub.joined().await?;

            rx.recv().await.expect("signal to unsubscribe");
            tracing::info!("unsubscribing");
            drop(sub);

            rx.recv().await.expect("signal to subscribe again");
            tracing::info!("resubscribing");
            let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;

            sub.joined().await?;
            tracing::info!("subscription successful!");

            ct2.cancelled().await;

            Ok::<_, ApiError>(())
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr2);
        ep1.address_lookup()?.add(memory_lookup);

        let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
        // wait for subscribed notification
        sub.joined().await?;

        // signal endpoint_2 to unsubscribe
        tx.send(()).await.std_context("signal unsubscribe")?;

        // we should receive a NeighborDown event
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor down")??;
        assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
        tracing::info!("endpoint 2 left");

        // signal endpoint_2 to subscribe again
        tx.send(()).await.std_context("signal resubscribe")?;

        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor up")??;
        assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
        tracing::info!("endpoint 2 rejoined!");

        // cleanup and ensure everything went as expected
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;

        Result::Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn can_die_and_reconnect() -> Result {
        /// Runs a future in a separate runtime on a separate thread, cancelling everything
        /// abruptly once `cancel` is invoked.
        fn run_in_thread<T: Send + 'static>(
            cancel: CancellationToken,
            fut: impl std::future::Future<Output = T> + Send + 'static,
        ) -> std::thread::JoinHandle<Option<T>> {
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
            })
        }
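
        // Usage sketch (illustrative): the returned handle yields `None` when the
        // future was cancelled before completing, which is how the assertions at
        // the end of this test verify the abrupt shutdown:
        //
        //     let ct = CancellationToken::new();
        //     let handle = run_in_thread(ct.clone(), async { /* work */ });
        //     ct.cancel();
        //     assert!(handle.join().unwrap().is_none());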

        /// Spawns a new endpoint and gossip instance.
        async fn spawn_gossip(
            secret_key: SecretKey,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip), BindError> {
            let ep = Endpoint::builder(presets::Minimal)
                .relay_mode(RelayMode::Custom(relay_map))
                .secret_key(secret_key)
                .ca_roots_config(CaRootsConfig::insecure_skip_verify())
                .bind()
                .await?;
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        /// Spawns a gossip endpoint, broadcasts a single message, then sleeps until cancelled externally.
        async fn broadcast_once(
            secret_key: SecretKey,
            relay_map: RelayMap,
            bootstrap_addr: EndpointAddr,
            topic_id: TopicId,
            message: String,
        ) -> Result {
            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
            info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
            let bootstrap = vec![bootstrap_addr.id];
            let memory_lookup = MemoryLookup::new();
            memory_lookup.add_endpoint_info(bootstrap_addr);
            router.endpoint().address_lookup()?.add(memory_lookup);
            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
            topic.broadcast(message.as_bytes().to_vec().into()).await?;
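            // park forever: the endpoint stays alive (and the message can
            // propagate) until the surrounding runtime thread is killed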
            std::future::pending::<()>().await;
            Ok(())
        }

        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let rng = &mut rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let topic_id = TopicId::from_bytes(rng.random());

        // spawn a gossip endpoint, send the endpoint's address on addr_tx,
        // then forward every received broadcast message on msgs_recv_tx.
        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
        let recv_task = tokio::task::spawn({
            let relay_map = relay_map.clone();
            let secret_key = SecretKey::from_bytes(&rng.random());
            async move {
                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
                // wait for the relay to be set. iroh currently has issues when trying
                // to immediately reconnect with changed direct addresses, but when the
                // relay path is available it works.
                // See https://github.com/n0-computer/iroh/pull/3372
                router.endpoint().online().await;
                let addr = router.endpoint().addr();
                info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
                addr_tx.send(addr).unwrap();
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                while let Some(event) = topic.try_next().await.unwrap() {
                    if let Event::Received(message) = event {
                        let message = std::str::from_utf8(&message.content)
                            .std_context("decode broadcast message")?
                            .to_string();
                        msgs_recv_tx
                            .send(message)
                            .await
                            .std_context("forward received message")?;
                    }
                }
                Ok::<_, AnyError>(())
            }
        });

        let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
        let max_wait = Duration::from_secs(5);

        // spawn an endpoint, send a message, and then abruptly terminate the endpoint
        // after the message was received on our receiver endpoint.
        let cancel = CancellationToken::new();
        let secret = SecretKey::from_bytes(&rng.random());
        let join_handle_1 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg1".to_string(),
            ),
        );
        // assert that we received the message on the receiver endpoint.
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for first broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg1");
        info!("kill broadcast endpoint");
        cancel.cancel();

        // spawn the endpoint again with the same endpoint id, and send another message
        let cancel = CancellationToken::new();
        let join_handle_2 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg2".to_string(),
            ),
        );
        // assert that we received the message on the receiver endpoint.
        // this means that the reconnect with the same endpoint id worked.
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for second broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg2");
        info!("kill broadcast endpoint");
        cancel.cancel();

        info!("kill recv endpoint");
        recv_task.abort();
        assert!(join_handle_1.join().unwrap().is_none());
        assert!(join_handle_2.join().unwrap().is_none());

        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn gossip_change_alpn() -> n0_error::Result<()> {
        let alpn = b"my-gossip-alpn";
        let topic_id = TopicId::from([0u8; 32]);

        let ep1 = Endpoint::bind(presets::Minimal).await?;
        let ep2 = Endpoint::bind(presets::Minimal).await?;
        let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
        let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
        let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
        let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();

        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        router2.endpoint().address_lookup()?.add(memory_lookup);

        let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        timeout(Duration::from_secs(3), topic1.joined())
            .await
            .std_context("wait topic1 join")??;
        timeout(Duration::from_secs(3), topic2.joined())
            .await
            .std_context("wait topic2 join")??;
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn gossip_rely_on_gossip_address_lookup() -> n0_error::Result<()> {
        let rng = &mut rand::rngs::ChaCha12Rng::seed_from_u64(1);

        async fn spawn(
            rng: &mut impl CryptoRng,
        ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
            let topic_id = TopicId::from([0u8; 32]);
            let ep = Endpoint::builder(presets::Minimal)
                .secret_key(SecretKey::from_bytes(&rng.random()))
                .bind()
                .await?;
            let endpoint_id = ep.id();
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            let topic = gossip.subscribe(topic_id, vec![]).await?;
            let (sender, receiver) = topic.split();
            Ok((endpoint_id, router, gossip, sender, receiver))
        }

        // spawn 3 endpoints without relay or address lookup
        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;

        println!("endpoints {:?}", [n1, n2, n3]);

        // create a mem lookup that has only endpoint 1 addr info set
        let addr1 = r1.endpoint().addr();
        let lookup = MemoryLookup::new();
        lookup.add_endpoint_info(addr1);

        // add addr info of endpoint1 to endpoint2 and join endpoint1
        r2.endpoint().address_lookup()?.add(lookup.clone());
        tx2.join_peers(vec![n1]).await?;

        // await join endpoint2 -> endpoint1
        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // add addr info of endpoint1 to endpoint3 and join endpoint1
        r3.endpoint().address_lookup()?.add(lookup.clone());
        tx3.join_peers(vec![n1]).await?;

        // await join at endpoint3: n1 and n2
        // n2 only works because we use gossip address lookup!
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 first neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 second neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));

        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));
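
        // Why endpoint3 can reach n2 at all (a sketch of the mechanism, not an
        // API guarantee): gossip carries peer address info as `PeerData` inside
        // its membership messages, and the `GossipAddressLookup` registered by
        // this module feeds that info back to the endpoint, so no external
        // lookup entry for n2 is needed.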

        let ev = timeout(Duration::from_secs(3), rx2.next())
            .await
            .std_context("wait rx2 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        let ev = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait rx1 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
            .std_context("shutdown routers")?;
        Ok(())
    }

    fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
        let mut out: Vec<_> = input.into_iter().collect();
        out.sort();
        out
    }

    /// Test that dropping the sender doesn't close the topic while the receiver is still listening.
    ///
    /// This is a common footgun: users split a `GossipTopic`, drop the sender early,
    /// and expect the receiver to keep working. With the bug (using `&&` in `still_needed`),
    /// the topic closes immediately when the sender is dropped.
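    //
    // The liveness predicate this refers to, simplified (names illustrative): a
    // topic must stay alive while *either* half of a split handle is alive,
    //
    //     let still_needed = receivers_alive || senders_alive; // not `&&`
    //
    // with `&&`, dropping just the sender tears down the whole topic.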
    #[tokio::test]
    #[traced_test]
    async fn topic_stays_alive_after_sender_drop() -> n0_error::Result<()> {
        let topic_id = TopicId::from([99u8; 32]);

        let ep1 = Endpoint::bind(presets::Minimal).await?;
        let ep2 = Endpoint::bind(presets::Minimal).await?;
        let gossip1 = Gossip::builder().spawn(ep1.clone());
        let gossip2 = Gossip::builder().spawn(ep2.clone());
        let router1 = Router::builder(ep1)
            .accept(crate::ALPN, gossip1.clone())
            .spawn();
        let router2 = Router::builder(ep2)
            .accept(crate::ALPN, gossip2.clone())
            .spawn();

        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let mem_lookup = MemoryLookup::new();
        mem_lookup.add_endpoint_info(addr1);
        router2.endpoint().address_lookup()?.add(mem_lookup);

        let topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        let (tx1, mut rx1) = topic1.split();
        let (tx2, mut rx2) = topic2.split();

        // Wait for mesh to form
        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Node 1 drops its sender, simulating the footgun where the user drops the sender early
        drop(tx1);

        // Node 2 sends a message; the receiver on node 1 should still get it
        tx2.broadcast(b"hello from node2".to_vec().into()).await?;

        // Node 1's receiver should still work and receive the message
        let event = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait for message on rx1")?;

        match event {
            Some(Ok(Event::Received(msg))) => {
                assert_eq!(&msg.content[..], b"hello from node2");
            }
            other => panic!("expected Received event, got {:?}", other),
        }

        drop(tx2);
        drop(rx1);
        drop(rx2);
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }
}