// iroh_gossip/net.rs
1//! Networking for the `iroh-gossip` protocol
2
3use std::{
4    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
5    net::SocketAddr,
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_concurrency::stream::{stream_group, StreamGroup};
13use futures_util::FutureExt as _;
14use iroh::{
15    endpoint::Connection,
16    protocol::{AcceptError, ProtocolHandler},
17    Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
18};
19use irpc::WithChannels;
20use n0_error::{e, stack_error};
21use n0_future::{
22    task::{self, AbortOnDropHandle, JoinSet},
23    time::Instant,
24    Stream, StreamExt as _,
25};
26use rand::{rngs::StdRng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use tokio::sync::{broadcast, mpsc, oneshot};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, error_span, trace, warn, Instrument};
31
32use self::{
33    address_lookup::GossipAddressLookup,
34    util::{RecvLoop, SendLoop, Timers},
35};
36use crate::{
37    api::{self, Command, Event, GossipApi, RpcMessage},
38    metrics::Metrics,
39    proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
40};
41
42mod address_lookup;
43mod util;
44
/// ALPN protocol name
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Channel capacity for the send queue (one per connection)
const SEND_QUEUE_CAP: usize = 64;
/// Channel capacity for the ToActor message queue (single)
const TO_ACTOR_CAP: usize = 64;
/// Channel capacity for the InEvent message queue (single)
const IN_EVENT_CAP: usize = 1024;
/// Channel capacity for broadcast subscriber event queue (one per topic)
const TOPIC_EVENT_CAP: usize = 256;

/// Events emitted from the gossip protocol
pub type ProtoEvent = proto::Event<PublicKey>;
/// Commands for the gossip protocol
pub type ProtoCommand = proto::Command<PublicKey>;

// Internal shorthands for the protocol types, instantiated with our key type.
type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;
66
/// Publish and subscribe on gossiping topics.
///
/// Each topic is a separate broadcast tree with separate memberships.
///
/// A topic has to be joined before you can publish or subscribe on the topic.
/// To join the swarm for a topic, you have to know the [`PublicKey`] of at least one peer that also joined the topic.
///
/// Messages published on the swarm will be delivered to all peers that joined the swarm for that
/// topic. You will also be relaying (gossiping) messages published by other peers.
///
/// With the default settings, the protocol will maintain up to 5 peer connections per topic.
///
/// Even though the [`Gossip`] is created from a [`Endpoint`], it does not accept connections
/// itself. You should run an accept loop on the [`Endpoint`] yourself, check the ALPN protocol of incoming
/// connections, and if the ALPN protocol equals [`GOSSIP_ALPN`], forward the connection to the
/// gossip actor through [`Self::handle_connection`].
///
/// The gossip actor will, however, initiate new connections to other peers by itself.
#[derive(Debug, Clone)]
pub struct Gossip {
    // Shared handle state; cloning `Gossip` clones this `Arc`.
    pub(crate) inner: Arc<Inner>,
}
89
90impl std::ops::Deref for Gossip {
91    type Target = GossipApi;
92    fn deref(&self) -> &Self::Target {
93        &self.inner.api
94    }
95}
96
/// Control messages sent to the actor from local [`Gossip`] handles (not via RPC).
#[derive(Debug)]
enum LocalActorMessage {
    /// Hand an accepted gossip connection over to the actor.
    HandleConnection(Connection),
    /// Quit all topics, stop the actor, and acknowledge via `reply`.
    Shutdown { reply: oneshot::Sender<()> },
}
102
/// Errors returned by the public [`Gossip`] methods.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    /// The gossip actor has terminated; its channels are closed.
    ActorDropped {},
}

// Sending to the actor fails only when the actor has stopped.
impl<T> From<mpsc::error::SendError<T>> for Error {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(Error::ActorDropped)
    }
}
// A dropped reply channel likewise means the actor is gone.
impl From<oneshot::error::RecvError> for Error {
    fn from(_value: oneshot::error::RecvError) -> Self {
        e!(Error::ActorDropped)
    }
}
120
/// State shared by all clones of a [`Gossip`] handle.
#[derive(Debug)]
pub(crate) struct Inner {
    /// Public API surface, reachable through `Deref` on [`Gossip`]
    api: GossipApi,
    /// Channel for local control messages to the actor
    local_tx: mpsc::Sender<LocalActorMessage>,
    /// Aborts the actor task when the last handle is dropped
    _actor_handle: AbortOnDropHandle<()>,
    /// Maximum message size configured at build time
    max_message_size: usize,
    /// Metrics collected by the actor
    metrics: Arc<Metrics>,
}
129
impl ProtocolHandler for Gossip {
    /// Forwards an accepted gossip connection to the gossip actor.
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        self.handle_connection(connection)
            .await
            .map_err(AcceptError::from_err)?;
        Ok(())
    }

    /// Gracefully shuts the gossip actor down when the router stops.
    async fn shutdown(&self) {
        // Resolves to the inherent `Gossip::shutdown` (inherent methods take
        // precedence over trait methods), so this does not recurse.
        if let Err(err) = self.shutdown().await {
            warn!("error while shutting down gossip: {err:#}");
        }
    }
}
144
/// Builder to configure and construct [`Gossip`].
#[derive(Debug, Clone)]
pub struct Builder {
    /// Protocol configuration: membership, broadcast, max message size
    config: proto::Config,
    /// Custom ALPN; `None` means [`GOSSIP_ALPN`] is used
    alpn: Option<Bytes>,
}
151
impl Builder {
    /// Sets the maximum message size in bytes.
    /// By default this is `4096` bytes.
    pub fn max_message_size(mut self, size: usize) -> Self {
        self.config.max_message_size = size;
        self
    }

