1use std::{
4 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
5 net::SocketAddr,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_concurrency::stream::{stream_group, StreamGroup};
13use futures_util::FutureExt as _;
14use iroh::{
15 endpoint::Connection,
16 protocol::{AcceptError, ProtocolHandler},
17 Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
18};
19use irpc::WithChannels;
20use n0_error::{e, stack_error};
21use n0_future::{
22 task::{self, AbortOnDropHandle, JoinSet},
23 time::Instant,
24 Stream, StreamExt as _,
25};
26use rand::{rngs::StdRng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use tokio::sync::{broadcast, mpsc, oneshot};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, error_span, trace, warn, Instrument};
31
32use self::{
33 address_lookup::GossipAddressLookup,
34 util::{RecvLoop, SendLoop, Timers},
35};
36use crate::{
37 api::{self, Command, Event, GossipApi, RpcMessage},
38 metrics::Metrics,
39 proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
40};
41
42mod address_lookup;
43mod util;
44
/// ALPN protocol identifier under which gossip connections are accepted by default.
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Capacity of the per-connection outgoing message queue.
const SEND_QUEUE_CAP: usize = 64;
/// Capacity of the RPC channel from API handles into the actor.
const TO_ACTOR_CAP: usize = 64;
/// Capacity of the channel feeding protocol `InEvent`s back into the actor.
const IN_EVENT_CAP: usize = 1024;
/// Capacity of the per-topic broadcast channel that fans events out to subscribers.
const TOPIC_EVENT_CAP: usize = 256;

/// Protocol event type, parameterized over [`PublicKey`] peer identifiers.
pub type ProtoEvent = proto::Event<PublicKey>;
/// Protocol command type, parameterized over [`PublicKey`] peer identifiers.
pub type ProtoCommand = proto::Command<PublicKey>;

// Internal protocol type aliases, all keyed by `PublicKey`.
type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;
66
/// Handle to a running gossip instance.
///
/// Cloning is cheap (shared `Arc`); all clones talk to the same background actor.
/// Derefs to [`GossipApi`] for subscribe/broadcast operations.
#[derive(Debug, Clone)]
pub struct Gossip {
    pub(crate) inner: Arc<Inner>,
}
89
// Expose the `GossipApi` surface directly on `Gossip` handles.
impl std::ops::Deref for Gossip {
    type Target = GossipApi;
    fn deref(&self) -> &Self::Target {
        &self.inner.api
    }
}
96
/// Control messages sent from [`Gossip`] handles to the actor, outside the RPC channel.
#[derive(Debug)]
enum LocalActorMessage {
    /// Hand an established connection over to the actor.
    HandleConnection(Connection),
    /// Quit all topics and stop the actor; `reply` resolves once done.
    Shutdown { reply: oneshot::Sender<()> },
}
102
/// Error returned by [`Gossip`] handle operations when the actor has stopped.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    ActorDropped {},
}
109
// A failed send to the actor means its receiver (and thus the actor) is gone.
impl<T> From<mpsc::error::SendError<T>> for Error {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(Error::ActorDropped)
    }
}
// A dropped oneshot reply sender likewise indicates the actor went away.
impl From<oneshot::error::RecvError> for Error {
    fn from(_value: oneshot::error::RecvError) -> Self {
        e!(Error::ActorDropped)
    }
}
120
/// Shared state behind a [`Gossip`] handle.
#[derive(Debug)]
pub(crate) struct Inner {
    // API surface exposed via `Deref`.
    api: GossipApi,
    // Channel for control messages (connections, shutdown) into the actor.
    local_tx: mpsc::Sender<LocalActorMessage>,
    // Keeps the actor task alive; aborts it when the last handle is dropped.
    _actor_handle: AbortOnDropHandle<()>,
    // Cached from the protocol config at spawn time.
    max_message_size: usize,
    metrics: Arc<Metrics>,
}
129
impl ProtocolHandler for Gossip {
    /// Accept an incoming gossip connection by forwarding it to the actor.
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        self.handle_connection(connection)
            .await
            .map_err(AcceptError::from_err)?;
        Ok(())
    }

    async fn shutdown(&self) {
        // This resolves to the inherent `Gossip::shutdown`, not recursively to
        // this trait method: inherent methods take precedence in resolution.
        if let Err(err) = self.shutdown().await {
            warn!("error while shutting down gossip: {err:#}");
        }
    }
}
144
/// Builder for [`Gossip`] instances.
#[derive(Debug, Clone)]
pub struct Builder {
    // Protocol configuration (membership, broadcast, message size limits).
    config: proto::Config,
    // Custom ALPN; defaults to [`GOSSIP_ALPN`] when `None`.
    alpn: Option<Bytes>,
}
151
impl Builder {
    /// Sets the maximum message size in bytes.
    pub fn max_message_size(mut self, size: usize) -> Self {
        self.config.max_message_size = size;
        self
    }

    /// Sets the membership (HyParView) configuration.
    pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
        self.config.membership = config;
        self
    }

    /// Sets the broadcast (Plumtree) configuration.
    pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
        self.config.broadcast = config;
        self
    }

    /// Overrides the ALPN used for gossip connections (both dialing and accepting).
    pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
        self.alpn = Some(alpn.as_ref().to_vec().into());
        self
    }

    /// Spawns the gossip actor on `endpoint` and returns a handle to it.
    pub fn spawn(self, endpoint: Endpoint) -> Gossip {
        let metrics = Arc::new(Metrics::default());
        let address_lookup = GossipAddressLookup::default();

        // Register our address lookup so peer addresses learned via gossip
        // become dialable through the endpoint.
        if let Ok(endpoint_addr_lookup) = endpoint.address_lookup().as_ref() {
            endpoint_addr_lookup.add(address_lookup.clone());
        }
        let (actor, rpc_tx, local_tx) = Actor::new(
            endpoint,
            self.config,
            metrics.clone(),
            self.alpn,
            address_lookup,
        );
        let me = actor.endpoint.id().fmt_short();
        let max_message_size = actor.state.max_message_size();

        let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));

        let api = GossipApi::local(rpc_tx);

        Gossip {
            inner: Inner {
                api,
                local_tx,
                // Aborts the actor when the last `Gossip` clone is dropped.
                _actor_handle: AbortOnDropHandle::new(actor_handle),
                max_message_size,
                metrics,
            }
            .into(),
        }
    }
}
223
impl Gossip {
    /// Returns a [`Builder`] with default configuration.
    pub fn builder() -> Builder {
        Builder {
            config: Default::default(),
            alpn: None,
        }
    }

