1use std::{
4 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
5 net::SocketAddr,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_concurrency::stream::{stream_group, StreamGroup};
13use futures_util::FutureExt as _;
14use iroh::{
15 endpoint::Connection,
16 protocol::{AcceptError, ProtocolHandler},
17 Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
18};
19use irpc::WithChannels;
20use n0_error::{e, stack_error};
21use n0_future::{
22 task::{self, AbortOnDropHandle, JoinSet},
23 time::Instant,
24 Stream, StreamExt as _,
25};
26use rand::{rngs::StdRng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use tokio::sync::{broadcast, mpsc, oneshot};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, error_span, trace, warn, Instrument};
31
32use self::{
33 address_lookup::GossipAddressLookup,
34 util::{RecvLoop, SendLoop, Timers},
35};
36use crate::{
37 api::{self, Command, Event, GossipApi, RpcMessage},
38 metrics::Metrics,
39 proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
40};
41
42mod address_lookup;
43mod util;
44
/// The ALPN protocol identifier spoken by iroh-gossip connections.
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

// Capacity of the per-connection outgoing message queue.
const SEND_QUEUE_CAP: usize = 64;
// Capacity of the RPC inbox channel feeding the actor.
const TO_ACTOR_CAP: usize = 64;
// Capacity of the channel carrying protocol `InEvent`s into the actor.
const IN_EVENT_CAP: usize = 1024;
// Capacity of the per-topic broadcast channel delivering events to subscribers.
const TOPIC_EVENT_CAP: usize = 256;

/// Protocol event type, specialized to [`PublicKey`] peer identifiers.
pub type ProtoEvent = proto::Event<PublicKey>;
/// Protocol command type, specialized to [`PublicKey`] peer identifiers.
pub type ProtoCommand = proto::Command<PublicKey>;

// Internal protocol types, all specialized to `PublicKey` peers.
type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;
66
/// Handle to a running gossip instance.
///
/// Cheaply cloneable; all clones share the same background actor. Derefs to
/// [`GossipApi`] for the subscribe/join surface.
#[derive(Debug, Clone)]
pub struct Gossip {
    pub(crate) inner: Arc<Inner>,
}
89
90impl std::ops::Deref for Gossip {
91 type Target = GossipApi;
92 fn deref(&self) -> &Self::Target {
93 &self.inner.api
94 }
95}
96
/// Control messages delivered to the actor from the local [`Gossip`] handle.
#[derive(Debug)]
enum LocalActorMessage {
    /// Hand an accepted incoming connection over to the actor.
    HandleConnection(Connection),
    /// Quit all topics and stop the actor; `reply` is signalled once done.
    Shutdown { reply: oneshot::Sender<()> },
}
102
// Public error type for handle-to-actor interactions.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    // The actor (or its inbox channel) is gone; any further interaction fails.
    ActorDropped {},
}
109
// A failed send means the actor's receiver was dropped.
impl<T> From<mpsc::error::SendError<T>> for Error {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(Error::ActorDropped)
    }
}
// A failed oneshot receive means the actor dropped the reply sender.
impl From<oneshot::error::RecvError> for Error {
    fn from(_value: oneshot::error::RecvError) -> Self {
        e!(Error::ActorDropped)
    }
}
120
/// Shared state behind a [`Gossip`] handle.
#[derive(Debug)]
pub(crate) struct Inner {
    // API surface exposed via `Deref`.
    api: GossipApi,
    // Channel for control messages (connections, shutdown) to the actor.
    local_tx: mpsc::Sender<LocalActorMessage>,
    // Keeps the actor task alive; aborts it when the last handle drops.
    _actor_handle: AbortOnDropHandle<()>,
    // Cached from the protocol config at spawn time.
    max_message_size: usize,
    metrics: Arc<Metrics>,
}
129
impl ProtocolHandler for Gossip {
    /// Forwards an accepted connection to the gossip actor.
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        self.handle_connection(connection)
            .await
            .map_err(AcceptError::from_err)?;
        Ok(())
    }

    async fn shutdown(&self) {
        // Resolves to the inherent `Gossip::shutdown` (inherent methods take
        // precedence over trait methods), so this is not a recursive call.
        if let Err(err) = self.shutdown().await {
            warn!("error while shutting down gossip: {err:#}");
        }
    }
}
144
/// Builder for [`Gossip`] instances.
#[derive(Debug, Clone)]
pub struct Builder {
    // Protocol configuration (membership, broadcast, max message size).
    config: proto::Config,
    // Custom ALPN; falls back to [`GOSSIP_ALPN`] when `None`.
    alpn: Option<Bytes>,
}
151
152impl Builder {
153 pub fn max_message_size(mut self, size: usize) -> Self {
156 self.config.max_message_size = size;
157 self
158 }
159
160 pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
162 self.config.membership = config;
163 self
164 }
165
166 pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
168 self.config.broadcast = config;
169 self
170 }
171
172 pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
180 self.alpn = Some(alpn.as_ref().to_vec().into());
181 self
182 }
183
184 pub fn spawn(self, endpoint: Endpoint) -> Gossip {
186 let metrics = Arc::new(Metrics::default());
187 let address_lookup = GossipAddressLookup::default();
188
189 if let Ok(endpoint_addr_lookup) = endpoint.address_lookup().as_ref() {
195 endpoint_addr_lookup.add(address_lookup.clone());
196 }
197 let (actor, rpc_tx, local_tx) = Actor::new(
198 endpoint,
199 self.config,
200 metrics.clone(),
201 self.alpn,
202 address_lookup,
203 );
204 let me = actor.endpoint.id().fmt_short();
205 let max_message_size = actor.state.max_message_size();
206
207 let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));
208
209 let api = GossipApi::local(rpc_tx);
210
211 Gossip {
212 inner: Inner {
213 api,
214 local_tx,
215 _actor_handle: AbortOnDropHandle::new(actor_handle),
216 max_message_size,
217 metrics,
218 }
219 .into(),
220 }
221 }
222}
223
224impl Gossip {
225 pub fn builder() -> Builder {
227 Builder {
228 config: Default::default(),
229 alpn: None,
230 }
231 }
232
233 #[cfg(feature = "rpc")]
235 pub async fn listen(self, endpoint: noq::Endpoint) {
236 self.inner.api.listen(endpoint).await
237 }
238
239 pub fn max_message_size(&self) -> usize {
241 self.inner.max_message_size
242 }
243
244 pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
248 self.inner
249 .local_tx
250 .send(LocalActorMessage::HandleConnection(conn))
251 .await?;
252 Ok(())
253 }
254
255 pub async fn shutdown(&self) -> Result<(), Error> {
260 let (reply, reply_rx) = oneshot::channel();
261 self.inner
262 .local_tx
263 .send(LocalActorMessage::Shutdown { reply })
264 .await?;
265 reply_rx.await?;
266 Ok(())
267 }
268
269 pub fn metrics(&self) -> &Arc<Metrics> {
271 &self.inner.metrics
272 }
273}
274
/// The gossip actor: owns the protocol state machine and drives all I/O.
struct Actor {
    /// ALPN used for outgoing dials.
    alpn: Bytes,
    /// Protocol state machine (membership + broadcast).
    state: proto::State<PublicKey, StdRng>,
    endpoint: Endpoint,
    /// Manages in-flight outgoing dials.
    dialer: Dialer,
    /// Inbox for RPC messages from the API.
    rpc_rx: mpsc::Receiver<RpcMessage>,
    /// Inbox for local control messages (connections, shutdown).
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Sender half handed to connection tasks to feed protocol input back in.
    in_event_tx: mpsc::Sender<InEvent>,
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Scheduled protocol timers.
    timers: Timers<Timer>,
    /// Per-topic state (neighbors, event channel, command streams).
    topics: HashMap<TopicId, TopicState>,
    /// Per-peer connection state.
    peers: HashMap<EndpointId, PeerState>,
    /// Merged stream of commands from all topic subscribers.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Topics queued to be quit and removed.
    quit_queue: VecDeque<TopicId>,
    /// Running send/recv loops, one per connection.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    metrics: Arc<Metrics>,
    /// Tasks forwarding topic events to subscribers; yield the topic id on exit.
    topic_event_forwarders: JoinSet<TopicId>,
    /// Address lookup fed with peer addresses learned via gossip.
    address_lookup: GossipAddressLookup,
}
307
impl Actor {
    /// Creates the actor together with the sender halves of its two inboxes
    /// (RPC messages and local control messages).
    fn new(
        endpoint: Endpoint,
        config: proto::Config,
        metrics: Arc<Metrics>,
        alpn: Option<Bytes>,
        address_lookup: GossipAddressLookup,
    ) -> (
        Self,
        mpsc::Sender<RpcMessage>,
        mpsc::Sender<LocalActorMessage>,
    ) {
        let peer_id = endpoint.id();
        let dialer = Dialer::new(endpoint.clone());
        let state = proto::State::new(
            peer_id,
            Default::default(),
            config,
            rand::rngs::StdRng::from_rng(&mut rand::rng()),
        );
        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
        let (local_tx, local_rx) = mpsc::channel(16);
        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);