    /// Set the membership configuration.
    pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
        self.config.membership = config;
        self
    }

    /// Set the broadcast configuration.
    pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
        self.config.broadcast = config;
        self
    }

    /// Set the ALPN this gossip instance uses.
    ///
    /// It has to be the same for all peers in the network. If you set a custom ALPN,
    /// you have to use the same ALPN when registering the [`Gossip`] on an iroh
    /// router with [`RouterBuilder::accept`].
    ///
    /// [`RouterBuilder::accept`]: iroh::protocol::RouterBuilder::accept
    pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
        self.alpn = Some(alpn.as_ref().to_vec().into());
        self
    }

    /// Spawn a gossip actor and get a handle for it
    pub fn spawn(self, endpoint: Endpoint) -> Gossip {
        let metrics = Arc::new(Metrics::default());
        let address_lookup = GossipAddressLookup::default();

        // `Endpoint::address_lookup` returns `Err` when the endpoint is closed.
        // In that case, the gossip actor will close too very soon for other reasons,
        // so it's fine if we only add our `GossipAddressLookup` for the non-closed
        // case. The alternative would be to return a `Result` from `spawn`,
        // but as long as this is the only direct error case, it seems unwarranted.
        if let Ok(endpoint_addr_lookup) = endpoint.address_lookup().as_ref() {
            endpoint_addr_lookup.add(address_lookup.clone());
        }
        let (actor, rpc_tx, local_tx) = Actor::new(
            endpoint,
            self.config,
            metrics.clone(),
            self.alpn,
            address_lookup,
        );
        let me = actor.endpoint.id().fmt_short();
        let max_message_size = actor.state.max_message_size();

        // The actor runs until shutdown or until all handles are dropped; the
        // `AbortOnDropHandle` below aborts it if `Gossip` is dropped first.
        let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));

        let api = GossipApi::local(rpc_tx);

        Gossip {
            inner: Inner {
                api,
                local_tx,
                _actor_handle: AbortOnDropHandle::new(actor_handle),
                max_message_size,
                metrics,
            }
            .into(),
        }
    }
}
223
impl Gossip {
    /// Creates a default `Builder`, with the endpoint set.
    pub fn builder() -> Builder {
        Builder {
            config: Default::default(),
            alpn: None,
        }
    }

    /// Listen on a noq endpoint for incoming RPC connections.
    #[cfg(feature = "rpc")]
    pub async fn listen(self, endpoint: noq::Endpoint) {
        self.inner.api.listen(endpoint).await
    }

    /// Get the maximum message size configured for this gossip actor.
    pub fn max_message_size(&self) -> usize {
        self.inner.max_message_size
    }

    /// Handle an incoming [`Connection`].
    ///
    /// Make sure to check the ALPN protocol yourself before passing the connection.
    ///
    /// # Errors
    ///
    /// Returns [`Error::ActorDropped`] if the gossip actor has stopped.
    pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
        self.inner
            .local_tx
            .send(LocalActorMessage::HandleConnection(conn))
            .await?;
        Ok(())
    }

    /// Shutdown the gossip instance.
    ///
    /// This leaves all topics, sending `Disconnect` messages to peers, and then
    /// stops the gossip actor loop and drops all state and connections.
    ///
    /// # Errors
    ///
    /// Returns [`Error::ActorDropped`] if the actor already stopped.
    pub async fn shutdown(&self) -> Result<(), Error> {
        let (reply, reply_rx) = oneshot::channel();
        self.inner
            .local_tx
            .send(LocalActorMessage::Shutdown { reply })
            .await?;
        // Wait until the actor acknowledges that all topics were quit.
        reply_rx.await?;
        Ok(())
    }

    /// Returns the metrics tracked for this gossip instance.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.inner.metrics
    }
}
274
/// Actor that sends and handles messages between the connection and main state loops
struct Actor {
    /// ALPN used when dialing peers (defaults to [`GOSSIP_ALPN`])
    alpn: Bytes,
    /// Protocol state
    state: proto::State<PublicKey, StdRng>,
    /// The endpoint through which we dial peers
    endpoint: Endpoint,
    /// Dial machine to connect to peers
    dialer: Dialer,
    /// Input messages to the actor
    rpc_rx: mpsc::Receiver<RpcMessage>,
    /// Local control messages: incoming connections and shutdown
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Sender for the state input (cloned into the connection loops)
    in_event_tx: mpsc::Sender<InEvent>,
    /// Input events to the state (emitted from the connection loops)
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Queued timers
    timers: Timers<Timer>,
    /// Map of topics to their state.
    topics: HashMap<TopicId, TopicState>,
    /// Map of peers to their state.
    peers: HashMap<EndpointId, PeerState>,
    /// Stream of commands from topic handles.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Internal queue of topic to close because all handles were dropped.
    quit_queue: VecDeque<TopicId>,
    /// Tasks for the connection loops, to keep track of panics.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    /// Metrics collected by this gossip instance
    metrics: Arc<Metrics>,
    /// Tasks forwarding topic events to subscribers; each resolves to its topic id
    topic_event_forwarders: JoinSet<TopicId>,
    /// Address book fed from gossiped peer data, registered with the endpoint
    address_lookup: GossipAddressLookup,
}
307
308impl Actor {
    /// Builds the actor together with the channels used to drive it.
    ///
    /// Returns the actor plus the sender for RPC messages and the sender for
    /// local control messages ([`LocalActorMessage`]).
    fn new(
        endpoint: Endpoint,
        config: proto::Config,
        metrics: Arc<Metrics>,
        alpn: Option<Bytes>,
        address_lookup: GossipAddressLookup,
    ) -> (
        Self,
        mpsc::Sender<RpcMessage>,
        mpsc::Sender<LocalActorMessage>,
    ) {
        let peer_id = endpoint.id();
        let dialer = Dialer::new(endpoint.clone());
        // Protocol state machine, seeded from the OS RNG.
        let state = proto::State::new(
            peer_id,
            Default::default(),
            config,
            rand::rngs::StdRng::from_rng(&mut rand::rng()),
        );
        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
        let (local_tx, local_rx) = mpsc::channel(16);
        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);

        let actor = Actor {
            // Fall back to the default ALPN when none was configured.
            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
            endpoint,
            state,
            dialer,
            rpc_rx,
            in_event_rx,
            in_event_tx,
            timers: Timers::new(),
            command_rx: StreamGroup::new().keyed(),
            peers: Default::default(),
            topics: Default::default(),
            quit_queue: Default::default(),
            connection_tasks: Default::default(),
            metrics,
            local_rx,
            topic_event_forwarders: Default::default(),
            address_lookup,
        };

        (actor, rpc_tx, local_tx)
    }
354
355    pub async fn run(mut self) {
356        let mut addr_update_stream = self.setup().await;
357
358        let mut i = 0;
359        while self.event_loop(&mut addr_update_stream, i).await {
360            i += 1;
361        }
362    }
363
    /// Performs the initial actor setup to run the [`Actor::event_loop`].
    ///
    /// Publishes our current endpoint address as peer data, and returns the
    /// stream of subsequent address updates that drives the event loop.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }
374
    /// One event loop processing step.
    ///
    /// Returns `false` when no further processing should be performed (shutdown
    /// was requested or all handles were dropped), `true` otherwise.
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            // `biased` makes arm order deterministic, so local control
            // messages (shutdown, new connections) are observed first.
            biased;
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            // RPC messages from the public API (topic joins).
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            // Commands from topic handles (broadcast / join).
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            // Our own endpoint address changed; republish peer data.
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            // A pending dial resolved (successfully or not).
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        // Only mark the peer disconnected if no connection
                        // became active in the meantime (e.g. the peer dialed us).
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    None => {
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            // Events emitted by the connection loops.
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            // Protocol timers: feed all timers due by now into the state.
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            // A connection loop terminated; clean up its peer state.
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            // A subscriber forwarder ended; close the topic if nobody needs it.
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }
484
    /// Publishes our updated endpoint address to the swarm as peer data.
    async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
        let peer_data = encode_peer_data(&endpoint_addr.into());
        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
            .await
    }
491
    /// Handles a command (or stream termination) from a topic handle.
    ///
    /// `key` identifies the command stream in [`Actor::command_rx`]; a `None`
    /// command means that stream ended, i.e. the publisher handle was dropped.
    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            // TODO: unreachable?
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                // Translate the public API command into a protocol command.
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                // Publisher gone: unregister its stream, and close the topic
                // if neither publishers nor subscribers remain.
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }
525
    /// Registers a new connection (dialed or accepted) for `peer_id` and
    /// spawns its connection loop.
    ///
    /// If the peer was pending, previously queued messages are handed to the
    /// new loop; if it was already active, the new connection takes over
    /// sending (see [`PeerState::accept_conn`]).
    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        let queue = match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        // Spawn a task for this connection
        self.connection_tasks.spawn(
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                // Return identifying info so the actor can clean up state.
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }
563
    /// Cleans up after a connection loop for `peer_id` terminated.
    ///
    /// Closes the connection if it is still open; if the finished connection
    /// was the active send connection, the peer is marked disconnected.
    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
    async fn handle_connection_task_finished(
        &mut self,
        peer_id: EndpointId,
        conn: Connection,
        task_result: Result<(), ConnectionLoopError>,
    ) {
        if conn.close_reason().is_none() {
            conn.close(0u32.into(), b"close from disconnect");
        }
        let reason = conn.close_reason().expect("just closed");
        let error = task_result.err();
        debug!(%reason, ?error, "connection closed");
        if let Some(PeerState::Active {
            active_conn_id,
            other_conns,
            ..
        }) = self.peers.get_mut(&peer_id)
        {
            if conn.stable_id() == *active_conn_id {
                debug!("active send connection closed, mark peer as disconnected");
                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                    .await;
            } else {
                // A superseded connection finished; forget its id.
                other_conns.retain(|x| *x != conn.stable_id());
                // NOTE(review): logs `other_conns.len() + 1` — presumably the
                // `+ 1` counts the still-active connection; confirm intent.
                debug!("remaining {} other connections", other_conns.len() + 1);
            }
        } else {
            debug!("peer already marked as disconnected");
        }
    }
595
    /// Handles a message from the RPC API; currently only topic joins.
    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor  {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    // TODO(frando): make use of span?
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                // Create or reuse state for this topic.
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                // Replay the current neighbor set to the new subscriber so it
                // sees `NeighborUp` events from before it joined.
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                // Forward future topic events to the subscriber, unless it is
                // already gone.
                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                // Register the command stream (publisher side) for this handle.
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }
647
    /// Feeds one event into the protocol state, then drains the quit queue.
    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }

    /// Sends a `Quit` command for every queued topic and drops its local state.
    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }
665
    /// Feeds one event into the protocol state machine and executes all
    /// resulting [`OutEvent`]s (sends, topic events, timers, peer updates).
    ///
    /// Does not drain the quit queue; use [`Self::handle_in_event`] for that.
    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        // Timer expirations are frequent; log those at trace level only.
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                // Removing the peer is handled by the in_event PeerDisconnected sent
                                // in [`Self::handle_connection_task_finished`].
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            // Dial on the first queued message; subsequent
                            // messages just queue up until the dial resolves.
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        // TODO: unreachable?
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    // Track the neighbor set so late subscribers can be
                    // brought up to date (see `handle_rpc_msg`).
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    // A send error just means there are no live subscribers.
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    // signal disconnection by dropping the senders to the connection
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        // Rebuild an EndpointAddr from the gossiped info and
                        // feed it into our address book.
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.address_lookup.add(endpoint_addr);
                    }
                },
            }
        }
    }
753}
754
/// Stable identifier of a connection, as returned by `Connection::stable_id`.
type ConnId = usize;