    /// Serves the gossip RPC API over the given endpoint.
    // NOTE(review): `noq::Endpoint` — the `noq` crate/alias is not visible in
    // this file; confirm it resolves under the `rpc` feature.
    #[cfg(feature = "rpc")]
    pub async fn listen(self, endpoint: noq::Endpoint) {
        self.inner.api.listen(endpoint).await
    }

    /// Returns the maximum message size configured at spawn time.
    pub fn max_message_size(&self) -> usize {
        self.inner.max_message_size
    }

    /// Hands an established connection to the gossip actor.
    ///
    /// Fails with [`Error::ActorDropped`] if the actor has stopped.
    pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
        self.inner
            .local_tx
            .send(LocalActorMessage::HandleConnection(conn))
            .await?;
        Ok(())
    }

    /// Quits all topics and stops the actor, waiting for completion.
    pub async fn shutdown(&self) -> Result<(), Error> {
        let (reply, reply_rx) = oneshot::channel();
        self.inner
            .local_tx
            .send(LocalActorMessage::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }

    /// Returns the metrics collected by this gossip instance.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.inner.metrics
    }
}
274
/// The gossip actor: owns all protocol state and drives the event loop.
struct Actor {
    /// ALPN used when dialing peers.
    alpn: Bytes,
    /// Protocol state machine (membership + broadcast).
    state: proto::State<PublicKey, StdRng>,
    endpoint: Endpoint,
    /// Outbound dial management.
    dialer: Dialer,
    /// RPC requests from API handles.
    rpc_rx: mpsc::Receiver<RpcMessage>,
    /// Control messages (connections, shutdown) from `Gossip` handles.
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Sender cloned into connection tasks to feed events back to the actor.
    in_event_tx: mpsc::Sender<InEvent>,
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Pending protocol timers.
    timers: Timers<Timer>,
    /// Per-topic subscriber/command state.
    topics: HashMap<TopicId, TopicState>,
    /// Per-peer connection state (pending dial queue or active connection).
    peers: HashMap<EndpointId, PeerState>,
    /// Merged stream of commands from all topic subscriptions.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Topics scheduled to be quit on the next `process_quit_queue` call.
    quit_queue: VecDeque<TopicId>,
    /// Running send/recv loops, one per connection.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    metrics: Arc<Metrics>,
    /// Tasks forwarding per-topic events to individual subscribers.
    topic_event_forwarders: JoinSet<TopicId>,
    /// Address book populated from gossip `PeerData`.
    address_lookup: GossipAddressLookup,
}
307
308impl Actor {
    /// Constructs the actor plus the sender halves of its RPC and control channels.
    ///
    /// Does not spawn anything; the caller drives the actor via [`Actor::run`].
    fn new(
        endpoint: Endpoint,
        config: proto::Config,
        metrics: Arc<Metrics>,
        alpn: Option<Bytes>,
        address_lookup: GossipAddressLookup,
    ) -> (
        Self,
        mpsc::Sender<RpcMessage>,
        mpsc::Sender<LocalActorMessage>,
    ) {
        let peer_id = endpoint.id();
        let dialer = Dialer::new(endpoint.clone());
        let state = proto::State::new(
            peer_id,
            Default::default(),
            config,
            // Seed the protocol RNG from the thread-local RNG.
            rand::rngs::StdRng::from_rng(&mut rand::rng()),
        );
        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
        let (local_tx, local_rx) = mpsc::channel(16);
        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);

        let actor = Actor {
            // Fall back to the default gossip ALPN when none was configured.
            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
            endpoint,
            state,
            dialer,
            rpc_rx,
            in_event_rx,
            in_event_tx,
            timers: Timers::new(),
            command_rx: StreamGroup::new().keyed(),
            peers: Default::default(),
            topics: Default::default(),
            quit_queue: Default::default(),
            connection_tasks: Default::default(),
            metrics,
            local_rx,
            topic_event_forwarders: Default::default(),
            address_lookup,
        };

        (actor, rpc_tx, local_tx)
    }
354
355 pub async fn run(mut self) {
356 let mut addr_update_stream = self.setup().await;
357
358 let mut i = 0;
359 while self.event_loop(&mut addr_update_stream, i).await {
360 i += 1;
361 }
362 }
363
    /// One-time setup: subscribes to endpoint address changes and seeds the
    /// protocol with our current address as peer data.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }
374
    /// Runs one iteration of the actor's event loop.
    ///
    /// Returns `false` when the actor should stop (shutdown requested or all
    /// handles dropped). `i` is a tick counter used only for tracing.
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            biased;
            // Control channel: shutdown and incoming connections take priority.
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            // RPC requests from API handles (e.g. topic joins).
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            // Commands from topic subscribers; guarded so an empty group
            // (which yields `None`) does not busy-loop this arm.
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            // Our own endpoint address changed: re-publish peer data.
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            // Outcome of an outbound dial attempt.
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        // Only report a disconnect if no active connection
                        // exists (an accept may have raced the dial).
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    None => {
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            // Protocol events fed back from connection tasks.
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                // Safe: the actor itself holds a clone of `in_event_tx`.
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            // Protocol timers: drain everything that is due.
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            // A connection's send/recv loop terminated.
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            // A topic event forwarder finished: its subscriber went away, so
            // the topic may no longer be needed.
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }
484
    /// Publishes our (possibly changed) endpoint address to the swarm by
    /// encoding it as protocol peer data.
    async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
        let peer_data = encode_peer_data(&endpoint_addr.into());
        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
            .await
    }