        let actor = Actor {
            // Fall back to the default ALPN when none was configured.
            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
            endpoint,
            state,
            dialer,
            rpc_rx,
            in_event_rx,
            in_event_tx,
            timers: Timers::new(),
            command_rx: StreamGroup::new().keyed(),
            peers: Default::default(),
            topics: Default::default(),
            quit_queue: Default::default(),
            connection_tasks: Default::default(),
            metrics,
            local_rx,
            topic_event_forwarders: Default::default(),
            address_lookup,
        };

        (actor, rpc_tx, local_tx)
    }

    /// Runs the actor until shutdown or until all handles are dropped.
    pub async fn run(mut self) {
        let mut addr_update_stream = self.setup().await;

        // `i` is a tick counter used only for trace logging.
        let mut i = 0;
        while self.event_loop(&mut addr_update_stream, i).await {
            i += 1;
        }
    }

    /// Publishes our initial address as peer data and returns the stream of
    /// subsequent address updates for the event loop to consume.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }

    /// Processes one event-loop tick. Returns `false` when the actor should
    /// stop (shutdown requested or all handles dropped).
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        // `biased` gives control messages (shutdown) priority over other work.
        tokio::select! {
            biased;
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            // Commands from topic subscribers (broadcast, join).
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            // Our own address changed: republish peer data.
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        // Only report a disconnect if the peer did not become
                        // active through another connection in the meantime.
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    None => {
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                let now = Instant::now();
                // Drain all timers that are due by now.
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            // A topic event forwarder exited: its subscriber went away, so the
            // topic may no longer be needed.
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }

    /// Encodes our new address as peer data and feeds it into the protocol.
    async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
        let peer_data = encode_peer_data(&endpoint_addr.into());
        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
            .await
    }

    /// Handles a command from a topic subscriber stream.
    ///
    /// `command` is `None` when the subscriber's stream closed, in which case
    /// the key is removed and the topic is quit if nothing else needs it.
    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }

    /// Registers a new connection for `peer_id` and spawns its send/recv loop.
    ///
    /// Any messages queued while the peer was pending are handed to the new
    /// connection task for flushing.
    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        let queue = match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        self.connection_tasks.spawn(
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }

    /// Cleans up after a connection task ends: closes the connection if still
    /// open and marks the peer disconnected when its active connection died.
    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
    async fn handle_connection_task_finished(
        &mut self,
        peer_id: EndpointId,
        conn: Connection,
        task_result: Result<(), ConnectionLoopError>,
    ) {
        if conn.close_reason().is_none() {
            conn.close(0u32.into(), b"close from disconnect");
        }
        let reason = conn.close_reason().expect("just closed");
        let error = task_result.err();
        debug!(%reason, ?error, "connection closed");
        if let Some(PeerState::Active {
            active_conn_id,
            other_conns,
            ..
        }) = self.peers.get_mut(&peer_id)
        {
            if conn.stable_id() == *active_conn_id {
                debug!("active send connection closed, mark peer as disconnected");
                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                    .await;
            } else {
                // A superseded connection finished: just drop its id.
                other_conns.retain(|x| *x != conn.stable_id());
                // NOTE(review): the `+ 1` presumably counts the still-active
                // connection — confirm intent.
                debug!("remaining {} other connections", other_conns.len() + 1);
            }
        } else {
            debug!("peer already marked as disconnected");
        }
    }

    /// Handles an RPC message from the API (currently only topic joins).
    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                // Replay the current neighbor set to the new subscriber so it
                // does not miss NeighborUp events that happened earlier.
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                // Only spawn a forwarder if the subscriber is still listening.
                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }

    /// Feeds one event into the protocol state and then drains the quit queue.
    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }

    /// Quits and removes every topic currently queued for removal.
    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }

    /// Runs one protocol step and executes all resulting output events
    /// (sends, topic events, timers, disconnects, peer data).
    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        // Timer expirations are frequent; log them at trace level only.
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            // First queued message triggers the dial.
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    // Keep the neighbor set in sync with up/down events.
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    // Dropping the peer state drops its send channel, which
                    // terminates the connection task.
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.address_lookup.add(endpoint_addr);
                    }
                },
            }
        }
    }
}
754
/// Identifier for a single connection, taken from `Connection::stable_id`.
type ConnId = usize;
756
/// Connection state of a remote peer as tracked by the actor.
#[derive(Debug)]
enum PeerState {
    /// No live connection yet; messages queue up until a dial completes.
    Pending {
        queue: Vec<ProtoMessage>,
    },
    /// At least one live connection; the `active_*` pair is used for sending.
    Active {
        active_send_tx: mpsc::Sender<ProtoMessage>,
        active_conn_id: ConnId,
        /// Superseded connections whose tasks have not finished yet.
        other_conns: Vec<ConnId>,
    },
}
768
769impl PeerState {
770 fn accept_conn(
771 &mut self,
772 send_tx: mpsc::Sender<ProtoMessage>,
773 conn_id: ConnId,
774 ) -> Vec<ProtoMessage> {
775 match self {
776 PeerState::Pending { queue } => {
777 let queue = std::mem::take(queue);
778 *self = PeerState::Active {
779 active_send_tx: send_tx,
780 active_conn_id: conn_id,
781 other_conns: Vec::new(),
782 };
783 queue
784 }
785 PeerState::Active {
786 active_send_tx,
787 active_conn_id,
788 other_conns,
789 } => {
790 other_conns.push(*active_conn_id);
796 *active_send_tx = send_tx;
797 *active_conn_id = conn_id;
798 Vec::new()
799 }
800 }
801 }
802}
803
804impl Default for PeerState {
805 fn default() -> Self {
806 PeerState::Pending { queue: Vec::new() }
807 }
808}
809
/// Per-topic bookkeeping held by the actor.
#[derive(Debug)]
struct TopicState {
    /// Current gossip neighbors for this topic.
    neighbors: BTreeSet<EndpointId>,
    /// Broadcast channel fanning events out to all topic subscribers.
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys of live command streams registered in the actor's stream group.
    command_rx_keys: HashSet<stream_group::Key>,
}
819
820impl Default for TopicState {
821 fn default() -> Self {
822 let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
823 Self {
824 neighbors: Default::default(),
825 command_rx_keys: Default::default(),
826 event_sender,
827 }
828 }
829}
830
831impl TopicState {
832 fn still_needed(&self) -> bool {
834 !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
837 }
838
839 #[cfg(test)]
840 fn joined(&self) -> bool {
841 !self.neighbors.is_empty()
842 }
843}
844
/// Whether a connection was accepted (incoming) or dialed (outgoing).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    Accept,
    Dial,
}
851
// Errors that can terminate a connection's send/recv loop.
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    // The send loop failed to write a message.
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    // The recv loop failed to read a message.
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    // The underlying QUIC connection errored.
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    // The actor's in-event receiver is gone.
    ActorDropped {},
}
871
// A failed send into the actor means its receiver was dropped.
impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(ConnectionLoopError::ActorDropped)
    }
}
877
878async fn connection_loop(
879 from: PublicKey,
880 conn: Connection,
881 origin: ConnOrigin,
882 send_rx: mpsc::Receiver<ProtoMessage>,
883 in_event_tx: mpsc::Sender<InEvent>,
884 max_message_size: usize,
885 queue: Vec<ProtoMessage>,
886) -> Result<(), ConnectionLoopError> {
887 debug!(?origin, "connection established");
888
889 let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
890 let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);
891
892 let send_fut = send_loop.run(queue).instrument(error_span!("send"));
893 let recv_fut = recv_loop.run().instrument(error_span!("recv"));
894
895 let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
896 send_res?;
897 recv_res?;
898 Ok(())
899}
900
/// Addressing info gossiped as peer data: a relay URL plus direct addresses.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    relay_url: Option<RelayUrl>,
    direct_addresses: BTreeSet<SocketAddr>,
}
906
907impl From<EndpointAddr> for AddrInfo {
908 fn from(endpoint_addr: EndpointAddr) -> Self {
909 Self {
910 relay_url: endpoint_addr.relay_urls().next().cloned(),
911 direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
912 }
913 }
914}
915
916fn encode_peer_data(info: &AddrInfo) -> PeerData {
917 let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
918 PeerData::new(bytes)
919}
920
921fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
922 let bytes = peer_data.as_bytes();
923 if bytes.is_empty() {
924 return Ok(AddrInfo::default());
925 }
926 let info = postcard::from_bytes(bytes)?;
927 Ok(info)
928}
929
930async fn topic_subscriber_loop(
931 sender: irpc::channel::mpsc::Sender<Event>,
932 mut topic_events: broadcast::Receiver<ProtoEvent>,
933) {
934 loop {
935 tokio::select! {
936 biased;
937 msg = topic_events.recv() => {
938 let event = match msg {
939 Err(broadcast::error::RecvError::Closed) => break,
940 Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
941 Ok(event) => event.into(),
942 };
943 if sender.send(event).await.is_err() {
944 break;
945 }
946 }
947 _ = sender.closed() => break,
948 }
949 }
950}
951
/// Boxed stream of commands received from one topic subscriber.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;
955
/// Command stream that tags each item with its topic id and yields a final
/// `(topic, None)` marker once the underlying stream closes or errors.
#[derive(derive_more::Debug)]
struct TopicCommandStream {
    topic_id: TopicId,
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    // Set after the closing marker was emitted; next poll returns `Ready(None)`.
    closed: bool,
}
963
964impl TopicCommandStream {
965 fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
966 Self {
967 topic_id,
968 stream,
969 closed: false,
970 }
971 }
972}
973
974impl Stream for TopicCommandStream {
975 type Item = (TopicId, Option<Command>);
976 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
977 if self.closed {
978 return Poll::Ready(None);
979 }
980 match Pin::new(&mut self.stream).poll_next(cx) {
981 Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
982 Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
983 self.closed = true;
984 Poll::Ready(Some((self.topic_id, None)))
985 }
986 Poll::Pending => Poll::Pending,
987 }
988 }
989}
990
/// Tracks in-flight outgoing dials, at most one per endpoint.
#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    /// Running dial tasks; `None` result means the dial was cancelled.
    pending: JoinSet<(
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    /// Cancellation tokens for the dials currently in flight.
    pending_dials: HashMap<EndpointId, CancellationToken>,
}
1000
1001impl Dialer {
1002 fn new(endpoint: Endpoint) -> Self {
1004 Self {
1005 endpoint,
1006 pending: Default::default(),
1007 pending_dials: Default::default(),
1008 }
1009 }
1010
1011 fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
1013 if self.is_pending(endpoint_id) {
1014 return;
1015 }
1016 let cancel = CancellationToken::new();
1017 self.pending_dials.insert(endpoint_id, cancel.clone());
1018 let endpoint = self.endpoint.clone();
1019 self.pending.spawn(
1020 async move {
1021 let res = tokio::select! {
1022 biased;
1023 _ = cancel.cancelled() => None,
1024 res = endpoint.connect(endpoint_id, &alpn) => Some(res),
1025 };
1026 (endpoint_id, res)
1027 }
1028 .instrument(tracing::Span::current()),
1029 );
1030 }
1031
1032 fn is_pending(&self, endpoint: EndpointId) -> bool {
1034 self.pending_dials.contains_key(&endpoint)
1035 }
1036
1037 async fn next_conn(
1040 &mut self,
1041 ) -> (
1042 EndpointId,
1043 Option<Result<Connection, iroh::endpoint::ConnectError>>,
1044 ) {
1045 match self.pending_dials.is_empty() {
1046 false => {
1047 let (endpoint_id, res) = loop {
1048 match self.pending.join_next().await {
1049 Some(Ok((endpoint_id, res))) => {
1050 self.pending_dials.remove(&endpoint_id);
1051 break (endpoint_id, res);
1052 }
1053 Some(Err(e)) => {
1054 error!("next conn error: {:?}", e);
1055 }
1056 None => {
1057 error!("no more pending conns available");
1058 std::future::pending().await
1059 }
1060 }
1061 };
1062
1063 (endpoint_id, res)
1064 }
1065 true => std::future::pending().await,
1066 }
1067 }
1068}
1069
1070#[cfg(test)]
1071pub(crate) mod tests {
1072 use std::time::Duration;
1073
1074 use bytes::Bytes;
1075 use futures_concurrency::future::TryJoin;
1076 use iroh::{
1077 address_lookup::memory::MemoryLookup,
1078 endpoint::{presets, BindError},
1079 protocol::Router,
1080 tls::CaRootsConfig,
1081 RelayMap, RelayMode, SecretKey,
1082 };
1083 use n0_error::{AnyError, Result, StdResultExt};
1084 use n0_tracing_test::traced_test;
1085 use rand::{CryptoRng, RngExt};
1086 use tokio::{spawn, time::timeout};
1087 use tokio_util::sync::CancellationToken;
1088 use tracing::{info, instrument};
1089
1090 use super::*;
1091 use crate::api::{ApiError, GossipReceiver, GossipSender};
1092
1093 struct ManualActorLoop {
1094 actor: Actor,
1095 step: usize,
1096 }
1097
1098 impl std::ops::Deref for ManualActorLoop {
1099 type Target = Actor;
1100
1101 fn deref(&self) -> &Self::Target {
1102 &self.actor
1103 }
1104 }
1105
1106 impl std::ops::DerefMut for ManualActorLoop {
1107 fn deref_mut(&mut self) -> &mut Self::Target {
1108 &mut self.actor
1109 }
1110 }
1111
1112 type EndpointHandle = tokio::task::JoinHandle<Result<()>>;
1113
1114 impl ManualActorLoop {
1115 #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
1116 async fn new(mut actor: Actor) -> Self {
1117 let _ = actor.setup().await;
1118 Self { actor, step: 0 }
1119 }
1120
1121 #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
1122 async fn step(&mut self) -> bool {
1123 let ManualActorLoop { actor, step } = self;
1124 *step += 1;
1125 let addr_update_stream = &mut futures_lite::stream::pending();
1128 actor.event_loop(addr_update_stream, *step).await
1129 }
1130
1131 async fn steps(&mut self, n: usize) {
1132 for _ in 0..n {
1133 self.step().await;
1134 }
1135 }
1136
1137 async fn finish(mut self) {
1138 while self.step().await {}
1139 }
1140 }
1141
1142 impl Gossip {
1143 async fn t_new_with_actor(
1150 rng: &mut rand::rngs::ChaCha12Rng,
1151 config: proto::Config,
1152 relay_map: RelayMap,
1153 cancel: &CancellationToken,
1154 ) -> Result<(Self, Actor, EndpointHandle), BindError> {
1155 let endpoint = create_endpoint(rng, relay_map, None).await?;
1156 let metrics = Arc::new(Metrics::default());
1157 let address_lookup = GossipAddressLookup::default();
1158 endpoint
1159 .address_lookup()
1160 .expect("endpoint is not closed")
1161 .add(address_lookup.clone());
1162
1163 let (actor, to_actor_tx, conn_tx) =
1164 Actor::new(endpoint, config, metrics.clone(), None, address_lookup);
1165 let max_message_size = actor.state.max_message_size();
1166
1167 let _actor_handle =
1168 AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
1169 let gossip = Self {
1170 inner: Inner {
1171 api: GossipApi::local(to_actor_tx),
1172 local_tx: conn_tx,
1173 _actor_handle,
1174 max_message_size,
1175 metrics,
1176 }
1177 .into(),
1178 };
1179
1180 let endpoint_task = task::spawn(endpoint_loop(
1181 actor.endpoint.clone(),
1182 gossip.clone(),
1183 cancel.child_token(),
1184 ));
1185
1186 Ok((gossip, actor, endpoint_task))
1187 }
1188
1189 async fn t_new(
1191 rng: &mut rand::rngs::ChaCha12Rng,
1192 config: proto::Config,
1193 relay_map: RelayMap,
1194 cancel: &CancellationToken,
1195 ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
1196 let (g, actor, ep_handle) =
1197 Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
1198 let ep = actor.endpoint.clone();
1199 let me = ep.id().fmt_short();
1200 let actor_handle =
1201 task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
1202 Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
1203 }
1204 }
1205
1206 pub(crate) async fn create_endpoint(
1207 rng: &mut rand::rngs::ChaCha12Rng,
1208 relay_map: RelayMap,
1209 memory_lookup: Option<MemoryLookup>,
1210 ) -> Result<Endpoint, BindError> {
1211 let ep = Endpoint::builder(presets::Minimal)
1212 .relay_mode(RelayMode::Custom(relay_map))
1213 .secret_key(SecretKey::from_bytes(&rng.random()))
1214 .alpns(vec![GOSSIP_ALPN.to_vec()])
1215 .ca_roots_config(CaRootsConfig::insecure_skip_verify())
1216 .bind()
1217 .await?;
1218
1219 if let Some(memory_lookup) = memory_lookup {
1220 ep.address_lookup()
1221 .expect("endpoint is not closed")
1222 .add(memory_lookup);
1223 }
1224 ep.online().await;
1225 Ok(ep)
1226 }
1227
1228 async fn endpoint_loop(
1229 endpoint: Endpoint,
1230 gossip: Gossip,
1231 cancel: CancellationToken,
1232 ) -> Result<()> {
1233 loop {
1234 tokio::select! {
1235 biased;
1236 _ = cancel.cancelled() => break,
1237 incoming = endpoint.accept() => match incoming {
1238 None => break,
1239 Some(incoming) => {
1240 let connecting = match incoming.accept() {
1241 Ok(connecting) => connecting,
1242 Err(err) => {
1243 warn!("incoming connection failed: {err:#}");
1244 continue;
1247 }
1248 };
1249 let connection = connecting
1250 .await
1251 .std_context("await incoming connection")?;
1252 gossip.handle_connection(connection).await?
1253 }
1254 }
1255 }
1256 }
1257 Ok(())
1258 }
1259
    /// Smoke test: three gossip endpoints connected through a relay join one
    /// topic; endpoint 1 broadcasts, endpoints 2 and 3 must both receive
    /// every message.
    #[tokio::test]
    #[traced_test]
    async fn gossip_net_smoke() {
        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        // Shared in-memory address book so the endpoints can resolve each
        // other without real discovery.
        let memory_lookup = MemoryLookup::new();

        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();

        let go1 = Gossip::builder().spawn(ep1.clone());
        let go2 = Gossip::builder().spawn(ep2.clone());
        let go3 = Gossip::builder().spawn(ep3.clone());
        debug!("peer1 {:?}", ep1.id());
        debug!("peer2 {:?}", ep2.id());
        debug!("peer3 {:?}", ep3.id());
        let pi1 = ep1.id();
        let pi2 = ep2.id();

        // Accept loops for all three endpoints; stopped via `cancel` at the end.
        let cancel = CancellationToken::new();
        let tasks = [
            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
        ];

        debug!("----- adding peers ----- ");
        let topic: TopicId = blake3::hash(b"foobar").into();

        let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
        let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr1.clone());
        memory_lookup.add_endpoint_info(addr2.clone());

        // Chain topology: 2 bootstraps via 1, 3 bootstraps via 2;
        // 1 joins with no bootstrap peers.
        debug!("----- joining ----- ");
        let [sub1, mut sub2, mut sub3] = [
            go1.subscribe_and_join(topic, vec![]),
            go2.subscribe_and_join(topic, vec![pi1]),
            go3.subscribe_and_join(topic, vec![pi2]),
        ]
        .try_join()
        .await
        .unwrap();

        let (sink1, _stream1) = sub1.split();

        // Number of messages the publisher sends and each subscriber expects.
        let len = 2;

        // Publisher: broadcast `len` messages "hi0", "hi1", ... from endpoint 1.
        let pub1 = spawn(async move {
            for i in 0..len {
                let message = format!("hi{i}");
                info!("go1 broadcast: {message:?}");
                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
        });

        // Subscriber on endpoint 2: collect payloads until `len` were received.
        let sub2 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub2.next().await.unwrap().unwrap();
                info!("go2 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        // Subscriber on endpoint 3: same as above.
        let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub3.next().await.unwrap().unwrap();
                info!("go3 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        timeout(Duration::from_secs(10), pub1)
            .await
            .unwrap()
            .unwrap();
        let recv2 = timeout(Duration::from_secs(10), sub2)
            .await
            .unwrap()
            .unwrap();
        let recv3 = timeout(Duration::from_secs(10), sub3)
            .await
            .unwrap()
            .unwrap();

        // Compare as sets: the test asserts delivery, not ordering.
        let expected: HashSet<Bytes> = (0..len)
            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
            .collect();
        assert_eq!(HashSet::from_iter(recv2), expected);
        assert_eq!(HashSet::from_iter(recv3), expected);

        // Shut down the accept loops and make sure none of them errored.
        cancel.cancel();
        for t in tasks {
            timeout(Duration::from_secs(10), t)
                .await
                .unwrap()
                .unwrap()
                .unwrap();
        }
    }
1389
1390 #[tokio::test]
1400 #[traced_test]
1401 async fn subscription_cleanup() -> Result {
1402 let rng = &mut rand::rngs::ChaCha12Rng::seed_from_u64(1);
1403 let ct = CancellationToken::new();
1404 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1405
1406 let (go1, actor, ep1_handle) =
1408 Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
1409 let mut actor = ManualActorLoop::new(actor).await;
1410
1411 let (go2, ep2, ep2_handle, _test_actor_handle) =
1413 Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1414
1415 let endpoint_id1 = actor.endpoint.id();
1416 let endpoint_id2 = ep2.id();
1417 tracing::info!(
1418 endpoint_1 = %endpoint_id1.fmt_short(),
1419 endpoint_2 = %endpoint_id2.fmt_short(),
1420 "endpoints ready"
1421 );
1422
1423 let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
1424 tracing::info!(%topic, "joining");
1425
1426 let ct2 = ct.clone();
1433 let go2_task = async move {
1434 let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();
1435
1436 let subscribe_fut = async {
1437 while let Some(ev) = sub_rx.try_next().await? {
1438 match ev {
1439 Event::Lagged => tracing::debug!("missed some messages :("),
1440 Event::Received(_) => unreachable!("test does not send messages"),
1441 other => tracing::debug!(?other, "gs event"),
1442 }
1443 }
1444
1445 tracing::debug!("subscribe stream ended");
1446 Ok::<_, AnyError>(())
1447 };
1448
1449 tokio::select! {
1450 _ = ct2.cancelled() => Ok(()),
1451 res = subscribe_fut => res,
1452 }
1453 }
1454 .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
1455 let go2_handle = task::spawn(go2_task);
1456
1457 let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
1459 let memory_lookup = MemoryLookup::new();
1460 memory_lookup.add_endpoint_info(addr2);
1461 actor.endpoint.address_lookup()?.add(memory_lookup);
1462 let (tx, mut rx) = mpsc::channel::<()>(1);
1464 let ct1 = ct.clone();
1465 let go1_task = async move {
1466 tracing::info!("subscribing the first time");
1468 let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1469
1470 rx.recv().await.expect("signal for second subscribe");
1472 tracing::info!("subscribing a second time");
1473 let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1474 drop(sub_1a);
1475
1476 rx.recv().await.expect("signal for second subscribe");
1478 tracing::info!("dropping all handles");
1479 drop(sub_1b);
1480
1481 ct1.cancelled().await;
1483 drop(go1);
1484
1485 Ok::<_, AnyError>(())
1486 }
1487 .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
1488 let go1_handle = task::spawn(go1_task);
1489
1490 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1495 assert!(state.joined());
1496
1497 tx.send(())
1499 .await
1500 .std_context("signal additional subscribe")?;
1501 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1503 assert!(state.joined());
1504
1505 tx.send(()).await.std_context("signal drop handles")?;
1507 actor.steps(2).await; assert!(!actor.topics.contains_key(&topic));
1509
1510 ct.cancel();
1512 let wait = Duration::from_secs(2);
1513 timeout(wait, ep1_handle)
1514 .await
1515 .std_context("wait endpoint1 task")?
1516 .std_context("join endpoint1 task")??;
1517 timeout(wait, ep2_handle)
1518 .await
1519 .std_context("wait endpoint2 task")?
1520 .std_context("join endpoint2 task")??;
1521 timeout(wait, go1_handle)
1522 .await
1523 .std_context("wait gossip1 task")?
1524 .std_context("join gossip1 task")??;
1525 timeout(wait, go2_handle)
1526 .await
1527 .std_context("wait gossip2 task")?
1528 .std_context("join gossip2 task")??;
1529 timeout(wait, actor.finish())
1530 .await
1531 .std_context("wait actor finish")?;
1532
1533 Ok(())
1534 }
1535
    /// Checks that a peer can leave a topic and rejoin it later: endpoint 1
    /// observes `NeighborDown` when endpoint 2 drops its subscription and
    /// `NeighborUp` once endpoint 2 subscribes again.
    #[tokio::test]
    #[traced_test]
    async fn can_reconnect() -> Result {
        let rng = &mut rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let (go1, ep1, ep1_handle, _test_actor_handle1) =
            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;

        let (go2, ep2, ep2_handle, _test_actor_handle2) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = ep1.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"can_reconnect").into();
        tracing::info!(%topic, "joining");

        let ct2 = ct.child_token();
        // Paces endpoint 2: first send triggers the unsubscribe, second the
        // resubscribe.
        let (tx, mut rx) = mpsc::channel::<()>(1);

        // Give endpoint 2 a way to dial endpoint 1.
        let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        ep2.address_lookup()?.add(memory_lookup.clone());
        let go2_task = async move {
            let mut sub = go2.subscribe(topic, Vec::new()).await?;
            sub.joined().await?;

            rx.recv().await.expect("signal to unsubscribe");
            tracing::info!("unsubscribing");
            drop(sub);

            rx.recv().await.expect("signal to subscribe again");
            tracing::info!("resubscribing");
            let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;

            sub.joined().await?;
            tracing::info!("subscription successful!");

            ct2.cancelled().await;

            Ok::<_, ApiError>(())
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        // And give endpoint 1 a way to dial endpoint 2.
        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr2);
        ep1.address_lookup()?.add(memory_lookup);

        let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
        sub.joined().await?;

        // Tell endpoint 2 to drop its subscription and expect NeighborDown.
        tx.send(()).await.std_context("signal unsubscribe")?;

        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor down")??;
        assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
        tracing::info!("endpoint 2 left");

        // Tell endpoint 2 to subscribe again and expect NeighborUp.
        tx.send(()).await.std_context("signal resubscribe")?;

        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor up")??;
        assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
        tracing::info!("endpoint 2 rejoined!");

        // Orderly shutdown of all background tasks, each with a deadline.
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;

        Result::Ok(())
    }
1642
    /// Checks that a peer reusing the same secret key (hence the same endpoint
    /// id) can rejoin and broadcast again after its previous endpoint — the
    /// whole runtime thread — was killed.
    #[tokio::test]
    #[traced_test]
    async fn can_die_and_reconnect() -> Result {
        // Runs `fut` on a fresh current-thread runtime in a dedicated OS
        // thread; yields `None` if `cancel` fires before `fut` completes.
        // Cancelling (and letting the runtime drop) simulates a peer dying
        // abruptly.
        fn run_in_thread<T: Send + 'static>(
            cancel: CancellationToken,
            fut: impl std::future::Future<Output = T> + Send + 'static,
        ) -> std::thread::JoinHandle<Option<T>> {
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
            })
        }

        // Binds an endpoint with the given key and spawns gossip behind a
        // router accepting on the gossip ALPN.
        async fn spawn_gossip(
            secret_key: SecretKey,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip), BindError> {
            let ep = Endpoint::builder(presets::Minimal)
                .relay_mode(RelayMode::Custom(relay_map))
                .secret_key(secret_key)
                .ca_roots_config(CaRootsConfig::insecure_skip_verify())
                .bind()
                .await?;
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        // Joins `topic_id` via `bootstrap_addr`, broadcasts a single message,
        // then parks forever — the caller kills it via `run_in_thread`'s
        // cancellation token.
        async fn broadcast_once(
            secret_key: SecretKey,
            relay_map: RelayMap,
            bootstrap_addr: EndpointAddr,
            topic_id: TopicId,
            message: String,
        ) -> Result {
            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
            info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
            let bootstrap = vec![bootstrap_addr.id];
            let memory_lookup = MemoryLookup::new();
            memory_lookup.add_endpoint_info(bootstrap_addr);
            router.endpoint().address_lookup()?.add(memory_lookup);
            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
            topic.broadcast(message.as_bytes().to_vec().into()).await?;
            std::future::pending::<()>().await;
            Ok(())
        }

        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let rng = &mut rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let topic_id = TopicId::from_bytes(rng.random());

        // Long-lived receiver endpoint: publishes its address once via
        // `addr_tx`, then forwards every received broadcast to the test body
        // through `msgs_recv_tx`.
        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
        let recv_task = tokio::task::spawn({
            let relay_map = relay_map.clone();
            let secret_key = SecretKey::from_bytes(&rng.random());
            async move {
                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
                router.endpoint().online().await;
                let addr = router.endpoint().addr();
                info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
                addr_tx.send(addr).unwrap();
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                while let Some(event) = topic.try_next().await.unwrap() {
                    if let Event::Received(message) = event {
                        let message = std::str::from_utf8(&message.content)
                            .std_context("decode broadcast message")?
                            .to_string();
                        msgs_recv_tx
                            .send(message)
                            .await
                            .std_context("forward received message")?;
                    }
                }
                Ok::<_, AnyError>(())
            }
        });

        let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
        let max_wait = Duration::from_secs(5);

        // First life of the broadcaster: join, send "msg1", then get killed.
        let cancel = CancellationToken::new();
        let secret = SecretKey::from_bytes(&rng.random());
        let join_handle_1 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg1".to_string(),
            ),
        );
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for first broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg1");
        info!("kill broadcast endpoint");
        cancel.cancel();

        // Second life: same secret key, fresh runtime. The receiver must
        // accept the reconnect and deliver "msg2".
        let cancel = CancellationToken::new();
        let join_handle_2 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg2".to_string(),
            ),
        );
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for second broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg2");
        info!("kill broadcast endpoint");
        cancel.cancel();

        info!("kill recv endpoint");
        recv_task.abort();
        // Both broadcaster lives were cancelled before completing, so
        // `run_until_cancelled` yields `None` for each.
        assert!(join_handle_1.join().unwrap().is_none());
        assert!(join_handle_2.join().unwrap().is_none());

        Ok(())
    }