/// Per-peer connection state tracked by the actor.
#[derive(Debug)]
enum PeerState {
    /// No connection yet; messages queue up until a dial completes.
    Pending {
        queue: Vec<ProtoMessage>,
    },
    /// At least one live connection; `active_send_tx` feeds the one used for sending.
    Active {
        active_send_tx: mpsc::Sender<ProtoMessage>,
        active_conn_id: ConnId,
        /// Superseded connections, kept until their loops finish.
        other_conns: Vec<ConnId>,
    },
}
768
impl PeerState {
    /// Switches this peer to use the given connection for sending.
    ///
    /// Returns messages queued while the peer was pending, which the caller
    /// should hand to the new connection loop; returns an empty vec when the
    /// peer was already active.
    fn accept_conn(
        &mut self,
        send_tx: mpsc::Sender<ProtoMessage>,
        conn_id: ConnId,
    ) -> Vec<ProtoMessage> {
        match self {
            PeerState::Pending { queue } => {
                let queue = std::mem::take(queue);
                *self = PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                };
                queue
            }
            PeerState::Active {
                active_send_tx,
                active_conn_id,
                other_conns,
            } => {
                // We already have an active connection. We keep the old connection intact,
                // but only use the new connection for sending from now on.
                // By dropping the `send_tx` of the old connection, the send loop part of
                // the `connection_loop` of the old connection will terminate, which will also
                // notify the peer that the old connection may be dropped.
                other_conns.push(*active_conn_id);
                *active_send_tx = send_tx;
                *active_conn_id = conn_id;
                Vec::new()
            }
        }
    }
}
803
804impl Default for PeerState {
805    fn default() -> Self {
806        PeerState::Pending { queue: Vec::new() }
807    }
808}
809
/// Per-topic state kept by the actor.
#[derive(Debug)]
struct TopicState {
    /// Peers that are currently our gossip neighbors on this topic.
    neighbors: BTreeSet<EndpointId>,
    /// Broadcast channel over which protocol events are fanned out to all
    /// subscribers of this topic.
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys identifying command receivers in [`Actor::command_rx`].
    ///
    /// This represents the receiver side of gossip's publish public API.
    command_rx_keys: HashSet<stream_group::Key>,
}
819
820impl Default for TopicState {
821    fn default() -> Self {
822        let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
823        Self {
824            neighbors: Default::default(),
825            command_rx_keys: Default::default(),
826            event_sender,
827        }
828    }
829}
830
831impl TopicState {
832    /// Check if the topic still has any publisher or subscriber.
833    fn still_needed(&self) -> bool {
834        // Keep topic alive if either senders or receivers exist.
835        // Using || prevents topic closure when senders are dropped while receivers listen.
836        !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
837    }
838
839    #[cfg(test)]
840    fn joined(&self) -> bool {
841        !self.neighbors.is_empty()
842    }
843}
844
/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    /// The remote peer initiated the connection.
    Accept,
    /// We initiated the connection.
    Dial,
}
851
/// Errors that can terminate a [`connection_loop`].
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    /// Writing to the connection failed.
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    /// Reading from the connection failed.
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    /// The underlying connection errored.
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    /// The actor receiving our in-events was dropped.
    ActorDropped {},
}
871
impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
    /// A failed send on an internal channel means the receiving actor is gone.
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(ConnectionLoopError::ActorDropped)
    }
}
877
/// Runs the send and receive loops for a single gossip connection.
///
/// Messages queued for this peer before the connection was established
/// (`queue`) are flushed first by the send loop. Incoming messages are
/// forwarded to the actor via `in_event_tx`.
///
/// Returns once both loops have finished, propagating the send error first
/// if either loop failed.
async fn connection_loop(
    from: PublicKey,
    conn: Connection,
    origin: ConnOrigin,
    send_rx: mpsc::Receiver<ProtoMessage>,
    in_event_tx: mpsc::Sender<InEvent>,
    max_message_size: usize,
    queue: Vec<ProtoMessage>,
) -> Result<(), ConnectionLoopError> {
    debug!(?origin, "connection established");

    let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
    let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);

    let send_fut = send_loop.run(queue).instrument(error_span!("send"));
    let recv_fut = recv_loop.run().instrument(error_span!("recv"));

    // Run both halves concurrently to completion before reporting errors.
    let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
    send_res?;
    recv_res?;
    Ok(())
}
900
/// Addressing info for a peer as exchanged through gossip [`PeerData`].
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    /// The peer's relay URL, if any (only the first relay of the
    /// [`EndpointAddr`] is kept, see the `From<EndpointAddr>` impl).
    relay_url: Option<RelayUrl>,
    /// Direct socket addresses of the peer.
    direct_addresses: BTreeSet<SocketAddr>,
}
906
907impl From<EndpointAddr> for AddrInfo {
908    fn from(endpoint_addr: EndpointAddr) -> Self {
909        Self {
910            relay_url: endpoint_addr.relay_urls().next().cloned(),
911            direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
912        }
913    }
914}
915
916fn encode_peer_data(info: &AddrInfo) -> PeerData {
917    let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
918    PeerData::new(bytes)
919}
920
921fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
922    let bytes = peer_data.as_bytes();
923    if bytes.is_empty() {
924        return Ok(AddrInfo::default());
925    }
926    let info = postcard::from_bytes(bytes)?;
927    Ok(info)
928}
929
/// Forwards events from a topic's broadcast channel to a single subscriber.
///
/// Terminates when the broadcast channel closes or the subscriber goes away.
/// A lagging subscriber receives [`Event::Lagged`] instead of the missed events.
async fn topic_subscriber_loop(
    sender: irpc::channel::mpsc::Sender<Event>,
    mut topic_events: broadcast::Receiver<ProtoEvent>,
) {
    loop {
        tokio::select! {
           // `biased` so pending events are drained before we check for closure.
           biased;
           msg = topic_events.recv() => {
               let event = match msg {
                   Err(broadcast::error::RecvError::Closed) => break,
                   Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
                   Ok(event) => event.into(),
               };
               if sender.send(event).await.is_err() {
                   break;
               }
           }
           _ = sender.closed() => break,
        }
    }
}
951
/// A stream of commands for a gossip subscription.
///
/// Items are `Err` when receiving from the underlying irpc channel fails.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;
955
/// Stream adapter that tags each command from a subscription with its [`TopicId`]
/// and yields one final `(topic_id, None)` item when the inner stream ends.
#[derive(derive_more::Debug)]
struct TopicCommandStream {
    topic_id: TopicId,
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    /// Set once the inner stream ended or errored; the stream then terminates.
    closed: bool,
}
963
964impl TopicCommandStream {
965    fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
966        Self {
967            topic_id,
968            stream,
969            closed: false,
970        }
971    }
972}
973
impl Stream for TopicCommandStream {
    type Item = (TopicId, Option<Command>);
    /// Yields `(topic_id, Some(command))` for each received command.
    ///
    /// When the inner stream ends or errors, a single `(topic_id, None)`
    /// marker is emitted and the stream terminates afterwards.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.closed {
            return Poll::Ready(None);
        }
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
            // Receive errors are treated like end-of-stream.
            Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
                self.closed = true;
                Poll::Ready(Some((self.topic_id, None)))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