491
    /// Handles a command (or stream termination, `None`) from a topic subscriber.
    ///
    /// `key` identifies the command stream within the merged stream group.
    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                // Translate the public API command into a protocol command.
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                // The subscriber's command stream ended; quit the topic if
                // nothing else still needs it.
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }
525
    /// Adopts a connection (dialed or accepted) for `peer_id` and spawns its
    /// send/recv loop.
    ///
    /// If the peer was pending, any queued messages are handed to the new
    /// connection's send loop.
    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        let queue = match self.peers.entry(peer_id) {
            // Existing peer: make this the active connection, keep the old
            // one around in `other_conns` until it closes.
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        // The task returns the connection so the actor can inspect its close
        // reason when it finishes.
        self.connection_tasks.spawn(
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }
563
    /// Cleans up after a connection's send/recv loop terminated.
    ///
    /// Marks the peer disconnected only if the closed connection was the
    /// active one; otherwise just forgets it from `other_conns`.
    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
    async fn handle_connection_task_finished(
        &mut self,
        peer_id: EndpointId,
        conn: Connection,
        task_result: Result<(), ConnectionLoopError>,
    ) {
        // Ensure the connection is closed so `close_reason` is available.
        if conn.close_reason().is_none() {
            conn.close(0u32.into(), b"close from disconnect");
        }
        let reason = conn.close_reason().expect("just closed");
        let error = task_result.err();
        debug!(%reason, ?error, "connection closed");
        if let Some(PeerState::Active {
            active_conn_id,
            other_conns,
            ..
        }) = self.peers.get_mut(&peer_id)
        {
            if conn.stable_id() == *active_conn_id {
                debug!("active send connection closed, mark peer as disconnected");
                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                    .await;
            } else {
                other_conns.retain(|x| *x != conn.stable_id());
                // NOTE(review): the `+ 1` presumably counts the still-active
                // connection in addition to `other_conns` — confirm the
                // intended log semantics.
                debug!("remaining {} other connections", other_conns.len() + 1);
            }
        } else {
            debug!("peer already marked as disconnected");
        }
    }
595
    /// Handles an RPC message from an API handle (currently only topic joins).
    ///
    /// Wires the subscriber's event channel and command stream into the topic
    /// state, replays current neighbors as `NeighborUp` events, and issues a
    /// protocol `Join` with the provided bootstrap peers.
    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                // Replay already-known neighbors to the new subscriber so it
                // sees a consistent view; bail out if it is already gone.
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                // Forward future topic events to this subscriber; the task
                // resolves to the topic id so cleanup can find the topic.
                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                // Merge the subscriber's command stream into the actor's
                // combined command stream.
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }
647
    /// Feeds one event into the protocol state machine, then processes any
    /// topic quits that the event may have queued.
    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }
652
    /// Issues a protocol `Quit` for every queued topic and removes its state.
    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }
665
    /// Core event dispatch: runs one `InEvent` through the protocol state and
    /// executes every resulting `OutEvent`.
    ///
    /// Does NOT drain the quit queue itself — callers either use
    /// [`Actor::handle_in_event`] or drain it explicitly, which avoids
    /// recursing from `process_quit_queue`.
    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        // Timer expirations are too chatty for debug level.
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                // The connection task will report the closed
                                // connection separately; just log here.
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            // First queued message triggers the dial; the
                            // queue is flushed once a connection is adopted.
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    // Track the neighbor set so late subscribers can be
                    // brought up to date (see `handle_rpc_msg`).
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    // `send` only fails if there are no receivers; that case
                    // is handled by the `still_needed` check below.
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    // Dropping the peer state drops its send channel, which
                    // makes the connection's send loop terminate.
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.address_lookup.add(endpoint_addr);
                    }
                },
            }
        }
    }
753}
754
/// Stable id of a connection, as returned by `Connection::stable_id`.
type ConnId = usize;

/// Per-peer connection state tracked by the actor.
#[derive(Debug)]
enum PeerState {
    /// No connection yet; messages queue up until a dial or accept succeeds.
    Pending {
        queue: Vec<ProtoMessage>,
    },
    /// A live connection is used for sending; superseded connections are
    /// remembered in `other_conns` until they close.
    Active {
        active_send_tx: mpsc::Sender<ProtoMessage>,
        active_conn_id: ConnId,
        other_conns: Vec<ConnId>,
    },
}
768
769impl PeerState {
770 fn accept_conn(
771 &mut self,
772 send_tx: mpsc::Sender<ProtoMessage>,
773 conn_id: ConnId,
774 ) -> Vec<ProtoMessage> {
775 match self {
776 PeerState::Pending { queue } => {
777 let queue = std::mem::take(queue);
778 *self = PeerState::Active {
779 active_send_tx: send_tx,
780 active_conn_id: conn_id,
781 other_conns: Vec::new(),
782 };
783 queue
784 }
785 PeerState::Active {
786 active_send_tx,
787 active_conn_id,
788 other_conns,
789 } => {
790 other_conns.push(*active_conn_id);
796 *active_send_tx = send_tx;
797 *active_conn_id = conn_id;
798 Vec::new()
799 }
800 }
801 }
802}
803
804impl Default for PeerState {
805 fn default() -> Self {
806 PeerState::Pending { queue: Vec::new() }
807 }
808}
809
/// Per-topic bookkeeping held by the actor.
#[derive(Debug)]
struct TopicState {
    // Current gossip neighbors, replayed to new subscribers on join.
    neighbors: BTreeSet<EndpointId>,
    // Fan-out channel for topic events; one receiver per subscriber.
    event_sender: broadcast::Sender<ProtoEvent>,
    // Keys of this topic's command streams within the actor's stream group.
    command_rx_keys: HashSet<stream_group::Key>,
}
819
820impl Default for TopicState {
821 fn default() -> Self {
822 let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
823 Self {
824 neighbors: Default::default(),
825 command_rx_keys: Default::default(),
826 event_sender,
827 }
828 }
829}
830
impl TopicState {
    /// Whether anything still references this topic: either live command
    /// streams (publishers) or live event receivers (subscribers).
    fn still_needed(&self) -> bool {
        !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
    }