1791
    /// Checks that gossip works over a custom ALPN: both sides configure the
    /// same non-default ALPN on the gossip builder and on the router, and can
    /// join a shared topic.
    #[tokio::test]
    #[traced_test]
    async fn gossip_change_alpn() -> n0_error::Result<()> {
        let alpn = b"my-gossip-alpn";
        let topic_id = TopicId::from([0u8; 32]);

        let ep1 = Endpoint::bind(presets::Minimal).await?;
        let ep2 = Endpoint::bind(presets::Minimal).await?;
        let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
        let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
        let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
        let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();

        // Endpoint 2 learns endpoint 1's address through an in-memory lookup.
        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        router2.endpoint().address_lookup()?.add(memory_lookup);

        // Endpoint 2 bootstraps via endpoint 1; both must reach "joined".
        let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        timeout(Duration::from_secs(3), topic1.joined())
            .await
            .std_context("wait topic1 join")??;
        timeout(Duration::from_secs(3), topic2.joined())
            .await
            .std_context("wait topic2 join")??;
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }
1824
    /// Checks that peers can mesh while knowing only one bootstrap endpoint:
    /// endpoints 2 and 3 each have lookup info for endpoint 1 only, yet
    /// endpoint 3 ends up neighboring both 1 and 2 — the remaining address
    /// info is presumably learned through gossip itself (the
    /// GossipAddressLookup; see module docs to confirm the mechanism).
    #[tokio::test]
    #[traced_test]
    async fn gossip_rely_on_gossip_address_lookup() -> n0_error::Result<()> {
        let rng = &mut rand::rngs::ChaCha12Rng::seed_from_u64(1);

        // Spawns an endpoint + router + gossip instance already subscribed to
        // the fixed test topic, returning the split sender/receiver halves.
        async fn spawn(
            rng: &mut impl CryptoRng,
        ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
            let topic_id = TopicId::from([0u8; 32]);
            let ep = Endpoint::builder(presets::Minimal)
                .secret_key(SecretKey::from_bytes(&rng.random()))
                .bind()
                .await?;
            let endpoint_id = ep.id();
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            let topic = gossip.subscribe(topic_id, vec![]).await?;
            let (sender, receiver) = topic.split();
            Ok((endpoint_id, router, gossip, sender, receiver))
        }

        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;

        println!("endpoints {:?}", [n1, n2, n3]);

        // The shared lookup only ever contains endpoint 1's address.
        let addr1 = r1.endpoint().addr();
        let lookup = MemoryLookup::new();
        lookup.add_endpoint_info(addr1);

        // Endpoint 2 joins via endpoint 1.
        r2.endpoint().address_lookup()?.add(lookup.clone());
        tx2.join_peers(vec![n1]).await?;

        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Endpoint 3 also only knows endpoint 1's address...
        r3.endpoint().address_lookup()?.add(lookup.clone());
        tx3.join_peers(vec![n1]).await?;

        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 first neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 second neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));

        // ...but still becomes a neighbor of both 1 and 2.
        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));

        let ev = timeout(Duration::from_secs(3), rx2.next())
            .await
            .std_context("wait rx2 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        let ev = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait rx1 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
            .std_context("shutdown routers")?;
        Ok(())
    }
