// SPDX-License-Identifier: MIT OR Apache-2.0

//! Node implementation for p2p networking and data streaming, extensible with discovery
//! strategies, sync protocols, blob sync and more.
//!
//! As soon as the local node is launched it roughly pursues the following goals:
//!
//! 1. Find as many peers as possible who are interested in the same topic
//! 2. Connect with as many peers as possible to exchange data with, eventually converging to the
//!    same state
//!
//! To achieve these goals, multiple systems are in play:
//!
//! ## Bootstrap
//!
//! Establishing a peer-to-peer network suffers from the "chicken and egg" problem where we need to
//! start _somewhere_ before we can begin to discover more peers and become "discoverable" to
//! other, possibly previously unknown peers.
//!
//! This process of taking the "first step" into a network is called "bootstrap" and can be
//! realised with different strategies:
//!
//! 1. Supply your node with a hard-coded list of well-known node addresses which are directly
//!    reachable. The node will attempt to connect to them on start.
//! 2. Use techniques like mDNS or rendezvous servers to learn about other nodes to connect to.
//!
//! The latter approach is very similar to "peer discovery" with one important difference: While
//! peer discovery helps us to learn about _more peers_, we use these discovery techniques during
//! bootstrap to find _the first_ peer to connect to.
//!
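//! A minimal sketch of the first strategy, assuming we already hold the public key of a
//! well-known, directly reachable node (`known_node_key` and the socket address below are
//! placeholders):
//!
//! ```ignore
//! let network = NetworkBuilder::new([1; 32])
//!     .direct_address(known_node_key, vec!["203.0.113.7:2022".parse()?], None)
//!     .build()
//!     .await?;
//! ```
//!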
//! ## Peer Discovery
//!
//! As during "Bootstrap", we can apply similar algorithms to find more peers in the network. In
//! `p2panda-net` this is an "ambient" process, meaning that it constantly takes place in the
//! background, as soon as the node is running.
//!
//! To find more peers which might be interested in the same data as the local node, one or more
//! discovery techniques can be added when creating the network, for example mDNS for finding
//! peers in the local network or a "rendezvous" server for finding peers on the internet.
//!
//! Additionally, `p2panda-net` might find new peers by joining any gossip overlay. If another
//! peer becomes a direct neighbor in the gossip tree, we register it in our address book.
//!
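//! As a sketch, registering an mDNS strategy when building the network (assuming an mDNS
//! implementation of the `Discovery` trait called `LocalDiscovery` is available from the
//! `p2panda-discovery` crate):
//!
//! ```ignore
//! let network = NetworkBuilder::new([1; 32])
//!     .discovery(LocalDiscovery::new())
//!     .build()
//!     .await?;
//! ```
//!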
//! ## Topic Discovery
//!
//! Next to "Peer Discovery" we additionally find out which topics peers are interested in and
//! announce our own topics of interest in the network. With this design it is possible to have
//! peers in the network which are interested in different data, potentially using other
//! applications, at the same time.
//!
//! By default `p2panda-net` always enters at least one network-wide gossip overlay where peers
//! exchange information about their topics of interest. In the future we might use a random-walk
//! traversal algorithm instead to "explore" the network.
//!
//! As soon as we've identified a common interest in the same topic with another peer, we join the
//! gossip overlay for this topic with them and earmark the peer for a future sync session.
//!
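//! Topics themselves are defined by the application: any type implementing the `TopicQuery` and
//! `TopicId` traits can be used. A minimal sketch, mirroring the test helpers further down in
//! this module:
//!
//! ```ignore
//! #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
//! struct ChatTopic(String, [u8; 32]);
//!
//! impl TopicQuery for ChatTopic {}
//!
//! impl TopicId for ChatTopic {
//!     fn id(&self) -> [u8; 32] {
//!         self.1
//!     }
//! }
//! ```
//!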
//! ## Connectivity
//!
//! With the help of iroh, we can connect to any device wherever it is. Connecting to a peer is a
//! multi-step process requiring additional strategies depending on the situation:
//!
//! 1. If we know a peer's directly reachable address, we can just connect to them
//! 2. If this peer's direct address is not known (because they are behind a NAT or we only know
//!    their public key), we use a STUN server to find out the address
//! 3. If this peer is still not reachable (for example because they are behind a firewall), we
//!    use a Relay (TURN-like) server to tunnel the connection through it
//!
//! In the last case we can't establish a direct connection and must rely on additional
//! infrastructure.
//!
//! Read the `iroh-relay` documentation to learn how to run your own STUN and Relay server:
//! <https://github.com/n0-computer/iroh/tree/main/iroh-relay>.
//!
//! This implementation aims to be robust in situations with (temporarily) bad or no connectivity.
//! `p2panda-net` will automatically re-connect to peers as soon as they are reachable again
//! which, from the application perspective, shouldn't make any difference.
//!
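//! As a sketch, configuring a custom relay (both peers need to use the _same_ relay URL; the URL
//! and STUN port below are placeholders):
//!
//! ```ignore
//! let relay_url: RelayUrl = "https://example.net".parse()?;
//! let network = NetworkBuilder::new([1; 32])
//!     // 3478 is the standard STUN port.
//!     .relay(relay_url, false, 3478)
//!     .build()
//!     .await?;
//! ```
//!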
78//! ## "Live Mode"
79//!
80//! To send newly created messages fastly to other peers, `p2panda-net` uses broadcast gossip
81//! overlays for each topic. With this "live mode" messages arrive almost instantly.
82//!
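//! As a sketch, subscribing to a topic and broadcasting a message once the gossip overlay has
//! been joined (`topic` is any value implementing `TopicQuery` and `TopicId`):
//!
//! ```ignore
//! let (tx, mut rx, ready) = network.subscribe(topic).await?;
//!
//! // Wait until the gossip overlay for this topic has been joined.
//! ready.await?;
//!
//! // Broadcast a message to all peers subscribed to the same topic ...
//! tx.send(ToNetwork::Message {
//!     bytes: b"Hello, Node".to_vec(),
//! })
//! .await?;
//!
//! // ... and receive messages from them.
//! if let Some(event) = rx.recv().await {
//!     println!("received: {event:?}");
//! }
//! ```
//!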
//! ## Sync
//!
//! By learning about other peers who are interested in the same topic, we keep track of them in
//! an internal "address book". A sync session manager process will eventually kick off a sync
//! session with one of these peers and try to exchange _all_ data on that topic which we're so
//! far missing.
//!
//! Sync is disabled by default and can be enabled by adding a `SyncProtocol` implementation to
//! the node.
//!
//! Sync sessions run only once per peer per topic but can optionally be re-attempted after a
//! certain duration if a `ResyncConfiguration` was given.
//!
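//! As a sketch, enabling sync when building the network (assuming `MySyncProtocol` implements
//! the `SyncProtocol` trait from `p2panda-sync`):
//!
//! ```ignore
//! let sync_config = SyncConfiguration::new(MySyncProtocol::default());
//! let network = NetworkBuilder::new([1; 32])
//!     .sync(sync_config)
//!     .build()
//!     .await?;
//! ```
//!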
//! ## Gossip Buffer
//!
//! Since a node receives potentially older data from another node during a sync session,
//! `p2panda-net` uses a "gossip buffer" to make sure that we're buffering new messages which are
//! received via the gossip overlay at the same time.
//!
//! The buffer is released after the sync session has ended. Through this trick we can make sure
//! that messages arrive "in order" (based on their timestamp or partial ordering for example) at
//! higher application layers.
//!
//! Please note that this implementation can never fully guarantee that messages will not arrive
//! "out of order" at the application. It is recommended to apply additional buffering if this is
//! required. In `p2panda-streams` we offer a solution which will take care of that.
//!
//! ## Blobs
//!
//! With the help of the `p2panda-blobs` crate it is possible to extend the node to support
//! efficient sync of large binary data (images, files etc.) with various storage backends.
//!
//! ## Custom Protocols
//!
//! Next to blob sync, data sync or discovery protocols, it is also possible to register any other
//! low-level bi-directional communication protocol on the node when necessary.
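//!
//! As a sketch, registering a custom protocol under its own ALPN identifier (assuming
//! `MyProtocol` implements the `ProtocolHandler` trait):
//!
//! ```ignore
//! let network = NetworkBuilder::new([1; 32])
//!     .protocol(b"/my-app/my-protocol/1", MyProtocol::default())
//!     .build()
//!     .await?;
//! ```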
use std::fmt::Debug;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use futures_lite::StreamExt;
use futures_util::future::{MapErr, Shared};
use futures_util::{FutureExt, TryFutureExt};
use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
use iroh_net::endpoint::TransportConfig;
use iroh_net::key::SecretKey;
use iroh_net::relay::{RelayMap, RelayNode};
use iroh_net::{Endpoint, NodeAddr, NodeId};
use p2panda_core::{PrivateKey, PublicKey};
use p2panda_discovery::{Discovery, DiscoveryMap};
use p2panda_sync::TopicQuery;
use tokio::sync::{mpsc, oneshot};
use tokio::task::{JoinError, JoinSet};
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, error_span, warn, Instrument};

use crate::addrs::DEFAULT_STUN_PORT;
use crate::config::{Config, GossipConfig, DEFAULT_BIND_PORT};
use crate::engine::Engine;
use crate::protocols::{ProtocolHandler, ProtocolMap};
use crate::sync::{SyncConfiguration, SYNC_CONNECTION_ALPN};
use crate::{NetworkId, RelayUrl, TopicId};

/// Maximum number of streams accepted on a QUIC connection.
const MAX_STREAMS: u32 = 1024;

/// Timeout duration for receiving at least one direct address for the local endpoint.
const DIRECT_ADDRESSES_WAIT: Duration = Duration::from_secs(5);

/// Relay server configuration mode.
#[derive(Debug, PartialEq)]
pub enum RelayMode {
    /// No relay has been specified.
    ///
    /// To connect to another peer its direct address needs to be known, otherwise any connection
    /// attempt will fail.
    Disabled,

    /// Specify a custom relay.
    ///
    /// Relays are used to help establish a connection in case the direct address is not known
    /// yet (via STUN). In case this process fails (for example due to a firewall), the relay is
    /// used as a fallback to tunnel traffic from one peer to another (via DERP, which is similar
    /// to TURN).
    ///
    /// Important: Peers need to use the _same_ relay address to be able to connect to each other.
    Custom(RelayNode),
}

/// Builds an overlay network for peers grouped under the same network identifier.
///
/// All peers can subscribe to multiple topics in this overlay and hook into a data stream per
/// topic where they'll send and receive data.
#[derive(Debug)]
pub struct NetworkBuilder<T> {
    bind_ip_v4: Option<Ipv4Addr>,
    bind_port_v4: Option<u16>,
    bind_ip_v6: Option<Ipv6Addr>,
    bind_port_v6: Option<u16>,
    direct_node_addresses: Vec<NodeAddr>,
    discovery: DiscoveryMap,
    gossip_config: Option<GossipConfig>,
    network_id: NetworkId,
    protocols: ProtocolMap,
    relay_mode: RelayMode,
    secret_key: Option<SecretKey>,
    sync_config: Option<SyncConfiguration<T>>,
}

impl<T> NetworkBuilder<T>
where
    T: TopicQuery,
{
    /// Returns a new instance of `NetworkBuilder` using the given network identifier.
    ///
    /// Networks must use the same identifier if they wish to successfully connect and share
    /// data.
    pub fn new(network_id: NetworkId) -> Self {
        Self {
            bind_ip_v4: None,
            bind_port_v4: None,
            bind_ip_v6: None,
            bind_port_v6: None,
            direct_node_addresses: Vec::new(),
            discovery: DiscoveryMap::default(),
            gossip_config: None,
            network_id,
            protocols: Default::default(),
            relay_mode: RelayMode::Disabled,
            secret_key: None,
            sync_config: None,
        }
    }

    /// Returns a new instance of `NetworkBuilder` using the given configuration.
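    ///
    /// A minimal sketch of constructing a builder from a `Config` (the field values below are
    /// placeholders; the `..Default::default()` shorthand assumes `Config` implements
    /// `Default`):
    ///
    /// ```ignore
    /// let config = Config {
    ///     bind_port_v4: 2024,
    ///     network_id: [1; 32],
    ///     relay: Some("https://example.net".parse()?),
    ///     ..Default::default()
    /// };
    /// let builder = NetworkBuilder::<MyTopic>::from_config(config);
    /// ```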
    pub fn from_config(config: Config) -> Self {
        let mut network_builder = Self::new(config.network_id)
            .bind_ip_v4(config.bind_ip_v4)
            .bind_port_v4(config.bind_port_v4)
            .bind_ip_v6(config.bind_ip_v6)
            .bind_port_v6(config.bind_port_v6);

        for (public_key, addresses, relay_addr) in config.direct_node_addresses {
            network_builder = network_builder.direct_address(public_key, addresses, relay_addr)
        }

        if let Some(url) = config.relay {
            let port = url.port().unwrap_or(DEFAULT_STUN_PORT);
            network_builder = network_builder.relay(url, false, port)
        }

        network_builder
    }

    /// Sets or overwrites the local IP for IPv4 sockets.
    ///
    /// Default is 0.0.0.0 (`UNSPECIFIED`).
    pub fn bind_ip_v4(mut self, ip: Ipv4Addr) -> Self {
        self.bind_ip_v4.replace(ip);
        self
    }

    /// Sets or overwrites the local bind port for IPv4 sockets.
    ///
    /// Default is 2022.
    pub fn bind_port_v4(mut self, port: u16) -> Self {
        self.bind_port_v4.replace(port);
        self
    }

    /// Sets or overwrites the local IP for IPv6 sockets.
    ///
    /// Default is :: (`UNSPECIFIED`).
    pub fn bind_ip_v6(mut self, ip: Ipv6Addr) -> Self {
        self.bind_ip_v6.replace(ip);
        self
    }

    /// Sets or overwrites the local bind port for IPv6 sockets.
    ///
    /// Default is 2023.
    pub fn bind_port_v6(mut self, port: u16) -> Self {
        self.bind_port_v6.replace(port);
        self
    }

    /// Sets or overwrites the private key.
    ///
    /// If this value is not set, the `NetworkBuilder` will generate a new, random key when
    /// building the network.
    pub fn private_key(mut self, private_key: PrivateKey) -> Self {
        self.secret_key = Some(SecretKey::from_bytes(private_key.as_bytes()));
        self
    }

    /// Sets the relay used by the local network to facilitate the establishment of direct
    /// connections.
    ///
    /// Relay nodes are STUN servers which help in establishing a peer-to-peer connection if one
    /// or both of the peers are behind a NAT. The relay node might offer proxy functionality on
    /// top (via the Tailscale DERP protocol, which is very similar to TURN) if the connection
    /// attempt fails, in which case it serves to relay the data between the peers.
    pub fn relay(mut self, url: RelayUrl, stun_only: bool, stun_port: u16) -> Self {
        self.relay_mode = RelayMode::Custom(RelayNode {
            url: url.into(),
            stun_only,
            stun_port,
        });
        self
    }

    /// Sets the direct address of a peer, identified by their public key (node id).
    ///
    /// The direct address should be reachable without the aid of a STUN or TURN-based relay node.
    /// However, since the direct connection attempt might fail (for example, because of a NAT or
    /// firewall), the relay node of that peer can be supplied to allow connecting to it via a
    /// fallback connection.
    ///
    /// If no relay address is given but turns out to be required, we optimistically try to use
    /// our own relay node instead (if specified). This might still fail, as we can't know if the
    /// peer is using the same relay node.
    pub fn direct_address(
        mut self,
        node_id: PublicKey,
        addresses: Vec<SocketAddr>,
        relay_addr: Option<RelayUrl>,
    ) -> Self {
        let node_id = NodeId::from_bytes(node_id.as_bytes()).expect("invalid public key");
        let mut node_addr = NodeAddr::new(node_id).with_direct_addresses(addresses);
        if let Some(url) = relay_addr {
            node_addr = node_addr.with_relay_url(url.into());
        }
        self.direct_node_addresses.push(node_addr);
        self
    }

    /// Adds a discovery strategy, such as mDNS. This method can be called multiple times to
    /// combine several strategies.
    pub fn discovery(mut self, handler: impl Discovery + 'static) -> Self {
        self.discovery.add(handler);
        self
    }

    /// Sets the sync protocol and configuration.
    ///
    /// Sync sessions will be automatically initiated with any known peers with whom we share
    /// topics of interest.
    pub fn sync(mut self, config: SyncConfiguration<T>) -> Self {
        self.sync_config = Some(config);
        self
    }

    /// Sets the gossip configuration.
    ///
    /// Configuration parameters define the behavior of the swarm membership (HyParView) and gossip
    /// broadcast (Plumtree) layers, as well as the maximum message size.
    pub fn gossip(mut self, config: GossipConfig) -> Self {
        self.gossip_config = Some(config);
        self
    }

    /// Adds additional, custom protocols for communication between two peers.
    pub fn protocol(
        mut self,
        protocol_name: &'static [u8],
        handler: impl ProtocolHandler + 'static,
    ) -> Self {
        self.protocols.insert(protocol_name, Arc::new(handler));
        self
    }

    /// Returns a handle to a newly-spawned instance of `Network`.
    ///
    /// A peer-to-peer endpoint is created and bound to a QUIC socket, after which the gossip,
    /// engine and connection handlers are instantiated. A sync handler is also instantiated if a
    /// sync protocol is provided. Direct addresses for network peers are added to the engine from
    /// the address book and core protocols are registered.
    ///
    /// After configuration and registration processes are complete, the network is spawned and an
    /// attempt is made to retrieve at least one direct address for the local endpoint so that
    /// connections can be made. If no address is retrieved within the timeout limit, the network
    /// is shut down and an error is returned.
    pub async fn build(mut self) -> Result<Network<T>>
    where
        T: TopicQuery + TopicId + 'static,
    {
        let secret_key = self.secret_key.unwrap_or(SecretKey::generate());

        let relay: Option<RelayNode> = match self.relay_mode {
            RelayMode::Disabled => None,
            RelayMode::Custom(ref node) => Some(node.clone()),
        };

        // Build p2p endpoint and bind the QUIC socket.
        let endpoint = {
            let mut transport_config = TransportConfig::default();
            transport_config
                .max_concurrent_bidi_streams(MAX_STREAMS.into())
                .max_concurrent_uni_streams(0u32.into());

            let relay_mode = match self.relay_mode {
                RelayMode::Disabled => iroh_net::relay::RelayMode::Disabled,
                RelayMode::Custom(node) => iroh_net::relay::RelayMode::Custom(
                    RelayMap::from_nodes(vec![node])
                        .expect("relay list can not contain duplicates"),
                ),
            };

            let bind_ip_v4 = self.bind_ip_v4.unwrap_or(Ipv4Addr::UNSPECIFIED);
            let bind_port_v4 = self.bind_port_v4.unwrap_or(DEFAULT_BIND_PORT);
            let bind_ip_v6 = self.bind_ip_v6.unwrap_or(Ipv6Addr::UNSPECIFIED);
            let bind_port_v6 = self.bind_port_v6.unwrap_or(DEFAULT_BIND_PORT + 1);
            let socket_address_v4 = SocketAddrV4::new(bind_ip_v4, bind_port_v4);
            let socket_address_v6 = SocketAddrV6::new(bind_ip_v6, bind_port_v6, 0, 0);

            Endpoint::builder()
                .transport_config(transport_config)
                .secret_key(secret_key.clone())
                .relay_mode(relay_mode)
                .bind_addr_v4(socket_address_v4)
                .bind_addr_v6(socket_address_v6)
                .bind()
                .await?
        };

        let node_addr = endpoint.node_addr().await?;

        let gossip = Gossip::from_endpoint(
            endpoint.clone(),
            self.gossip_config.unwrap_or_default(),
            &node_addr.info,
        );

        let engine = Engine::new(
            secret_key.clone(),
            self.network_id,
            endpoint.clone(),
            gossip.clone(),
            self.sync_config,
        );

        let sync_handler = engine.sync_handler();

        let inner = Arc::new(NetworkInner {
            cancel_token: CancellationToken::new(),
            relay: relay.clone(),
            discovery: self.discovery,
            endpoint: endpoint.clone(),
            engine,
            gossip: gossip.clone(),
            network_id: self.network_id,
            secret_key,
        });

        self.protocols.insert(GOSSIP_ALPN, Arc::new(gossip.clone()));
        if let Some(sync_handler) = sync_handler {
            self.protocols
                .insert(SYNC_CONNECTION_ALPN, Arc::new(sync_handler));
        }
        let protocols = Arc::new(self.protocols.clone());
        let alpns = self.protocols.alpns();
        if let Err(err) = inner.endpoint.set_alpns(alpns) {
            inner.shutdown(protocols.clone()).await;
            return Err(err);
        }

        let fut = inner
            .clone()
            .spawn(protocols.clone())
            .instrument(error_span!("node", me=%node_addr.node_id.fmt_short()));
        let task = tokio::task::spawn(fut);
        let task_handle = AbortOnDropHandle::new(task)
            .map_err(Box::new(|err: JoinError| err.to_string()) as JoinErrToStr)
            .shared();

        let network = Network {
            inner,
            task: task_handle,
            protocols,
        };

        // Wait for a single direct address update, to make sure we found at least one direct
        // address.
        let wait_for_endpoints = {
            async move {
                tokio::time::timeout(DIRECT_ADDRESSES_WAIT, endpoint.direct_addresses().next())
                    .await
                    .context("waiting for endpoint")?
                    .context("no endpoints given to establish at least one connection")?;
                Ok(())
            }
        };

        if let Err(err) = wait_for_endpoints.await {
            network.shutdown().await.ok();
            return Err(err);
        }

        for mut direct_addr in self.direct_node_addresses {
            if direct_addr.relay_url().is_none() {
                // If given address does not hold any relay information we optimistically add ours
                // (if we have one). It's not guaranteed that this address will have the same relay
                // url as we have, but it's better than nothing!
                if let Some(ref relay_node) = relay {
                    direct_addr = direct_addr.with_relay_url(relay_node.url.clone());
                }
            }

            network.add_peer(direct_addr.clone()).await?;
        }

        Ok(network)
    }
}

#[derive(Debug)]
struct NetworkInner<T> {
    cancel_token: CancellationToken,
    relay: Option<RelayNode>,
    discovery: DiscoveryMap,
    endpoint: Endpoint,
    engine: Engine<T>,
    #[allow(dead_code)]
    gossip: Gossip,
    network_id: NetworkId,
    #[allow(dead_code)]
    secret_key: SecretKey,
}

impl<T> NetworkInner<T>
where
    T: TopicQuery + TopicId + 'static,
{
    /// Spawns a network.
    ///
    /// Local network sockets are bound and a task is started to listen for direct address
    /// changes for the local endpoint. Inbound connection attempts to these endpoints are passed
    /// to a handler.
    ///
    /// Any registered discovery services are subscribed to so that the identifiers and addresses
    /// of peers operating on the same network may be learned. Discovered peers are added to the
    /// local address book so they may be involved in connection and gossip activities.
    async fn spawn(self: Arc<Self>, protocols: Arc<ProtocolMap>) {
        let (ipv4, ipv6) = self.endpoint.bound_sockets();
        debug!(
            "listening at: {}{}",
            ipv4,
            ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default()
        );

        let mut join_set = JoinSet::<Result<()>>::new();

        // Spawn a task that updates the gossip endpoints and discovery services.
        {
            let inner = self.clone();
            join_set.spawn(async move {
                let mut addrs_stream = inner.endpoint.direct_addresses();
                let mut my_node_addr = NodeAddr::new(inner.endpoint.node_id());
                if let Some(node) = &inner.relay {
                    my_node_addr = my_node_addr.with_relay_url(node.url.to_owned());
                }

                loop {
                    tokio::select! {
                        // Learn about our direct addresses and changes to them.
                        Some(endpoints) = addrs_stream.next() => {
                            let direct_addresses = endpoints.iter().map(|endpoint| endpoint.addr).collect();
                            my_node_addr.info.direct_addresses = direct_addresses;
                            if let Err(err) = inner.discovery.update_local_address(&my_node_addr) {
                                warn!("failed to update direct addresses for discovery: {err:?}");
                            }
                        },
                        else => break,
                    }
                }

                Ok(())
            });
        }

        // Subscribe to all discovery channels where we might find new peers.
        let mut discovery_stream = self
            .discovery
            .subscribe(self.network_id)
            .expect("discovery map needs to be given");

        loop {
            tokio::select! {
                // Do not let tokio select futures randomly but with top-to-bottom priority.
                biased;
                // Exit loop when shutdown was signalled somewhere else.
                _ = self.cancel_token.cancelled() => {
                    break;
                },
                // Handle incoming p2p connections.
                Some(incoming) = self.endpoint.accept() => {
                    // @TODO: This is the point at which we can reject the connection if limits
                    // have been reached.
                    let connecting = match incoming.accept() {
                        Ok(connecting) => connecting,
                        Err(err) => {
                            warn!("incoming connection failed: {err:#}");
                            // This may be caused by retransmitted datagrams so we continue.
                            continue;
                        },
                    };
                    let protocols = protocols.clone();
                    join_set.spawn(async move {
                        handle_connection(connecting, protocols).await;
                        Ok(())
                    });
                },
                // Handle discovered peers.
                Some(event) = discovery_stream.next() => {
                    match event {
                        Ok(event) => {
                            if let Err(err) = self.engine.add_peer(event.node_addr).await {
                                error!("engine failed on add_peer: {err:?}");
                                break;
                            }
                        }
                        Err(err) => {
                            error!("discovery service failed: {err:?}");
                            break;
                        },
                    }
                },
                // Handle task terminations and quit on panics.
                res = join_set.join_next(), if !join_set.is_empty() => {
                    match res {
                        Some(Err(outer)) => {
                            if outer.is_panic() {
                                error!("task panicked: {outer:?}");
                                break;
                            } else if outer.is_cancelled() {
                                debug!("task cancelled: {outer:?}");
                            } else {
                                error!("task failed: {outer:?}");
                                break;
                            }
                        }
                        Some(Ok(Err(inner))) => {
                            debug!("task errored: {inner:?}");
                        }
                        _ => {}
                    }
                },
                else => break,
            }
        }

        self.shutdown(protocols).await;

        // Abort remaining tasks.
        join_set.shutdown().await;
    }

    /// Closes all connections and shuts down the network engine.
    async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
        // We ignore all errors during shutdown.
        debug!("close all connections and shutdown the node");
        let _ = tokio::join!(
            // Closing the Endpoint is the equivalent of calling `Connection::close` on all
            // connections: Operations will immediately fail with `ConnectionError::LocallyClosed`.
            // All streams are interrupted, this is not graceful.
            self.endpoint
                .clone()
                .close(1u32.into(), b"provider terminating"),
            self.engine.shutdown(),
            protocols.shutdown(),
        );
    }
}

/// Running peer-to-peer node.
///
/// The primary feature of the `Network` is the ability to subscribe to one or more topics and
/// exchange messages over those topics with remote peers. Replication can be conducted exclusively
/// in "live-mode" or may include the synchronisation of past state, thereby ensuring eventual
/// consistency among all peers for a given topic. Replication and discovery strategies are defined
/// in the `NetworkBuilder`.
///
/// In addition to topic subscription, `Network` offers a way to access information about the local
/// network such as the node ID and direct addresses. It also provides a convenient means to add the
/// address of a remote peer and to query the addresses of all known peers.
#[derive(Clone, Debug)]
pub struct Network<T> {
    inner: Arc<NetworkInner<T>>,
    #[allow(dead_code)]
    protocols: Arc<ProtocolMap>,
    // `Network` needs to be `Clone + Send` and we need to `task.await` in its `shutdown()` impl.
    // - `Shared` allows us to `task.await` from all `Network` clones
    //   - Acts like an `Arc` around the inner future
    // - `MapErr` is needed to map the `JoinError` to a `String`, since `JoinError` is `!Clone`
    // - `AbortOnDropHandle` ensures the `task` is cancelled when all `Network`s are dropped
    task: Shared<MapErr<AbortOnDropHandle<()>, JoinErrToStr>>,
}

impl<T> Network<T>
where
    T: TopicQuery + TopicId + 'static,
{
    /// Adds a peer to the address book.
    pub async fn add_peer(&self, node_addr: NodeAddr) -> Result<()> {
        self.inner.engine.add_peer(node_addr).await
    }

    /// Returns the addresses of all known peers.
    pub async fn known_peers(&self) -> Result<Vec<NodeAddr>> {
        self.inner.engine.known_peers().await
    }

    /// Returns the direct addresses of this node.
    pub async fn direct_addresses(&self) -> Option<Vec<SocketAddr>> {
        self.inner
            .endpoint
            .direct_addresses()
            .next()
            .await
            .map(|addrs| addrs.into_iter().map(|direct| direct.addr).collect())
    }

    /// Returns a handle to the network endpoint.
    ///
    /// The `Endpoint` exposes low-level networking functionality such as the ability to connect to
    /// specific peers, accept connections, query local socket addresses and more.
    ///
    /// This level of control is unlikely to be required in most cases but has been exposed for the
    /// convenience of advanced users.
    pub fn endpoint(&self) -> &Endpoint {
        &self.inner.endpoint
    }

    /// Returns the public key of the node.
    pub fn node_id(&self) -> PublicKey {
        PublicKey::from_bytes(self.inner.endpoint.node_id().as_bytes())
            .expect("public key already checked")
    }

    /// Terminates all internal tasks and shuts down the node.
    pub async fn shutdown(self) -> Result<()> {
        // Trigger shutdown of the main run task by activating the cancel token.
        self.inner.cancel_token.cancel();

        // Wait for the main task to terminate.
        self.task.await.map_err(|err| anyhow!(err))?;

        Ok(())
    }

    /// Subscribes to a topic and returns a bi-directional stream that can be read from and written
    /// to, along with a oneshot receiver to be informed when the gossip overlay has been joined.
    pub async fn subscribe(
        &self,
        topic: T,
    ) -> Result<(
        mpsc::Sender<ToNetwork>,
        mpsc::Receiver<FromNetwork>,
        oneshot::Receiver<()>,
    )> {
        let (to_network_tx, to_network_rx) = mpsc::channel::<ToNetwork>(128);
        let (from_network_tx, from_network_rx) = mpsc::channel::<FromNetwork>(128);
        let (gossip_ready_tx, gossip_ready_rx) = oneshot::channel();

        self.inner
            .engine
            .subscribe(topic, from_network_tx, to_network_rx, gossip_ready_tx)
            .await?;

        Ok((to_network_tx, from_network_rx, gossip_ready_rx))
    }
}

/// An event to be broadcast to the network.
#[derive(Clone, Debug)]
pub enum ToNetwork {
    Message { bytes: Vec<u8> },
}

/// An event received from the network.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FromNetwork {
    GossipMessage {
        bytes: Vec<u8>,
        delivered_from: PublicKey,
    },
    SyncMessage {
        header: Vec<u8>,
        payload: Option<Vec<u8>>,
        delivered_from: PublicKey,
    },
}

/// Handle an inbound connection on the local network endpoint.
///
/// The connection is accepted if the handshake is successful and the peer is operating with
/// a supported ALPN protocol.
async fn handle_connection(
    mut connecting: iroh_net::endpoint::Connecting,
    protocols: Arc<ProtocolMap>,
) {
    let alpn = match connecting.alpn().await {
        Ok(alpn) => alpn,
        Err(err) => {
            warn!("ignoring connection: invalid handshake: {:?}", err);
            return;
        }
    };
    let Some(handler) = protocols.get(&alpn) else {
        warn!("ignoring connection: unsupported alpn protocol");
        return;
    };
    if let Err(err) = handler.accept(connecting).await {
        warn!("handling incoming connection ended with error: {err}");
    }
}

/// Helper type for mapping a `JoinError` to a `String`, used to construct the shared
/// `AbortOnDropHandle` from the tokio crate.
pub(crate) type JoinErrToStr =
    Box<dyn Fn(tokio::task::JoinError) -> String + Send + Sync + 'static>;

#[cfg(test)]
pub(crate) mod sync_protocols {
    use std::sync::Arc;

    use async_trait::async_trait;
    use futures_lite::{AsyncRead, AsyncWrite, StreamExt};
    use futures_util::{Sink, SinkExt};
    use p2panda_sync::cbor::{into_cbor_sink, into_cbor_stream};
    use p2panda_sync::{FromSync, SyncError, SyncProtocol};
    use serde::{Deserialize, Serialize};
    use tracing::debug;

    use super::tests::TestTopic;

    #[derive(Debug, Serialize, Deserialize)]
    enum DummyProtocolMessage {
        TopicQuery(TestTopic),
        Done,
    }

    /// A sync implementation which fulfills basic protocol requirements but nothing more.
    #[derive(Debug)]
    pub struct DummyProtocol {}

    #[async_trait]
    impl<'a> SyncProtocol<'a, TestTopic> for DummyProtocol {
        fn name(&self) -> &'static str {
            static DUMMY_PROTOCOL_NAME: &str = "dummy_protocol";
            DUMMY_PROTOCOL_NAME
        }

        async fn initiate(
            self: Arc<Self>,
            topic_query: TestTopic,
            tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
            rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
            mut app_tx: Box<
                &'a mut (dyn Sink<FromSync<TestTopic>, Error = SyncError> + Send + Unpin),
            >,
        ) -> Result<(), SyncError> {
            debug!("DummyProtocol: initiate sync session");

            let mut sink = into_cbor_sink(tx);
            let mut stream = into_cbor_stream(rx);

            sink.send(DummyProtocolMessage::TopicQuery(topic_query.clone()))
                .await?;
            sink.send(DummyProtocolMessage::Done).await?;
            app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;

            while let Some(result) = stream.next().await {
                let message: DummyProtocolMessage = result?;
                debug!("message received: {:?}", message);

                match &message {
                    DummyProtocolMessage::TopicQuery(_) => panic!(),
                    DummyProtocolMessage::Done => break,
                }
            }

            sink.flush().await?;
            app_tx.flush().await?;

            Ok(())
        }

        async fn accept(
            self: Arc<Self>,
            tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
            rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
            mut app_tx: Box<
                &'a mut (dyn Sink<FromSync<TestTopic>, Error = SyncError> + Send + Unpin),
            >,
        ) -> Result<(), SyncError> {
            debug!("DummyProtocol: accept sync session");

            let mut sink = into_cbor_sink(tx);
            let mut stream = into_cbor_stream(rx);

            while let Some(result) = stream.next().await {
                let message: DummyProtocolMessage = result?;
                debug!("message received: {:?}", message);

                match &message {
                    DummyProtocolMessage::TopicQuery(topic_query) => {
                        app_tx
                            .send(FromSync::HandshakeSuccess(topic_query.clone()))
                            .await?
                    }
                    DummyProtocolMessage::Done => break,
                }
            }

            sink.send(DummyProtocolMessage::Done).await?;

            sink.flush().await?;
            app_tx.flush().await?;

            Ok(())
        }
    }

    // The protocol message types.
    #[derive(Serialize, Deserialize)]
    enum Message {
        TopicQuery(TestTopic),
        Ping,
        Pong,
    }

    /// A ping-pong sync protocol.
    #[derive(Debug, Clone)]
    pub struct PingPongProtocol {}

    #[async_trait]
    impl<'a> SyncProtocol<'a, TestTopic> for PingPongProtocol {
        fn name(&self) -> &'static str {
            static SIMPLE_PROTOCOL_NAME: &str = "simple_protocol";
            SIMPLE_PROTOCOL_NAME
        }

        async fn initiate(
            self: Arc<Self>,
            topic_query: TestTopic,
            tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
            rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
            mut app_tx: Box<
                &'a mut (dyn Sink<FromSync<TestTopic>, Error = SyncError> + Send + Unpin),
            >,
        ) -> Result<(), SyncError> {
            debug!("initiate sync session");
            let mut sink = into_cbor_sink(tx);
            let mut stream = into_cbor_stream(rx);

            sink.send(Message::TopicQuery(topic_query.clone())).await?;
            sink.send(Message::Ping).await?;
            debug!("ping message sent");

            app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;

            while let Some(result) = stream.next().await {
                let message = result?;

                match message {
                    Message::TopicQuery(_) => panic!(),
                    Message::Ping => {
                        return Err(SyncError::UnexpectedBehaviour(
                            "unexpected Ping message received".to_string(),
                        ));
                    }
                    Message::Pong => {
                        debug!("pong message received");
                        app_tx
                            .send(FromSync::Data {
                                header: "PONG".as_bytes().to_owned(),
                                payload: None,
                            })
                            .await
                            .unwrap();
                        break;
                    }
                }
            }

            // Flush all bytes so that no messages are lost.
            sink.flush().await?;
            app_tx.flush().await?;

            Ok(())
        }

        async fn accept(
            self: Arc<Self>,
            tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
            rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
            mut app_tx: Box<
                &'a mut (dyn Sink<FromSync<TestTopic>, Error = SyncError> + Send + Unpin),
            >,
        ) -> Result<(), SyncError> {
            debug!("accept sync session");
            let mut sink = into_cbor_sink(tx);
            let mut stream = into_cbor_stream(rx);

            while let Some(result) = stream.next().await {
                let message = result?;

                match message {
                    Message::TopicQuery(topic_query) => {
                        app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?
                    }
                    Message::Ping => {
                        debug!("ping message received");
                        app_tx
                            .send(FromSync::Data {
                                header: "PING".as_bytes().to_owned(),
                                payload: None,
                            })
                            .await
                            .unwrap();

                        sink.send(Message::Pong).await?;
                        debug!("pong message sent");
                        break;
                    }
                    Message::Pong => {
                        return Err(SyncError::UnexpectedBehaviour(
                            "unexpected Pong message received".to_string(),
                        ));
                    }
                }
            }

            sink.flush().await?;
            app_tx.flush().await?;

            Ok(())
        }
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::collections::HashMap;
    use std::net::{Ipv4Addr, Ipv6Addr};
    use std::path::PathBuf;
    use std::time::Duration;

    use async_trait::async_trait;
    use iroh_net::relay::{RelayNode, RelayUrl as IrohRelayUrl};
    use p2panda_core::{Body, Extensions, Hash, Header, PrivateKey, PublicKey};
    use p2panda_store::{MemoryStore, OperationStore};
    use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap};
    use p2panda_sync::TopicQuery;
    use serde::{Deserialize, Serialize};
    use tokio::task::JoinHandle;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;
    use tracing_subscriber::EnvFilter;

    use crate::addrs::DEFAULT_STUN_PORT;
    use crate::bytes::ToBytes;
    use crate::config::Config;
    use crate::network::sync_protocols::PingPongProtocol;
    use crate::sync::SyncConfiguration;
    use crate::{NetworkBuilder, RelayMode, RelayUrl, TopicId};

    use super::{FromNetwork, Network, ToNetwork};

    fn setup_logging() {
        tracing_subscriber::registry()
            .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
            .with(EnvFilter::from_default_env())
            .try_init()
            .ok();
    }

    fn create_operation<E: Extensions>(
        private_key: &PrivateKey,
        body: &Body,
        seq_num: u64,
        timestamp: u64,
        backlink: Option<Hash>,
        extensions: Option<E>,
    ) -> (Hash, Header<E>, Vec<u8>) {
        let mut header = Header {
            version: 1,
            public_key: private_key.public_key(),
            signature: None,
            payload_size: body.size(),
            payload_hash: Some(body.hash()),
            timestamp,
            seq_num,
            backlink,
            previous: vec![],
            extensions,
        };
        header.sign(private_key);
        let header_bytes = header.to_bytes();
        (header.hash(), header, header_bytes)
    }

    #[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
    pub struct TestTopic(String, [u8; 32]);

    impl TestTopic {
        pub fn new(name: &str) -> Self {
            Self(name.to_owned(), [0; 32])
        }
    }

    impl TopicQuery for TestTopic {}

    impl TopicId for TestTopic {
        fn id(&self) -> [u8; 32] {
            self.1
        }
    }

    #[tokio::test]
    async fn config() {
        let direct_node_public_key = PrivateKey::new().public_key();
        let relay_address: RelayUrl = "https://example.net".parse().unwrap();

        let config = Config {
            bind_ip_v4: Ipv4Addr::new(7, 7, 7, 7),
            bind_port_v4: 2024,
            bind_ip_v6: Ipv6Addr::new(8, 8, 8, 8, 8, 8, 8, 8),
            bind_port_v6: 2025,
            network_id: [1; 32],
            private_key: Some(PathBuf::new().join("secret-key.txt")),
            direct_node_addresses: vec![(
                direct_node_public_key,
                vec!["0.0.0.0:2026".parse().unwrap()],
                None,
            )],
            relay: Some(relay_address.clone()),
        };

        let builder = NetworkBuilder::<TestTopic>::from_config(config);

        assert_eq!(builder.bind_ip_v4, Some(Ipv4Addr::new(7, 7, 7, 7)));
        assert_eq!(builder.bind_port_v4, Some(2024));
        assert_eq!(
            builder.bind_ip_v6,
            Some(Ipv6Addr::new(8, 8, 8, 8, 8, 8, 8, 8))
        );
        assert_eq!(builder.bind_port_v6, Some(2025));
        assert_eq!(builder.network_id, [1; 32]);
        assert!(builder.secret_key.is_none());
        assert_eq!(builder.direct_node_addresses.len(), 1);
        let relay_node = RelayNode {
            url: IrohRelayUrl::from(relay_address),
            stun_only: false,
            stun_port: DEFAULT_STUN_PORT,
        };
        assert_eq!(builder.relay_mode, RelayMode::Custom(relay_node));
    }

    #[tokio::test]
    async fn join_gossip_overlay() {
        setup_logging();

        let network_id = [1; 32];
        let topic = TestTopic::new("chat");

        let node_1 = NetworkBuilder::new(network_id).build().await.unwrap();
        let node_2 = NetworkBuilder::new(network_id).build().await.unwrap();

        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();

        node_1.add_peer(node_2_addr).await.unwrap();
        node_2.add_peer(node_1_addr).await.unwrap();

        // Subscribe to the same topic from both nodes
        let (tx_1, _rx_1, ready_1) = node_1.subscribe(topic.clone()).await.unwrap();
        let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(topic).await.unwrap();

        // Ensure the gossip-overlay has been joined by both nodes for the given topic
        assert!(ready_2.await.is_ok());
        assert!(ready_1.await.is_ok());

        // Broadcast a message and make sure it's received by the other node
        tx_1.send(ToNetwork::Message {
            bytes: "Hello, Node".to_bytes(),
        })
        .await
        .unwrap();

        let rx_2_msg = rx_2.recv().await.unwrap();
        assert_eq!(
            rx_2_msg,
            FromNetwork::GossipMessage {
                bytes: "Hello, Node".to_bytes(),
                delivered_from: node_1.node_id(),
            }
        );

        node_1.shutdown().await.unwrap();
        node_2.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn ping_pong() {
        setup_logging();

        let network_id = [1; 32];
        let topic = TestTopic::new("ping_pong");
        let ping_pong = PingPongProtocol {};
        let sync_config = SyncConfiguration::new(ping_pong);

        let node_1 = NetworkBuilder::new(network_id)
            .sync(sync_config.clone())
            .build()
            .await
            .unwrap();
        let node_2 = NetworkBuilder::new(network_id)
            .sync(sync_config)
            .build()
            .await
            .unwrap();

        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();

        node_1.add_peer(node_2_addr).await.unwrap();
        node_2.add_peer(node_1_addr).await.unwrap();

        // Subscribe to the same topic from both nodes which should kick off sync
        let topic_clone = topic.clone();
        let handle1 = tokio::spawn(async move {
            let (_tx, _rx, _ready) = node_1.subscribe(topic_clone).await.unwrap();
            tokio::time::sleep(Duration::from_secs(2)).await;
            node_1.shutdown().await.unwrap();
        });
        let handle2 = tokio::spawn(async move {
            let (_tx, _rx, _ready) = node_2.subscribe(topic).await.unwrap();
            tokio::time::sleep(Duration::from_secs(2)).await;
            node_2.shutdown().await.unwrap();
        });

        let (result1, result2) = tokio::join!(handle1, handle2);
        assert!(result1.is_ok());
        assert!(result2.is_ok());
    }

    type Logs<T> = HashMap<PublicKey, Vec<T>>;

    #[derive(Clone, Debug)]
    struct LogIdTopicMap<T>(HashMap<T, Logs<u64>>);

    impl<T> LogIdTopicMap<T>
    where
        T: TopicQuery,
    {
        pub fn new() -> Self {
            LogIdTopicMap(HashMap::new())
        }

        fn insert(&mut self, topic: T, logs: Logs<u64>) -> Option<Logs<u64>> {
            self.0.insert(topic, logs)
        }
    }

    #[async_trait]
    impl<T> TopicLogMap<T, u64> for LogIdTopicMap<T>
    where
        T: TopicQuery,
    {
        async fn get(&self, topic: &T) -> Option<Logs<u64>> {
            self.0.get(topic).cloned()
        }
    }

    #[tokio::test]
    async fn e2e_log_height_sync() {
        setup_logging();

        const NETWORK_ID: [u8; 32] = [1; 32];

        let peer_a_private_key = PrivateKey::new();
        let peer_b_private_key = PrivateKey::new();

        let topic = TestTopic::new("event_logs");
        let log_id = 0;
        let logs = HashMap::from([(peer_a_private_key.public_key(), vec![log_id])]);

        let mut topic_map = LogIdTopicMap::new();
        topic_map.insert(topic.clone(), logs);

        // Construct a store and log height protocol for peer a.
        let store_a = MemoryStore::default();
        let protocol_a = LogSyncProtocol::new(topic_map.clone(), store_a);
        let sync_config_a = SyncConfiguration::new(protocol_a);

        // Create some operations.
        let body = Body::new("Hello, Sloth!".as_bytes());
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&peer_a_private_key, &body, 0, 0, None, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&peer_a_private_key, &body, 1, 100, Some(hash_0), None);
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&peer_a_private_key, &body, 2, 200, Some(hash_1), None);

        // Create store for peer b and populate with operations.
        let mut store_b = MemoryStore::default();
        store_b
            .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
            .await
            .unwrap();
        store_b
            .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
            .await
            .unwrap();
        store_b
            .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
            .await
            .unwrap();

        // Construct log height protocol for peer b.
        let protocol_b = LogSyncProtocol::new(topic_map, store_b);
        let sync_config_b = SyncConfiguration::new(protocol_b);

        // Build peer a's node
        let node_a = NetworkBuilder::new(NETWORK_ID)
            .sync(sync_config_a)
            .private_key(peer_a_private_key)
            .build()
            .await
            .unwrap();

        // Build peer b's node
        let node_b = NetworkBuilder::new(NETWORK_ID)
            .sync(sync_config_b)
            .private_key(peer_b_private_key.clone())
            .build()
            .await
            .unwrap();

        let node_a_addr = node_a.endpoint().node_addr().await.unwrap();
        let node_b_addr = node_b.endpoint().node_addr().await.unwrap();

        node_a.add_peer(node_b_addr).await.unwrap();
        node_b.add_peer(node_a_addr).await.unwrap();

        // Subscribe to the same topic from both nodes which should kick off sync.
        let topic_clone = topic.clone();
        let handle1 = tokio::spawn(async move {
            let (_tx, mut from_sync_rx, ready) = node_a.subscribe(topic_clone).await.unwrap();

            // Wait until the gossip overlay has been joined for TOPIC_ID.
            assert!(ready.await.is_ok());

            let mut from_sync_messages = Vec::new();
            while let Some(message) = from_sync_rx.recv().await {
                from_sync_messages.push(message);
                if from_sync_messages.len() == 3 {
                    break;
                }
            }

            // Construct the messages we expect to receive on the from_sync channel based on the
            // operations we created earlier.
            let peer_a_expected_messages = vec![
                FromNetwork::SyncMessage {
                    header: header_bytes_0.to_vec(),
                    payload: Some(body.to_bytes()),
                    delivered_from: peer_b_private_key.public_key(),
                },
                FromNetwork::SyncMessage {
                    header: header_bytes_1.to_vec(),
                    payload: Some(body.to_bytes()),
                    delivered_from: peer_b_private_key.public_key(),
                },
                FromNetwork::SyncMessage {
                    header: header_bytes_2.to_vec(),
                    payload: Some(body.to_bytes()),
                    delivered_from: peer_b_private_key.public_key(),
                },
            ];

            // Assert we receive the expected messages
            assert_eq!(from_sync_messages, peer_a_expected_messages);

            node_a.shutdown().await.unwrap();
        });

        let handle2 = tokio::spawn(async move {
            let (_tx, _from_sync_rx, ready) = node_b.subscribe(topic).await.unwrap();

            // Wait until the gossip overlay has been joined for TOPIC_ID
            assert!(ready.await.is_ok());

            // Sleep for a moment to ensure sync has time to complete
            tokio::time::sleep(Duration::from_secs(2)).await;

            node_b.shutdown().await.unwrap();
        });

        // Wait on both to complete
        let (result1, result2) = tokio::join!(handle1, handle2);

        assert!(result1.is_ok());
        assert!(result2.is_ok());
    }

    #[tokio::test]
    async fn multi_hop_join_gossip_overlay() {
        setup_logging();

        let network_id = [1; 32];
        let chat_topic = TestTopic::new("chat");

        let node_1 = NetworkBuilder::new(network_id).build().await.unwrap();
        let node_2 = NetworkBuilder::new(network_id).build().await.unwrap();
        let node_3 = NetworkBuilder::new(network_id).build().await.unwrap();

        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();

        node_1.add_peer(node_2_addr.clone()).await.unwrap();
        node_2.add_peer(node_1_addr).await.unwrap();
        node_3.add_peer(node_2_addr).await.unwrap();

        // Subscribe to the same topic from all nodes
        let (tx_1, _rx_1, ready_1) = node_1.subscribe(chat_topic.clone()).await.unwrap();
        let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(chat_topic.clone()).await.unwrap();
        let (_tx_3, mut rx_3, ready_3) = node_3.subscribe(chat_topic).await.unwrap();

        // Ensure the gossip-overlay has been joined by all three nodes for the given topic
        assert!(ready_3.await.is_ok());
        assert!(ready_2.await.is_ok());
        assert!(ready_1.await.is_ok());

        // Broadcast a message and make sure it's received by the other nodes
        tx_1.send(ToNetwork::Message {
            bytes: "Hello, Node".to_bytes(),
        })
        .await
        .unwrap();

        let rx_2_msg = rx_2.recv().await.unwrap();
        assert_eq!(
            rx_2_msg,
            FromNetwork::GossipMessage {
                bytes: "Hello, Node".to_bytes(),
                // Node 2 receives the message and it is delivered by node 1
                delivered_from: node_1.node_id(),
            }
        );

        let rx_3_msg = rx_3.recv().await.unwrap();
        assert_eq!(
            rx_3_msg,
            FromNetwork::GossipMessage {
                bytes: "Hello, Node".to_bytes(),
                // Node 3 receives the message and it is also delivered by node 1
                delivered_from: node_1.node_id(),
            }
        );

        node_1.shutdown().await.unwrap();
        node_2.shutdown().await.unwrap();
        node_3.shutdown().await.unwrap();
    }

    fn run_node<T: TopicId + TopicQuery + 'static>(node: Network<T>, topic: T) -> JoinHandle<()> {
        tokio::spawn(async move {
            let (_tx, mut rx, ready) = node.subscribe(topic).await.unwrap();

            // Await the ready signal so we know the gossip overlay has been joined.
            assert!(ready.await.is_ok());

            // Await at least one message received via sync.
            loop {
                let msg = rx.recv().await.unwrap();
                println!("{msg:?}");
                match msg {
                    FromNetwork::SyncMessage { .. } => break,
                    _ => (),
                }
            }

            // Give other nodes enough time to complete sync sessions.
            tokio::time::sleep(Duration::from_secs(3)).await;
            node.shutdown().await.unwrap();
        })
    }

    #[tokio::test]
    async fn multi_hop_topic_discovery_and_sync() {
        setup_logging();

        let network_id = [1; 32];
        let topic = TestTopic::new("chat");
        let sync_config = SyncConfiguration::new(PingPongProtocol {});

        // Create 4 nodes.
        let node_1 = NetworkBuilder::new(network_id)
            .sync(sync_config.clone())
            .build()
            .await
            .unwrap();
        let node_2 = NetworkBuilder::new(network_id)
            .sync(sync_config.clone())
            .build()
            .await
            .unwrap();
        let node_3 = NetworkBuilder::new(network_id)
            .sync(sync_config.clone())
            .build()
            .await
            .unwrap();
        let node_4 = NetworkBuilder::new(network_id)
            .sync(sync_config.clone())
            .build()
            .await
            .unwrap();

        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
        let node_3_addr = node_3.endpoint().node_addr().await.unwrap();

        // All peers know about only one other peer.
        node_1.add_peer(node_2_addr.clone()).await.unwrap();
        node_2.add_peer(node_1_addr).await.unwrap();
        node_3.add_peer(node_2_addr.clone()).await.unwrap();
        node_4.add_peer(node_3_addr.clone()).await.unwrap();

        // Run all nodes. We are testing that peers gracefully handle starting a sync session while
        // not knowing the other peer's address yet. Eventually all peers complete at least one
        // sync session.
        let handle1 = run_node(node_1, topic.clone());
        let handle2 = run_node(node_2, topic.clone());
        let handle3 = run_node(node_3, topic.clone());
        let handle4 = run_node(node_4, topic.clone());

        let (result1, result2, result3, result4) = tokio::join!(handle1, handle2, handle3, handle4);

        assert!(result1.is_ok());
        assert!(result2.is_ok());
        assert!(result3.is_ok());
        assert!(result4.is_ok());
    }
}
1529}