    /// Whether we have at least one gossip neighbor on this topic.
    #[cfg(test)]
    fn joined(&self) -> bool {
        !self.neighbors.is_empty()
    }
}
844
/// How a connection came to be: accepted from a remote peer or dialed by us.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    Accept,
    Dial,
}
851
/// Errors produced by a connection's send/recv loop.
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    // The actor's in-event receiver went away.
    ActorDropped {},
}
871
// A failed send into the actor's in-event channel means the actor is gone.
impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(ConnectionLoopError::ActorDropped)
    }
}
877
/// Runs the send and recv loops of a single gossip connection to completion.
///
/// `queue` holds messages buffered while the peer was pending; they are
/// flushed first by the send loop. Returns once both loops have finished,
/// propagating the first error encountered (send errors take precedence).
async fn connection_loop(
    from: PublicKey,
    conn: Connection,
    origin: ConnOrigin,
    send_rx: mpsc::Receiver<ProtoMessage>,
    in_event_tx: mpsc::Sender<InEvent>,
    max_message_size: usize,
    queue: Vec<ProtoMessage>,
) -> Result<(), ConnectionLoopError> {
    debug!(?origin, "connection established");

    let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
    let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);

    let send_fut = send_loop.run(queue).instrument(error_span!("send"));
    let recv_fut = recv_loop.run().instrument(error_span!("recv"));

    // Run both directions concurrently; neither is aborted when the other ends.
    let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
    send_res?;
    recv_res?;
    Ok(())
}
900
/// Addressing info exchanged as gossip peer data (postcard-encoded).
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    relay_url: Option<RelayUrl>,
    direct_addresses: BTreeSet<SocketAddr>,
}
906
impl From<EndpointAddr> for AddrInfo {
    fn from(endpoint_addr: EndpointAddr) -> Self {
        Self {
            // Only the first relay URL is propagated via gossip.
            relay_url: endpoint_addr.relay_urls().next().cloned(),
            direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
        }
    }
}
915
/// Serializes [`AddrInfo`] into protocol peer data (postcard format).
fn encode_peer_data(info: &AddrInfo) -> PeerData {
    // Serializing this plain struct to a Vec cannot fail.
    let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
    PeerData::new(bytes)
}
920
921fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
922 let bytes = peer_data.as_bytes();
923 if bytes.is_empty() {
924 return Ok(AddrInfo::default());
925 }
926 let info = postcard::from_bytes(bytes)?;
927 Ok(info)
928}
929
/// Forwards events of one topic from the actor's broadcast channel to a
/// single subscriber until either side goes away.
///
/// A lagged broadcast receiver is surfaced to the subscriber as
/// [`Event::Lagged`] instead of silently dropping events.
async fn topic_subscriber_loop(
    sender: irpc::channel::mpsc::Sender<Event>,
    mut topic_events: broadcast::Receiver<ProtoEvent>,
) {
    loop {
        tokio::select! {
            biased;
            msg = topic_events.recv() => {
                let event = match msg {
                    // Topic state dropped: no more events will come.
                    Err(broadcast::error::RecvError::Closed) => break,
                    Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
                    Ok(event) => event.into(),
                };
                if sender.send(event).await.is_err() {
                    break;
                }
            }
            // Subscriber hung up.
            _ = sender.closed() => break,
        }
    }
}
951
/// Boxed stream of commands from a single topic subscriber.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;
955
/// Wraps a subscriber's command stream, tagging every item with its topic and
/// emitting a final `(topic, None)` item when the stream ends or errors.
#[derive(derive_more::Debug)]
struct TopicCommandStream {
    topic_id: TopicId,
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    // Set once the inner stream terminated; the final `None` item was emitted.
    closed: bool,
}
963
964impl TopicCommandStream {
965 fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
966 Self {
967 topic_id,
968 stream,
969 closed: false,
970 }
971 }
972}
973
impl Stream for TopicCommandStream {
    type Item = (TopicId, Option<Command>);
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // After the terminal `(topic, None)` item was produced, the stream is done.
        if self.closed {
            return Poll::Ready(None);
        }
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
            // End-of-stream and receive errors are both mapped to a single
            // `(topic, None)` item so the actor can clean up the topic.
            Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
                self.closed = true;
                Poll::Ready(Some((self.topic_id, None)))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
990
/// Manages concurrent outbound dials, with at most one in-flight dial per peer.
#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    // In-flight dial tasks; `None` result means the dial was cancelled.
    pending: JoinSet<(
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    // Cancellation tokens for in-flight dials, keyed by peer.
    pending_dials: HashMap<EndpointId, CancellationToken>,
}
1000
1001impl Dialer {
1002 fn new(endpoint: Endpoint) -> Self {
1004 Self {
1005 endpoint,
1006 pending: Default::default(),
1007 pending_dials: Default::default(),
1008 }
1009 }
1010
    /// Starts a dial to `endpoint_id` unless one is already in flight.
    ///
    /// The spawned task yields `None` if the dial is cancelled via its token.
    fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
        if self.is_pending(endpoint_id) {
            return;
        }
        let cancel = CancellationToken::new();
        self.pending_dials.insert(endpoint_id, cancel.clone());
        let endpoint = self.endpoint.clone();
        self.pending.spawn(
            async move {
                let res = tokio::select! {
                    biased;
                    _ = cancel.cancelled() => None,
                    res = endpoint.connect(endpoint_id, &alpn) => Some(res),
                };
                (endpoint_id, res)
            }
            .instrument(tracing::Span::current()),
        );
    }
1031
    /// Whether a dial to this peer is currently in flight.
    fn is_pending(&self, endpoint: EndpointId) -> bool {
        self.pending_dials.contains_key(&endpoint)
    }
1036
1037 async fn next_conn(
1040 &mut self,
1041 ) -> (
1042 EndpointId,
1043 Option<Result<Connection, iroh::endpoint::ConnectError>>,
1044 ) {
1045 match self.pending_dials.is_empty() {
1046 false => {
1047 let (endpoint_id, res) = loop {
1048 match self.pending.join_next().await {
1049 Some(Ok((endpoint_id, res))) => {
1050 self.pending_dials.remove(&endpoint_id);
1051 break (endpoint_id, res);
1052 }
1053 Some(Err(e)) => {
1054 error!("next conn error: {:?}", e);
1055 }
1056 None => {
1057 error!("no more pending conns available");
1058 std::future::pending().await
1059 }
1060 }
1061 };
1062
1063 (endpoint_id, res)
1064 }
1065 true => std::future::pending().await,
1066 }
1067 }
1068}
1069
1070#[cfg(test)]
1071pub(crate) mod test {
1072 use std::time::Duration;
1073
1074 use bytes::Bytes;
1075 use futures_concurrency::future::TryJoin;
1076 use iroh::{
1077 address_lookup::memory::MemoryLookup, endpoint::BindError, protocol::Router,
1078 tls::CaRootsConfig, RelayMap, RelayMode, SecretKey,
1079 };
1080 use n0_error::{AnyError, Result, StdResultExt};
1081 use n0_tracing_test::traced_test;
1082 use rand::{CryptoRng, Rng};
1083 use tokio::{spawn, time::timeout};
1084 use tokio_util::sync::CancellationToken;
1085 use tracing::{info, instrument};
1086
1087 use super::*;
1088 use crate::api::{ApiError, GossipReceiver, GossipSender};
1089
    /// Test harness that steps the gossip actor's event loop manually,
    /// one tick at a time, instead of spawning `Actor::run`.
    struct ManualActorLoop {
        actor: Actor,
        // Tick counter passed to `event_loop` for tracing.
        step: usize,
    }
1094
    // Allow tests to inspect actor state directly through the harness.
    impl std::ops::Deref for ManualActorLoop {
        type Target = Actor;

        fn deref(&self) -> &Self::Target {
            &self.actor
        }
    }
1102
    impl std::ops::DerefMut for ManualActorLoop {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.actor
        }
    }