1903
1904 fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
1905 let mut out: Vec<_> = input.into_iter().collect();
1906 out.sort();
1907 out
1908 }
1909
    /// Checks that dropping the sender half of a subscription does not close
    /// the topic: the receiver half keeps the subscription alive and still
    /// receives messages broadcast by the remote peer.
    #[tokio::test]
    #[traced_test]
    async fn topic_stays_alive_after_sender_drop() -> n0_error::Result<()> {
        let topic_id = TopicId::from([99u8; 32]);

        let ep1 = Endpoint::bind(presets::Minimal).await?;
        let ep2 = Endpoint::bind(presets::Minimal).await?;
        let gossip1 = Gossip::builder().spawn(ep1.clone());
        let gossip2 = Gossip::builder().spawn(ep2.clone());
        let router1 = Router::builder(ep1)
            .accept(crate::ALPN, gossip1.clone())
            .spawn();
        let router2 = Router::builder(ep2)
            .accept(crate::ALPN, gossip2.clone())
            .spawn();

        // Endpoint 2 learns endpoint 1's address through an in-memory lookup.
        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let mem_lookup = MemoryLookup::new();
        mem_lookup.add_endpoint_info(addr1);
        router2.endpoint().address_lookup()?.add(mem_lookup);

        // Endpoint 2 bootstraps via endpoint 1.
        let topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        let (tx1, mut rx1) = topic1.split();
        let (tx2, mut rx2) = topic2.split();

        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Drop node 1's sender; rx1 alone must keep the topic subscribed.
        drop(tx1);

        tx2.broadcast(b"hello from node2".to_vec().into()).await?;

        let event = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait for message on rx1")?;

        match event {
            Some(Ok(Event::Received(msg))) => {
                assert_eq!(&msg.content[..], b"hello from node2");
            }
            other => panic!("expected Received event, got {:?}", other),
        }

        drop(tx2);
        drop(rx1);
        drop(rx2);
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }
1976}