990
/// Dials peers by [`EndpointId`] and tracks in-flight dial attempts.
#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    /// In-flight dial tasks; each resolves to the dialed endpoint id and the
    /// connect result (`None` if the dial was cancelled).
    pending: JoinSet<(
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    /// Cancellation tokens for the currently pending dials, keyed by endpoint.
    pending_dials: HashMap<EndpointId, CancellationToken>,
}
1000
1001impl Dialer {
1002    /// Create a new dialer for a [`Endpoint`]
1003    fn new(endpoint: Endpoint) -> Self {
1004        Self {
1005            endpoint,
1006            pending: Default::default(),
1007            pending_dials: Default::default(),
1008        }
1009    }
1010
1011    /// Starts to dial a endpoint by [`EndpointId`].
1012    fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
1013        if self.is_pending(endpoint_id) {
1014            return;
1015        }
1016        let cancel = CancellationToken::new();
1017        self.pending_dials.insert(endpoint_id, cancel.clone());
1018        let endpoint = self.endpoint.clone();
1019        self.pending.spawn(
1020            async move {
1021                let res = tokio::select! {
1022                    biased;
1023                    _ = cancel.cancelled() => None,
1024                    res = endpoint.connect(endpoint_id, &alpn) => Some(res),
1025                };
1026                (endpoint_id, res)
1027            }
1028            .instrument(tracing::Span::current()),
1029        );
1030    }
1031
1032    /// Checks if a endpoint is currently being dialed.
1033    fn is_pending(&self, endpoint: EndpointId) -> bool {
1034        self.pending_dials.contains_key(&endpoint)
1035    }
1036
1037    /// Waits for the next dial operation to complete.
1038    /// `None` means disconnected
1039    async fn next_conn(
1040        &mut self,
1041    ) -> (
1042        EndpointId,
1043        Option<Result<Connection, iroh::endpoint::ConnectError>>,
1044    ) {
1045        match self.pending_dials.is_empty() {
1046            false => {
1047                let (endpoint_id, res) = loop {
1048                    match self.pending.join_next().await {
1049                        Some(Ok((endpoint_id, res))) => {
1050                            self.pending_dials.remove(&endpoint_id);
1051                            break (endpoint_id, res);
1052                        }
1053                        Some(Err(e)) => {
1054                            error!("next conn error: {:?}", e);
1055                        }
1056                        None => {
1057                            error!("no more pending conns available");
1058                            std::future::pending().await
1059                        }
1060                    }
1061                };
1062
1063                (endpoint_id, res)
1064            }
1065            true => std::future::pending().await,
1066        }
1067    }
1068}
1069
1070#[cfg(test)]
1071pub(crate) mod test {
1072    use std::time::Duration;
1073
1074    use bytes::Bytes;
1075    use futures_concurrency::future::TryJoin;
1076    use iroh::{
1077        address_lookup::memory::MemoryLookup, endpoint::BindError, protocol::Router,
1078        tls::CaRootsConfig, RelayMap, RelayMode, SecretKey,
1079    };
1080    use n0_error::{AnyError, Result, StdResultExt};
1081    use n0_tracing_test::traced_test;
1082    use rand::{CryptoRng, Rng};
1083    use tokio::{spawn, time::timeout};
1084    use tokio_util::sync::CancellationToken;
1085    use tracing::{info, instrument};
1086
1087    use super::*;
1088    use crate::api::{ApiError, GossipReceiver, GossipSender};
1089
    /// Wraps an [`Actor`] so tests can drive its event loop manually, one step at a time.
    struct ManualActorLoop {
        actor: Actor,
        // Number of event-loop iterations performed so far.
        step: usize,
    }
1094
    // Deref to the wrapped actor so tests can access its fields directly.
    impl std::ops::Deref for ManualActorLoop {
        type Target = Actor;

        fn deref(&self) -> &Self::Target {
            &self.actor
        }
    }
1102
    // Mutable counterpart of the Deref impl above.
    impl std::ops::DerefMut for ManualActorLoop {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.actor
        }
    }
1108
    /// Handle to a spawned endpoint accept loop (see [`endpoint_loop`]).
    type EndpointHandle = tokio::task::JoinHandle<Result<()>>;
1110
    impl ManualActorLoop {
        /// Wraps `actor`, running its setup before any steps are taken.
        #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
        async fn new(mut actor: Actor) -> Self {
            let _ = actor.setup().await;
            Self { actor, step: 0 }
        }

        /// Runs a single iteration of the actor's event loop.
        ///
        /// Returns the event loop's continuation value (`false` once the loop
        /// should stop, see [`ManualActorLoop::finish`]).
        #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
        async fn step(&mut self) -> bool {
            let ManualActorLoop { actor, step } = self;
            *step += 1;
            // ignore updates that change our published address. This gives us better control over
            // events since the endpoint is no longer emitting changes
            let addr_update_stream = &mut futures_lite::stream::pending();
            actor.event_loop(addr_update_stream, *step).await
        }

        /// Runs `n` event-loop iterations.
        async fn steps(&mut self, n: usize) {
            for _ in 0..n {
                self.step().await;
            }
        }

        /// Drives the event loop until it reports completion.
        async fn finish(mut self) {
            while self.step().await {}
        }
    }
1138
    impl Gossip {
        /// Creates a testing gossip instance and its actor without spawning it.
        ///
        /// This creates the endpoint and spawns the endpoint loop as well. The handle for the
        /// endpoint task is returned along the gossip instance and actor. Since the actor is not
        /// actually spawned as [`Builder::spawn`] would, the gossip instance will have a
        /// handle to a dummy task instead.
        async fn t_new_with_actor(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Actor, EndpointHandle), BindError> {
            let endpoint = create_endpoint(rng, relay_map, None).await?;
            let metrics = Arc::new(Metrics::default());
            let address_lookup = GossipAddressLookup::default();
            endpoint
                .address_lookup()
                .expect("endpoint is not closed")
                .add(address_lookup.clone());

            let (actor, to_actor_tx, conn_tx) =
                Actor::new(endpoint, config, metrics.clone(), None, address_lookup);
            let max_message_size = actor.state.max_message_size();

            // The actor is handed back to the caller instead of being spawned,
            // so the gossip handle gets a never-resolving dummy task in its place.
            let _actor_handle =
                AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
            let gossip = Self {
                inner: Inner {
                    api: GossipApi::local(to_actor_tx),
                    local_tx: conn_tx,
                    _actor_handle,
                    max_message_size,
                    metrics,
                }
                .into(),
            };

            let endpoint_task = task::spawn(endpoint_loop(
                actor.endpoint.clone(),
                gossip.clone(),
                cancel.child_token(),
            ));

            Ok((gossip, actor, endpoint_task))
        }

        /// Creates a new testing gossip instance with the normal actor loop.
        async fn t_new(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
            let (g, actor, ep_handle) =
                Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
            let ep = actor.endpoint.clone();
            let me = ep.id().fmt_short();
            let actor_handle =
                task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
            Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
        }
    }
1202
    /// Binds a test endpoint using the given relay map and the gossip ALPN,
    /// optionally registering a shared in-memory address lookup, and waits
    /// until the endpoint is online.
    pub(crate) async fn create_endpoint(
        rng: &mut rand_chacha::ChaCha12Rng,
        relay_map: RelayMap,
        memory_lookup: Option<MemoryLookup>,
    ) -> Result<Endpoint, BindError> {
        let ep = Endpoint::empty_builder()
            .relay_mode(RelayMode::Custom(relay_map))
            .secret_key(SecretKey::generate(rng))
            .alpns(vec![GOSSIP_ALPN.to_vec()])
            .ca_roots_config(CaRootsConfig::insecure_skip_verify())
            .bind()
            .await?;

        if let Some(memory_lookup) = memory_lookup {
            ep.address_lookup()
                .expect("endpoint is not closed")
                .add(memory_lookup);
        }
        ep.online().await;
        Ok(ep)
    }