1108
    /// Handle of a spawned test `endpoint_loop` task.
    type EndpointHandle = tokio::task::JoinHandle<Result<()>>;
1110
    impl ManualActorLoop {
        /// Wraps an actor after running its one-time setup.
        #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
        async fn new(mut actor: Actor) -> Self {
            let _ = actor.setup().await;
            Self { actor, step: 0 }
        }

        /// Runs a single event-loop tick; returns `false` when the actor stops.
        #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
        async fn step(&mut self) -> bool {
            let ManualActorLoop { actor, step } = self;
            *step += 1;
            // Address updates are not exercised by these tests, so feed a
            // stream that never yields.
            let addr_update_stream = &mut futures_lite::stream::pending();
            actor.event_loop(addr_update_stream, *step).await
        }

        /// Runs `n` ticks, ignoring the continue/stop result.
        async fn steps(&mut self, n: usize) {
            for _ in 0..n {
                self.step().await;
            }
        }

        /// Runs ticks until the actor decides to stop.
        async fn finish(mut self) {
            while self.step().await {}
        }
    }
1138
    impl Gossip {
        /// Test constructor that returns the un-spawned [`Actor`] alongside
        /// the handle, so tests can drive the event loop manually.
        ///
        /// The returned `Gossip`'s `_actor_handle` is a dummy pending task;
        /// the caller owns the real actor.
        async fn t_new_with_actor(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Actor, EndpointHandle), BindError> {
            let endpoint = create_endpoint(rng, relay_map, None).await?;
            let metrics = Arc::new(Metrics::default());
            let address_lookup = GossipAddressLookup::default();
            endpoint
                .address_lookup()
                .expect("endpoint is not closed")
                .add(address_lookup.clone());

            let (actor, to_actor_tx, conn_tx) =
                Actor::new(endpoint, config, metrics.clone(), None, address_lookup);
            let max_message_size = actor.state.max_message_size();

            // Placeholder task: the real actor is driven by the test itself.
            let _actor_handle =
                AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
            let gossip = Self {
                inner: Inner {
                    api: GossipApi::local(to_actor_tx),
                    local_tx: conn_tx,
                    _actor_handle,
                    max_message_size,
                    metrics,
                }
                .into(),
            };

            let endpoint_task = task::spawn(endpoint_loop(
                actor.endpoint.clone(),
                gossip.clone(),
                cancel.child_token(),
            ));

            Ok((gossip, actor, endpoint_task))
        }

        /// Test constructor that spawns the actor normally and returns a
        /// guard that aborts it on drop.
        async fn t_new(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
            let (g, actor, ep_handle) =
                Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
            let ep = actor.endpoint.clone();
            let me = ep.id().fmt_short();
            let actor_handle =
                task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
            Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
        }
    }
1202
    /// Builds a test endpoint bound to the given relay map, with a
    /// deterministic secret key derived from `rng`, and waits until it is online.
    pub(crate) async fn create_endpoint(
        rng: &mut rand_chacha::ChaCha12Rng,
        relay_map: RelayMap,
        memory_lookup: Option<MemoryLookup>,
    ) -> Result<Endpoint, BindError> {
        let ep = Endpoint::empty_builder()
            .relay_mode(RelayMode::Custom(relay_map))
            .secret_key(SecretKey::generate(rng))
            .alpns(vec![GOSSIP_ALPN.to_vec()])
            // Test relay uses a self-signed cert.
            .ca_roots_config(CaRootsConfig::insecure_skip_verify())
            .bind()
            .await?;

        // Optionally share an in-memory address book between test endpoints.
        if let Some(memory_lookup) = memory_lookup {
            ep.address_lookup()
                .expect("endpoint is not closed")
                .add(memory_lookup);
        }
        ep.online().await;
        Ok(ep)
    }
1224
    /// Accept loop for tests: feeds every incoming connection on `endpoint`
    /// into `gossip` until cancelled or the endpoint closes.
    async fn endpoint_loop(
        endpoint: Endpoint,
        gossip: Gossip,
        cancel: CancellationToken,
    ) -> Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break,
                incoming = endpoint.accept() => match incoming {
                    None => break,
                    Some(incoming) => {
                        let connecting = match incoming.accept() {
                            Ok(connecting) => connecting,
                            Err(err) => {
                                // Best-effort: skip broken incoming attempts.
                                warn!("incoming connection failed: {err:#}");
                                continue;
                            }
                        };
                        let connection = connecting
                            .await
                            .std_context("await incoming connection")?;
                        gossip.handle_connection(connection).await?
                    }
                }
            }
        }
        Ok(())
    }
1256
    /// End-to-end smoke test over a local relay server.
    ///
    /// Spins up three endpoints sharing an in-memory address lookup, joins
    /// them to one topic (endpoint 2 bootstraps via 1, endpoint 3 via 2),
    /// has endpoint 1 broadcast two messages, and asserts both other
    /// endpoints receive exactly those messages.
    #[tokio::test]
    #[traced_test]
    async fn gossip_net_smoke() {
        // Fixed seed keeps the generated endpoint keys deterministic.
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        // Shared in-memory address book so the endpoints can dial each
        // other without external discovery.
        let memory_lookup = MemoryLookup::new();

        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();

        let go1 = Gossip::builder().spawn(ep1.clone());
        let go2 = Gossip::builder().spawn(ep2.clone());
        let go3 = Gossip::builder().spawn(ep3.clone());
        debug!("peer1 {:?}", ep1.id());
        debug!("peer2 {:?}", ep2.id());
        debug!("peer3 {:?}", ep3.id());
        let pi1 = ep1.id();
        let pi2 = ep2.id();

        // Run one accept loop per endpoint until cancelled at the end.
        let cancel = CancellationToken::new();
        let tasks = [
            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
        ];

        debug!("----- adding peers ----- ");
        let topic: TopicId = blake3::hash(b"foobar").into();

        // Publish relay-reachable addresses for endpoints 1 and 2 so the
        // bootstrap ids used below can be resolved.
        let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
        let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr1.clone());
        memory_lookup.add_endpoint_info(addr2.clone());

        debug!("----- joining ----- ");
        // Join concurrently: endpoint 1 waits passively, 2 bootstraps via
        // 1, and 3 bootstraps via 2.
        let [sub1, mut sub2, mut sub3] = [
            go1.subscribe_and_join(topic, vec![]),
            go2.subscribe_and_join(topic, vec![pi1]),
            go3.subscribe_and_join(topic, vec![pi2]),
        ]
        .try_join()
        .await
        .unwrap();

        let (sink1, _stream1) = sub1.split();

        // Number of messages to broadcast and to expect on each receiver.
        let len = 2;

        // Broadcaster task: sends "hi0", "hi1", ... from endpoint 1.
        let pub1 = spawn(async move {
            for i in 0..len {
                let message = format!("hi{i}");
                info!("go1 broadcast: {message:?}");
                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
        });

        // Receiver tasks: collect payloads until `len` messages arrived.
        let sub2 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub2.next().await.unwrap().unwrap();
                info!("go2 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub3.next().await.unwrap().unwrap();
                info!("go3 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        timeout(Duration::from_secs(10), pub1)
            .await
            .unwrap()
            .unwrap();
        let recv2 = timeout(Duration::from_secs(10), sub2)
            .await
            .unwrap()
            .unwrap();
        let recv3 = timeout(Duration::from_secs(10), sub3)
            .await
            .unwrap()
            .unwrap();

        // Delivery order is not guaranteed, so compare as sets.
        let expected: HashSet<Bytes> = (0..len)
            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
            .collect();
        assert_eq!(HashSet::from_iter(recv2), expected);
        assert_eq!(HashSet::from_iter(recv3), expected);

        // Shut down the accept loops and make sure they exit cleanly.
        cancel.cancel();
        for t in tasks {
            timeout(Duration::from_secs(10), t)
                .await
                .unwrap()
                .unwrap()
                .unwrap();
        }
    }
1386
    /// Checks that a topic's state is removed from the actor once every
    /// subscriber handle for that topic has been dropped.
    ///
    /// Endpoint 1's actor is driven manually via `ManualActorLoop` so the
    /// test can inspect `actor.topics` between deterministic steps.
    #[tokio::test]
    #[traced_test]
    async fn subscription_cleanup() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        // Endpoint 1: gossip instance whose actor the test steps by hand.
        let (go1, actor, ep1_handle) =
            Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
        let mut actor = ManualActorLoop::new(actor).await;

        // Endpoint 2: regular gossip instance with a self-driven actor.
        let (go2, ep2, ep2_handle, _test_actor_handle) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = actor.endpoint.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
        tracing::info!(%topic, "joining");

        // Endpoint 2 joins passively and only logs events until cancelled.
        let ct2 = ct.clone();
        let go2_task = async move {
            let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();

            let subscribe_fut = async {
                while let Some(ev) = sub_rx.try_next().await? {
                    match ev {
                        Event::Lagged => tracing::debug!("missed some messages :("),
                        Event::Received(_) => unreachable!("test does not send messages"),
                        other => tracing::debug!(?other, "gs event"),
                    }
                }

                tracing::debug!("subscribe stream ended");
                Ok::<_, AnyError>(())
            };

            tokio::select! {
                _ = ct2.cancelled() => Ok(()),
                res = subscribe_fut => res,
            }
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        // Make endpoint 2 dialable from endpoint 1.
        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr2);
        actor.endpoint.address_lookup()?.add(memory_lookup);
        // `tx`/`rx` sequence the three phases of go1_task from the test body.
        let (tx, mut rx) = mpsc::channel::<()>(1);
        let ct1 = ct.clone();
        let go1_task = async move {
            // Phase 1: first subscription — registers the topic.
            tracing::info!("subscribing the first time");
            let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;

            // Phase 2: second subscription, then drop the first handle; the
            // topic must stay registered while any handle is alive.
            rx.recv().await.expect("signal for second subscribe");
            tracing::info!("subscribing a second time");
            let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
            drop(sub_1a);

            // Phase 3: drop the last handle — the topic should be cleaned up.
            rx.recv().await.expect("signal for second subscribe");
            tracing::info!("dropping all handles");
            drop(sub_1b);

            ct1.cancelled().await;
            drop(go1);

            Ok::<_, AnyError>(())
        }
        .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
        let go1_handle = task::spawn(go1_task);

        // NOTE(review): the step counts (3, 3, 2) appear tuned to the number
        // of actor messages each phase produces — verify against
        // ManualActorLoop if the actor protocol changes.
        actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
        assert!(state.joined());

        tx.send(())
            .await
            .std_context("signal additional subscribe")?;
        actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
        assert!(state.joined());

        // After the last handle is dropped the topic entry must be gone.
        tx.send(()).await.std_context("signal drop handles")?;
        actor.steps(2).await; assert!(!actor.topics.contains_key(&topic));

        // Tear everything down and check each task finishes in time.
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go1_handle)
            .await
            .std_context("wait gossip1 task")?
            .std_context("join gossip1 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;
        timeout(wait, actor.finish())
            .await
            .std_context("wait actor finish")?;

        Ok(())
    }
1532
    /// Checks that a peer can leave a topic and later rejoin it.
    ///
    /// Endpoint 2 subscribes, drops its subscription (observed by endpoint 1
    /// as `NeighborDown`), then subscribes again (observed as `NeighborUp`).
    #[tokio::test]
    #[traced_test]
    async fn can_reconnect() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let (go1, ep1, ep1_handle, _test_actor_handle1) =
            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;

        let (go2, ep2, ep2_handle, _test_actor_handle2) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = ep1.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"can_reconnect").into();
        tracing::info!(%topic, "joining");

        let ct2 = ct.child_token();
        // `tx`/`rx` sequence endpoint 2's unsubscribe/resubscribe phases.
        let (tx, mut rx) = mpsc::channel::<()>(1);
        // Make endpoint 1 dialable from endpoint 2.
        let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        ep2.address_lookup()?.add(memory_lookup.clone());
        let go2_task = async move {
            // First join: endpoint 1 initiates, so no bootstrap peers here.
            let mut sub = go2.subscribe(topic, Vec::new()).await?;
            sub.joined().await?;

            // Leave the topic by dropping the only subscription handle.
            rx.recv().await.expect("signal to unsubscribe");
            tracing::info!("unsubscribing");
            drop(sub);

            // Rejoin, this time actively bootstrapping via endpoint 1.
            rx.recv().await.expect("signal to subscribe again");
            tracing::info!("resubscribing");
            let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;

            sub.joined().await?;
            tracing::info!("subscription successful!");

            ct2.cancelled().await;

            Ok::<_, ApiError>(())
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        // Make endpoint 2 dialable from endpoint 1.
        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr2);
        ep1.address_lookup()?.add(memory_lookup);

        let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
        sub.joined().await?;

        tx.send(()).await.std_context("signal unsubscribe")?;

        // Endpoint 1 should see endpoint 2 leave ...
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor down")??;
        assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
        tracing::info!("endpoint 2 left");

        tx.send(()).await.std_context("signal resubscribe")?;

        // ... and then see it come back after resubscribing.
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor up")??;
        assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
        tracing::info!("endpoint 2 rejoined!");

        // Tear everything down and check each task finishes in time.
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;

        Result::Ok(())
    }