1224
    /// Accept loop: hands every incoming connection on `endpoint` to `gossip`
    /// until `cancel` fires or the endpoint stops accepting.
    async fn endpoint_loop(
        endpoint: Endpoint,
        gossip: Gossip,
        cancel: CancellationToken,
    ) -> Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break,
                incoming = endpoint.accept() => match incoming {
                    None => break,
                    Some(incoming) => {
                        let connecting = match incoming.accept() {
                            Ok(connecting) => connecting,
                            Err(err) => {
                                warn!("incoming connection failed: {err:#}");
                                // we can carry on in these cases:
                                // this can be caused by retransmitted datagrams
                                continue;
                            }
                        };
                        let connection = connecting
                            .await
                            .std_context("await incoming connection")?;
                        gossip.handle_connection(connection).await?
                    }
                }
            }
        }
        Ok(())
    }
1256
    /// Smoke test: three endpoints join the same topic and the messages
    /// broadcast by the first endpoint are received by the other two.
    #[tokio::test]
    #[traced_test]
    async fn gossip_net_smoke() {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let memory_lookup = MemoryLookup::new();

        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();

        let go1 = Gossip::builder().spawn(ep1.clone());
        let go2 = Gossip::builder().spawn(ep2.clone());
        let go3 = Gossip::builder().spawn(ep3.clone());
        debug!("peer1 {:?}", ep1.id());
        debug!("peer2 {:?}", ep2.id());
        debug!("peer3 {:?}", ep3.id());
        let pi1 = ep1.id();
        let pi2 = ep2.id();

        let cancel = CancellationToken::new();
        let tasks = [
            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
        ];

        debug!("----- adding peers  ----- ");
        let topic: TopicId = blake3::hash(b"foobar").into();

        let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
        let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr1.clone());
        memory_lookup.add_endpoint_info(addr2.clone());

        debug!("----- joining  ----- ");
        // join the topics and wait for the connection to succeed
        // (chain topology: go2 bootstraps via go1, go3 via go2)
        let [sub1, mut sub2, mut sub3] = [
            go1.subscribe_and_join(topic, vec![]),
            go2.subscribe_and_join(topic, vec![pi1]),
            go3.subscribe_and_join(topic, vec![pi2]),
        ]
        .try_join()
        .await
        .unwrap();

        let (sink1, _stream1) = sub1.split();

        let len = 2;

        // publish messages on endpoint1
        let pub1 = spawn(async move {
            for i in 0..len {
                let message = format!("hi{i}");
                info!("go1 broadcast: {message:?}");
                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
        });

        // wait for messages on endpoint2
        let sub2 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub2.next().await.unwrap().unwrap();
                info!("go2 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        // wait for messages on endpoint3
        let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub3.next().await.unwrap().unwrap();
                info!("go3 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        timeout(Duration::from_secs(10), pub1)
            .await
            .unwrap()
            .unwrap();
        let recv2 = timeout(Duration::from_secs(10), sub2)
            .await
            .unwrap()
            .unwrap();
        let recv3 = timeout(Duration::from_secs(10), sub3)
            .await
            .unwrap()
            .unwrap();

        // We assert the received messages, but not their order.
        // While commonly they will be received in-order, for go3 it may happen
        // that the second message arrives before the first one, because it managed to
        // forward-join go1 before the second message is published.
        let expected: HashSet<Bytes> = (0..len)
            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
            .collect();
        assert_eq!(HashSet::from_iter(recv2), expected);
        assert_eq!(HashSet::from_iter(recv3), expected);

        cancel.cancel();
        for t in tasks {
            timeout(Duration::from_secs(10), t)
                .await
                .unwrap()
                .unwrap()
                .unwrap();
        }
    }
1386
    /// Test that when a gossip topic is no longer needed it's actually unsubscribed.
    ///
    /// This test will:
    /// - Create two endpoints, the first using manual event loop.
    /// - Subscribe both endpoints to the same topic. The first endpoint will subscribe twice and connect
    ///   to the second endpoint. The second endpoint will subscribe without bootstrap.
    /// - Ensure that the first endpoint removes the subscription iff all topic handles have been
    ///   dropped
    // NOTE: this is a regression test.
    #[tokio::test]
    #[traced_test]
    async fn subscription_cleanup() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        // create the first endpoint with a manual actor loop
        let (go1, actor, ep1_handle) =
            Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
        let mut actor = ManualActorLoop::new(actor).await;

        // create the second endpoint with the usual actor loop
        let (go2, ep2, ep2_handle, _test_actor_handle) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = actor.endpoint.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
        tracing::info!(%topic, "joining");

        // create the tasks for each gossip instance:
        // - second endpoint subscribes once without bootstrap and listens to events
        // - first endpoint subscribes twice with the second endpoint as bootstrap. This is done on command
        //   from the main task (this)

        // second endpoint
        let ct2 = ct.clone();
        let go2_task = async move {
            let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();

            let subscribe_fut = async {
                while let Some(ev) = sub_rx.try_next().await? {
                    match ev {
                        Event::Lagged => tracing::debug!("missed some messages :("),
                        Event::Received(_) => unreachable!("test does not send messages"),
                        other => tracing::debug!(?other, "gs event"),
                    }
                }

                tracing::debug!("subscribe stream ended");
                Ok::<_, AnyError>(())
            };

            tokio::select! {
                _ = ct2.cancelled() => Ok(()),
                res = subscribe_fut => res,
            }
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        // first endpoint
        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr2);
        actor.endpoint.address_lookup()?.add(memory_lookup);
        // we use a channel to signal advancing steps to the task
        let (tx, mut rx) = mpsc::channel::<()>(1);
        let ct1 = ct.clone();
        let go1_task = async move {
            // first subscribe is done immediately
            tracing::info!("subscribing the first time");
            let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;

            // wait for signal to subscribe a second time
            rx.recv().await.expect("signal for second subscribe");
            tracing::info!("subscribing a second time");
            let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
            drop(sub_1a);

            // wait for signal to drop the second handle as well
            rx.recv().await.expect("signal for second subscribe");
            tracing::info!("dropping all handles");
            drop(sub_1b);

            // wait for cancellation
            ct1.cancelled().await;
            drop(go1);

            Ok::<_, AnyError>(())
        }
        .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
        let go1_handle = task::spawn(go1_task);

        // advance and check that the topic is now subscribed
        actor.steps(3).await; // handle our subscribe;
                              // get peer connection;
                              // receive the other peer's information for a NeighborUp
        let state = actor.topics.get(&topic).expect("get registered topic");
        assert!(state.joined());

        // signal the second subscribe, we should remain subscribed
        tx.send(())
            .await
            .std_context("signal additional subscribe")?;
        actor.steps(3).await; // subscribe; first receiver gone; first sender gone
        let state = actor.topics.get(&topic).expect("get registered topic");
        assert!(state.joined());

        // signal to drop the second handle, the topic should no longer be subscribed
        tx.send(()).await.std_context("signal drop handles")?;
        actor.steps(2).await; // second receiver gone; second sender gone
        assert!(!actor.topics.contains_key(&topic));

        // cleanup and ensure everything went as expected
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go1_handle)
            .await
            .std_context("wait gossip1 task")?
            .std_context("join gossip1 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;
        timeout(wait, actor.finish())
            .await
            .std_context("wait actor finish")?;

        Ok(())
    }
1532
    /// Test that endpoints can reconnect to each other.
    ///
    /// This test will create two endpoints subscribed to the same topic. The second endpoint will
    /// unsubscribe and then resubscribe and connection between the endpoints should succeed both
    /// times.
    // NOTE: This is a regression test
    #[tokio::test]
    #[traced_test]
    async fn can_reconnect() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let (go1, ep1, ep1_handle, _test_actor_handle1) =
            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;

        let (go2, ep2, ep2_handle, _test_actor_handle2) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = ep1.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"can_reconnect").into();
        tracing::info!(%topic, "joining");

        let ct2 = ct.child_token();
        // channel used to signal the second gossip instance to advance the test
        let (tx, mut rx) = mpsc::channel::<()>(1);
        let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        ep2.address_lookup()?.add(memory_lookup.clone());
        let go2_task = async move {
            let mut sub = go2.subscribe(topic, Vec::new()).await?;
            sub.joined().await?;

            rx.recv().await.expect("signal to unsubscribe");
            tracing::info!("unsubscribing");
            drop(sub);

            rx.recv().await.expect("signal to subscribe again");
            tracing::info!("resubscribing");
            let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;

            sub.joined().await?;
            tracing::info!("subscription successful!");

            ct2.cancelled().await;

            Ok::<_, ApiError>(())
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr2);
        ep1.address_lookup()?.add(memory_lookup);

        let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
        // wait for subscribed notification
        sub.joined().await?;

        // signal endpoint_2 to unsubscribe
        tx.send(()).await.std_context("signal unsubscribe")?;

        // we should receive a Neighbor down event
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor down")??;
        assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
        tracing::info!("endpoint 2 left");

        // signal endpoint_2 to subscribe again
        tx.send(()).await.std_context("signal resubscribe")?;

        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor up")??;
        assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
        tracing::info!("endpoint 2 rejoined!");

        // cleanup and ensure everything went as expected
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;

        Result::Ok(())
    }