1639
    /// Ensures a peer whose runtime dies entirely can later reconnect with
    /// the same secret key and broadcast to the topic again.
    #[tokio::test]
    #[traced_test]
    async fn can_die_and_reconnect() -> Result {
        // Runs `fut` on its own single-threaded runtime in a dedicated OS
        // thread; cancelling `cancel` tears down the whole runtime (and all
        // connections owned by it), simulating an abrupt process death.
        // Returns `None` if the future was cancelled before completing.
        fn run_in_thread<T: Send + 'static>(
            cancel: CancellationToken,
            fut: impl std::future::Future<Output = T> + Send + 'static,
        ) -> std::thread::JoinHandle<Option<T>> {
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
            })
        }

        // Binds an endpoint with `secret_key` and spawns gossip behind a
        // router accepting the gossip ALPN.
        async fn spawn_gossip(
            secret_key: SecretKey,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip), BindError> {
            let ep = Endpoint::empty_builder()
                .relay_mode(RelayMode::Custom(relay_map))
                .secret_key(secret_key)
                .ca_roots_config(CaRootsConfig::insecure_skip_verify())
                .bind()
                .await?;
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        // Joins `topic_id` via `bootstrap_addr`, broadcasts one message,
        // then parks forever — it only terminates when its runtime is
        // killed via `run_in_thread`'s cancellation token.
        async fn broadcast_once(
            secret_key: SecretKey,
            relay_map: RelayMap,
            bootstrap_addr: EndpointAddr,
            topic_id: TopicId,
            message: String,
        ) -> Result {
            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
            info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
            let bootstrap = vec![bootstrap_addr.id];
            let memory_lookup = MemoryLookup::new();
            memory_lookup.add_endpoint_info(bootstrap_addr);
            router.endpoint().address_lookup()?.add(memory_lookup);
            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
            topic.broadcast(message.as_bytes().to_vec().into()).await?;
            std::future::pending::<()>().await;
            Ok(())
        }

        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let topic_id = TopicId::from_bytes(rng.random());

        // Receiver endpoint: joins the topic and forwards every received
        // message payload to the test body through `msgs_recv_tx`.
        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
        let recv_task = tokio::task::spawn({
            let relay_map = relay_map.clone();
            let secret_key = SecretKey::generate(&mut rng);
            async move {
                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
                router.endpoint().online().await;
                let addr = router.endpoint().addr();
                info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
                addr_tx.send(addr).unwrap();
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                while let Some(event) = topic.try_next().await.unwrap() {
                    if let Event::Received(message) = event {
                        let message = std::str::from_utf8(&message.content)
                            .std_context("decode broadcast message")?
                            .to_string();
                        msgs_recv_tx
                            .send(message)
                            .await
                            .std_context("forward received message")?;
                    }
                }
                Ok::<_, AnyError>(())
            }
        });

        let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
        let max_wait = Duration::from_secs(5);

        // First life: broadcast "msg1", then kill the runtime abruptly.
        let cancel = CancellationToken::new();
        // Reuse the same secret key for both lives of the broadcaster.
        let secret = SecretKey::generate(&mut rng);
        let join_handle_1 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg1".to_string(),
            ),
        );
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for first broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg1");
        info!("kill broadcast endpoint");
        cancel.cancel();

        // Second life: same secret key must be able to rejoin and broadcast.
        let cancel = CancellationToken::new();
        let join_handle_2 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg2".to_string(),
            ),
        );
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for second broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg2");
        info!("kill broadcast endpoint");
        cancel.cancel();

        info!("kill recv endpoint");
        recv_task.abort();
        // Both broadcaster lives were cancelled, so each returns `None`.
        assert!(join_handle_1.join().unwrap().is_none());
        assert!(join_handle_2.join().unwrap().is_none());

        Ok(())
    }