1639
    /// Test that an endpoint can die abruptly (its runtime dropped mid-flight with no
    /// graceful shutdown) and then reconnect under the same endpoint id and keep gossiping.
    #[tokio::test]
    #[traced_test]
    async fn can_die_and_reconnect() -> Result {
        /// Runs a future in a separate runtime on a separate thread, cancelling everything
        /// abruptly once `cancel` is invoked.
        fn run_in_thread<T: Send + 'static>(
            cancel: CancellationToken,
            fut: impl std::future::Future<Output = T> + Send + 'static,
        ) -> std::thread::JoinHandle<Option<T>> {
            std::thread::spawn(move || {
                // A current-thread runtime keeps all tasks on this one thread, so dropping
                // it (when the thread returns) tears everything down without cleanup —
                // exactly the ungraceful "death" this test needs.
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                // Yields `None` if `cancel` fired before `fut` completed.
                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
            })
        }

        /// Spawns a new endpoint and gossip instance.
        async fn spawn_gossip(
            secret_key: SecretKey,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip), BindError> {
            let ep = Endpoint::empty_builder()
                .relay_mode(RelayMode::Custom(relay_map))
                .secret_key(secret_key)
                .ca_roots_config(CaRootsConfig::insecure_skip_verify())
                .bind()
                .await?;
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        /// Spawns a gossip endpoint, and broadcasts a single message, then sleep until cancelled externally.
        async fn broadcast_once(
            secret_key: SecretKey,
            relay_map: RelayMap,
            bootstrap_addr: EndpointAddr,
            topic_id: TopicId,
            message: String,
        ) -> Result {
            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
            info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
            let bootstrap = vec![bootstrap_addr.id];
            // Make the bootstrap endpoint's address resolvable before joining.
            let memory_lookup = MemoryLookup::new();
            memory_lookup.add_endpoint_info(bootstrap_addr);
            router.endpoint().address_lookup()?.add(memory_lookup);
            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
            topic.broadcast(message.as_bytes().to_vec().into()).await?;
            // Stay alive until the caller's CancellationToken kills the runtime.
            std::future::pending::<()>().await;
            Ok(())
        }

        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        // Seeded RNG keeps endpoint keys / topic id deterministic across runs.
        let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let topic_id = TopicId::from_bytes(rng.random());

        // spawn a gossip endpoint, send the endpoint's address on addr_tx,
        // then forward every received broadcast message on msgs_recv_tx.
        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
        let recv_task = tokio::task::spawn({
            let relay_map = relay_map.clone();
            let secret_key = SecretKey::generate(&mut rng);
            async move {
                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
                // wait for the relay to be set. iroh currently has issues when trying
                // to immediately reconnect with changed direct addresses, but when the
                // relay path is available it works.
                // See https://github.com/n0-computer/iroh/pull/3372
                router.endpoint().online().await;
                let addr = router.endpoint().addr();
                info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
                addr_tx.send(addr).unwrap();
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                // Forward every received broadcast payload (decoded as UTF-8) to the test body.
                while let Some(event) = topic.try_next().await.unwrap() {
                    if let Event::Received(message) = event {
                        let message = std::str::from_utf8(&message.content)
                            .std_context("decode broadcast message")?
                            .to_string();
                        msgs_recv_tx
                            .send(message)
                            .await
                            .std_context("forward received message")?;
                    }
                }
                Ok::<_, AnyError>(())
            }
        });

        let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
        let max_wait = Duration::from_secs(5);

        // spawn an endpoint, send a message, and then abruptly terminate the endpoint ungracefully
        // after the message was received on our receiver endpoint.
        let cancel = CancellationToken::new();
        let secret = SecretKey::generate(&mut rng);
        let join_handle_1 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg1".to_string(),
            ),
        );
        // assert that we received the message on the receiver endpoint.
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for first broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg1");
        info!("kill broadcast endpoint");
        cancel.cancel();

        // spawn the endpoint again with the same secret key (thus same endpoint id),
        // and send another message
        let cancel = CancellationToken::new();
        let join_handle_2 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg2".to_string(),
            ),
        );
        // assert that we received the message on the receiver endpoint.
        // this means that the reconnect with the same endpoint id worked.
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for second broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg2");
        info!("kill broadcast endpoint");
        cancel.cancel();

        info!("kill recv endpoint");
        recv_task.abort();
        // Both broadcast futures were cancelled before completing, so
        // `run_until_cancelled` returned `None` in each thread.
        assert!(join_handle_1.join().unwrap().is_none());
        assert!(join_handle_2.join().unwrap().is_none());

        Ok(())
    }
1788
1789    #[tokio::test]
1790    #[traced_test]
1791    async fn gossip_change_alpn() -> n0_error::Result<()> {
1792        let alpn = b"my-gossip-alpn";
1793        let topic_id = TopicId::from([0u8; 32]);
1794
1795        let ep1 = Endpoint::empty_builder().bind().await?;
1796        let ep2 = Endpoint::empty_builder().bind().await?;
1797        let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
1798        let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
1799        let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
1800        let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();
1801
1802        let addr1 = router1.endpoint().addr();
1803        let id1 = addr1.id;
1804        let memory_lookup = MemoryLookup::new();
1805        memory_lookup.add_endpoint_info(addr1);
1806        router2.endpoint().address_lookup()?.add(memory_lookup);
1807
1808        let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
1809        let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;
1810
1811        timeout(Duration::from_secs(3), topic1.joined())
1812            .await
1813            .std_context("wait topic1 join")??;
1814        timeout(Duration::from_secs(3), topic2.joined())
1815            .await
1816            .std_context("wait topic2 join")??;
1817        router1.shutdown().await.std_context("shutdown router1")?;
1818        router2.shutdown().await.std_context("shutdown router2")?;
1819        Ok(())
1820    }
1821
    /// Test that peers learned through gossip itself are dialable: endpoint 3
    /// connects to endpoint 2 without any configured address info for it, relying
    /// solely on addressing data propagated via the gossip overlay.
    #[tokio::test]
    #[traced_test]
    async fn gossip_rely_on_gossip_address_lookup() -> n0_error::Result<()> {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);

        /// Spawns endpoint + gossip + router, subscribes to a fixed topic with no
        /// bootstrap peers, and returns the split sender/receiver halves.
        async fn spawn(
            rng: &mut impl CryptoRng,
        ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
            let topic_id = TopicId::from([0u8; 32]);
            let ep = Endpoint::empty_builder()
                .secret_key(SecretKey::generate(rng))
                .bind()
                .await?;
            let endpoint_id = ep.id();
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            let topic = gossip.subscribe(topic_id, vec![]).await?;
            let (sender, receiver) = topic.split();
            Ok((endpoint_id, router, gossip, sender, receiver))
        }

        // spawn 3 endpoints without relay or address lookup
        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;

        println!("endpoints {:?}", [n1, n2, n3]);

        // create a mem lookup that has only endpoint 1 addr info set
        let addr1 = r1.endpoint().addr();
        let lookup = MemoryLookup::new();
        lookup.add_endpoint_info(addr1);

        // add addr info of endpoint1 to endpoint2 and join endpoint1
        r2.endpoint().address_lookup()?.add(lookup.clone());
        tx2.join_peers(vec![n1]).await?;

        // await join endpoint2 -> endpoint1
        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // add addr info of endpoint1 to endpoint3 and join endpoint1
        r3.endpoint().address_lookup()?.add(lookup.clone());
        tx3.join_peers(vec![n1]).await?;

        // await join at endpoint3: n1 and n2
        // n2 only works because we use gossip address lookup — endpoint3 was never
        // given n2's address directly!
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 first neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 second neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));

        // endpoint3 should now neighbor both n1 and n2 (event order is unspecified,
        // so compare the sorted neighbor sets).
        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));

        // endpoints 1 and 2 should each see endpoint3 come up
        let ev = timeout(Duration::from_secs(3), rx2.next())
            .await
            .std_context("wait rx2 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        let ev = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait rx1 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
            .std_context("shutdown routers")?;
        Ok(())
    }
1900
1901    fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
1902        let mut out: Vec<_> = input.into_iter().collect();
1903        out.sort();
1904        out
1905    }
1906
    /// Test that dropping sender doesn't close topic while receiver is still listening.
    ///
    /// This is a common footgun: users split a GossipTopic, drop the sender early,
    /// and expect the receiver to keep working. With the bug (using && in still_needed),
    /// the topic closes immediately when sender is dropped.
    #[tokio::test]
    #[traced_test]
    async fn topic_stays_alive_after_sender_drop() -> n0_error::Result<()> {
        let topic_id = TopicId::from([99u8; 32]);

        // Two endpoints wired together under the default gossip ALPN.
        let ep1 = Endpoint::empty_builder().bind().await?;
        let ep2 = Endpoint::empty_builder().bind().await?;
        let gossip1 = Gossip::builder().spawn(ep1.clone());
        let gossip2 = Gossip::builder().spawn(ep2.clone());
        let router1 = Router::builder(ep1)
            .accept(crate::ALPN, gossip1.clone())
            .spawn();
        let router2 = Router::builder(ep2)
            .accept(crate::ALPN, gossip2.clone())
            .spawn();

        // Endpoint 2 learns endpoint 1's address via an in-memory lookup.
        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let mem_lookup = MemoryLookup::new();
        mem_lookup.add_endpoint_info(addr1);
        router2.endpoint().address_lookup()?.add(mem_lookup);

        // Endpoint 2 bootstraps from endpoint 1; endpoint 1 has no bootstrap peers.
        let topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        let (tx1, mut rx1) = topic1.split();
        let (tx2, mut rx2) = topic2.split();

        // Wait for mesh to form
        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Node 1 drops its sender - simulating the footgun where user drops sender early
        drop(tx1);

        // Node 2 sends a message - receiver on node 1 should still get it
        tx2.broadcast(b"hello from node2".to_vec().into()).await?;

        // Node 1's receiver should still work and receive the message
        let event = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait for message on rx1")?;

        match event {
            Some(Ok(Event::Received(msg))) => {
                assert_eq!(&msg.content[..], b"hello from node2");
            }
            other => panic!("expected Received event, got {:?}", other),
        }

        // Drop the remaining halves explicitly before shutting the routers down.
        drop(tx2);
        drop(rx1);
        drop(rx2);
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }
1973}