1788
1789 #[tokio::test]
1790 #[traced_test]
1791 async fn gossip_change_alpn() -> n0_error::Result<()> {
1792 let alpn = b"my-gossip-alpn";
1793 let topic_id = TopicId::from([0u8; 32]);
1794
1795 let ep1 = Endpoint::empty_builder().bind().await?;
1796 let ep2 = Endpoint::empty_builder().bind().await?;
1797 let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
1798 let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
1799 let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
1800 let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();
1801
1802 let addr1 = router1.endpoint().addr();
1803 let id1 = addr1.id;
1804 let memory_lookup = MemoryLookup::new();
1805 memory_lookup.add_endpoint_info(addr1);
1806 router2.endpoint().address_lookup()?.add(memory_lookup);
1807
1808 let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
1809 let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;
1810
1811 timeout(Duration::from_secs(3), topic1.joined())
1812 .await
1813 .std_context("wait topic1 join")??;
1814 timeout(Duration::from_secs(3), topic2.joined())
1815 .await
1816 .std_context("wait topic2 join")??;
1817 router1.shutdown().await.std_context("shutdown router1")?;
1818 router2.shutdown().await.std_context("shutdown router2")?;
1819 Ok(())
1820 }
1821
    /// Only endpoint 1's address is published in the shared lookup; when
    /// endpoint 3 joins via endpoint 1 it still becomes a neighbor of
    /// endpoint 2, whose address it never had directly — presumably
    /// resolved through gossip's own address lookup (see the test name);
    /// verify against `GossipAddressLookup` if this changes.
    #[tokio::test]
    #[traced_test]
    async fn gossip_rely_on_gossip_address_lookup() -> n0_error::Result<()> {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);

        // Spawns an endpoint + gossip + router and subscribes to the fixed
        // topic with no bootstrap peers; returns the split topic handles.
        async fn spawn(
            rng: &mut impl CryptoRng,
        ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
            let topic_id = TopicId::from([0u8; 32]);
            let ep = Endpoint::empty_builder()
                .secret_key(SecretKey::generate(rng))
                .bind()
                .await?;
            let endpoint_id = ep.id();
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            let topic = gossip.subscribe(topic_id, vec![]).await?;
            let (sender, receiver) = topic.split();
            Ok((endpoint_id, router, gossip, sender, receiver))
        }

        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;

        println!("endpoints {:?}", [n1, n2, n3]);

        // The shared lookup contains ONLY endpoint 1's address.
        let addr1 = r1.endpoint().addr();
        let lookup = MemoryLookup::new();
        lookup.add_endpoint_info(addr1);

        // Endpoint 2 joins via endpoint 1.
        r2.endpoint().address_lookup()?.add(lookup.clone());
        tx2.join_peers(vec![n1]).await?;

        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Endpoint 3 also only knows endpoint 1.
        r3.endpoint().address_lookup()?.add(lookup.clone());
        tx3.join_peers(vec![n1]).await?;

        // Endpoint 3 should gain two neighbors: 1 (direct) and 2 (address
        // learned indirectly), in unspecified order.
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 first neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 second neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));

        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));

        // Both existing peers should observe endpoint 3 joining.
        let ev = timeout(Duration::from_secs(3), rx2.next())
            .await
            .std_context("wait rx2 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        let ev = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait rx1 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
            .std_context("shutdown routers")?;
        Ok(())
    }
1900
1901 fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
1902 let mut out: Vec<_> = input.into_iter().collect();
1903 out.sort();
1904 out
1905 }
1906
    /// Verifies that dropping the sender half of a split topic does not
    /// tear down the subscription: the receiver half alone must keep the
    /// topic alive and continue to receive messages.
    #[tokio::test]
    #[traced_test]
    async fn topic_stays_alive_after_sender_drop() -> n0_error::Result<()> {
        let topic_id = TopicId::from([99u8; 32]);

        let ep1 = Endpoint::empty_builder().bind().await?;
        let ep2 = Endpoint::empty_builder().bind().await?;
        let gossip1 = Gossip::builder().spawn(ep1.clone());
        let gossip2 = Gossip::builder().spawn(ep2.clone());
        let router1 = Router::builder(ep1)
            .accept(crate::ALPN, gossip1.clone())
            .spawn();
        let router2 = Router::builder(ep2)
            .accept(crate::ALPN, gossip2.clone())
            .spawn();

        // Make endpoint 1 dialable from endpoint 2.
        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let mem_lookup = MemoryLookup::new();
        mem_lookup.add_endpoint_info(addr1);
        router2.endpoint().address_lookup()?.add(mem_lookup);

        // Endpoint 1 opens the topic; endpoint 2 bootstraps via endpoint 1.
        let topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        let (tx1, mut rx1) = topic1.split();
        let (tx2, mut rx2) = topic2.split();

        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // The interesting part: drop node 1's sender BEFORE node 2
        // broadcasts. rx1 alone must keep the subscription alive.
        drop(tx1);

        tx2.broadcast(b"hello from node2".to_vec().into()).await?;

        let event = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait for message on rx1")?;

        match event {
            Some(Ok(Event::Received(msg))) => {
                assert_eq!(&msg.content[..], b"hello from node2");
            }
            other => panic!("expected Received event, got {:?}", other),
        }

        drop(tx2);
        drop(rx1);
        drop(rx2);
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }
1973}