p2panda_net/
network.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Node implementation for p2p networking and data streaming, extensible with discovery
4//! strategies, sync protocols, blob sync and more.
5//!
6//! As soon as the local node is launched it roughly attempts the following goal:
7//!
8//! 1. Find as many peers as possible who are interested in the same topic
9//! 2. Connect with as many peers as possible to exchange data with, eventually converging to the
10//!    same state
11//!
12//! To achieve this goal, multiple systems are in play:
13//!
14//! ## Bootstrap
15//!
16//! Establishing a peer-to-peer network suffers from the "chicken and egg" problem where we need to
17//! start _somewhere_ before we can begin to discover more and become "discoverable" for other,
18//! possibly previously unknown peers.
19//!
20//! This process of the "first step" into a network is called "bootstrap" and can be realised with
21//! different strategies:
22//!
23//! 1. Supply your node with a hard-coded list of well-known node addresses which are directly
24//!    reachable. The node will attempt connecting them on start.
25//! 2. Use techniques like mDNS or rendesvouz servers to learn about other nodes to connect to.
26//!
27//! The latter approach is very similar to "peer discovery" with one important difference: While
28//! peer discovery helps us to learn about _more peers_ we use these discovery techniques during
29//! bootstrap to find _the first_ peer to connect to.
30//!
31//! ## Peer Discovery
32//!
33//! Like "Bootstrap" we can apply similar algorithms to find more peers in the network. In
34//! `p2panda-net` this is an "ambient" process, meaning that it constantly takes place in the
35//! background, as soon as the node is running.
36//!
37//! To find more peers which potentially might be interested in the same data as the local node,
38//! one or more discovery techniques can be added when creating the network, for example mDNS for
39//! finding peers in the local network or a "Rendesvouz" server for finding peers on the internet.
40//!
41//! Additionally `p2panda-net` might find new peers through joining any gossip overlay. If another
42//! peer becomes a direct neighbor in the gossip tree, we register it in our address book.
43//!
44//! ## Topic Discovery
45//!
46//! Next to "Peer Discovery" we additionally find out what peers are interested in and announce our
47//! own topics of interest in the network. With this design it is possible to have peers in the
48//! network being interested in different data, potentially using other applications, at the same
49//! time.
50//!
51//! By default `p2panda-net` always enters at least one network-wide gossip overlay where peers
52//! exchange information over this information. In the future we might use a random-walk traversal
53//! algorithm instead to "explore" the network.
54//!
55//! As soon as we've identified a common interest in the same topic, we're joining the gossip
56//! overlay for this topic with them and earmark this peer for a future sync session.
57//!
58//! ## Connectivity
59//!
60//! With the help of iroh, we can connect to any device whereever they are. There is a multi-step
61//! process requiring additional strategies depending on the situation to connect to a peer:
62//!
63//! 1. If we know a peer's directly reachable address, we can just connect to them
64//! 2. If this peer's direct address is not known (because they are behind a NAT or we only know
65//!    their public key) we use a STUN server to find out the address
66//! 3. If this peer is still not reachable (for example because they are behind a Firewall), we use
67//!    a Relay (TURN-like) server to handle the connection through it
68//!
69//! In the last case we can't establish a direct connection and rely on additional infrastructure.
70//!
71//! Read the `iroh-relay` documentation to learn about how to run your own STUN and Relay server:
72//! <https://github.com/n0-computer/iroh/tree/main/iroh-relay>.
73//!
74//! This implementation aims at being robust in situations where bad or no connectivity is
75//! (temporarily) given. `p2panda-net` will automatically re-connect to peers as soon as they are
76//! reachable again which, from the application perspective, shouldn't make any difference.
77//!
78//! ## "Live Mode"
79//!
80//! To send newly created messages fastly to other peers, `p2panda-net` uses broadcast gossip
81//! overlays for each topic. With this "live mode" messages arrive almost instantly.
82//!
83//! ## Sync
84//!
85//! By learning about other peers who are interested in the same topic we keep track of them in an
86//! internal "address book". A sync session mananger process will eventually kick in a sync session
87//! with one of these peers and try to exchange _all_ data we're so far missing on that topic with
88//! them.
89//!
90//! Sync is disabled by default and can be enabled by adding a `SyncProtocol` implementation to the
91//! node.
92//!
93//! Sync sessions are only running once per peer per topic but can optionally be re-attempted after
94//! a certain duration if a `ResyncConfiguration` was given.
95//!
96//! ## Gossip Buffer
97//!
98//! Since a node receives potentially older data from another node during a sync session,
99//! `p2panda-net` uses a "gossip buffer" to make sure that we're buffering new messages which were
100//! received at the same time during the gossip overlay.
101//!
102//! The buffer is released after the sync session has ended. Through this trick we can make sure
103//! that messages are arriving "in order" (based on their timestamp or partial ordering for
104//! example) to higher application layers.
105//!
106//! Please note that this implementation can never fully be sure that messages will arrive "out of
107//! order" to the application. It is recommended to apply additional buffering if this is required.
108//! In `p2panda-streams` we offer a solution which will take care of that.
109//!
110//! ## Blobs
111//!
112//! With the help of the `p2panda-blobs` crate it is possible to extend the node to support
113//! efficient sync of large binary data (images, files etc.) with various storage backends.
114//!
115//! ## Custom Protocols
116//!
117//! Next to blob sync, data sync or discovery protocols it is also possible to register any other
118//! low-level bi-directional communication protocol to the node when necessary.
119use std::fmt::Debug;
120use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
121use std::sync::Arc;
122use std::time::Duration;
123
124use anyhow::{Context, Result, anyhow};
125use futures_lite::StreamExt;
126use futures_util::future::{MapErr, Shared};
127use futures_util::{FutureExt, TryFutureExt};
128use iroh::{Endpoint, RelayMap, RelayNode};
129use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
130use iroh_quinn::TransportConfig;
131use p2panda_core::{PrivateKey, PublicKey};
132use p2panda_discovery::{Discovery, DiscoveryMap};
133use p2panda_sync::TopicQuery;
134use tokio::sync::{broadcast, mpsc, oneshot};
135use tokio::task::{JoinError, JoinSet};
136use tokio_util::sync::CancellationToken;
137use tokio_util::task::AbortOnDropHandle;
138use tracing::{Instrument, debug, error, error_span, warn};
139
140use crate::addrs::{DEFAULT_STUN_PORT, to_node_addr, to_relay_url};
141use crate::config::{Config, DEFAULT_BIND_PORT, GossipConfig};
142use crate::engine::Engine;
143use crate::events::SystemEvent;
144use crate::protocols::{ProtocolHandler, ProtocolMap};
145use crate::sync::{SYNC_CONNECTION_ALPN, SyncConfiguration};
146use crate::{NetworkId, NodeAddress, RelayUrl, TopicId, from_private_key};
147
148/// Maximum number of streams accepted on a QUIC connection.
149const MAX_STREAMS: u32 = 1024;
150
151/// Timeout duration for receiving of at least one peer's direct address.
152const DIRECT_ADDRESSES_WAIT: Duration = Duration::from_secs(5);
153
154/// Relay server configuration mode.
155#[derive(Debug, PartialEq)]
156pub enum RelayMode {
157    /// No relay has been specified.
158    ///
159    /// To connect to another peer it's direct address needs to be known, otherwise any connection
160    /// attempt will fail.
161    Disabled,
162
163    /// Specify a custom relay.
164    ///
165    /// Relays are used to help establishing a connection in case the direct address is not known
166    /// yet (via STUN). In case this process fails (for example due to a firewall), the relay is
167    /// used as a fallback to tunnel traffic from one peer to another (via DERP, which is similar
168    /// to TURN).
169    ///
170    /// Important: Peers need to use the _same_ relay address to be able to connect to each other.
171    Custom(RelayNode),
172}
173
174/// Builds an overlay network for peers grouped under the same network identifier.
175///
176/// All peers can subscribe to multiple topics in this overlay and hook into a data stream per
177/// topic where they'll send and receive data.
178#[derive(Debug)]
179pub struct NetworkBuilder<T> {
180    bind_ip_v4: Option<Ipv4Addr>,
181    bind_port_v4: Option<u16>,
182    bind_ip_v6: Option<Ipv6Addr>,
183    bind_port_v6: Option<u16>,
184    bootstrap: bool,
185    direct_node_addresses: Vec<NodeAddress>,
186    discovery: DiscoveryMap,
187    gossip_config: Option<GossipConfig>,
188    network_id: NetworkId,
189    protocols: ProtocolMap,
190    relay_mode: RelayMode,
191    private_key: Option<PrivateKey>,
192    sync_config: Option<SyncConfiguration<T>>,
193}
194
195impl<T> NetworkBuilder<T>
196where
197    T: TopicQuery,
198{
199    /// Returns a new instance of `NetworkBuilder` using the given network identifier.
200    ///
201    /// Networks must use the same identifier if they wish to successfully connect and share
202    /// data.
203    pub fn new(network_id: NetworkId) -> Self {
204        Self {
205            bind_ip_v4: None,
206            bind_port_v4: None,
207            bind_ip_v6: None,
208            bind_port_v6: None,
209            bootstrap: false,
210            direct_node_addresses: Vec::new(),
211            discovery: DiscoveryMap::default(),
212            gossip_config: None,
213            network_id,
214            protocols: Default::default(),
215            relay_mode: RelayMode::Disabled,
216            private_key: None,
217            sync_config: None,
218        }
219    }
220
221    /// Returns a new instance of `NetworkBuilder` using the given configuration.
222    pub fn from_config(config: Config) -> Self {
223        let mut network_builder = Self::new(config.network_id)
224            .bind_ip_v4(config.bind_ip_v4)
225            .bind_port_v4(config.bind_port_v4)
226            .bind_ip_v6(config.bind_ip_v6)
227            .bind_port_v6(config.bind_port_v6);
228
229        for addr in config.direct_node_addresses {
230            network_builder = network_builder.direct_address(
231                addr.public_key,
232                addr.direct_addresses,
233                addr.relay_url,
234            )
235        }
236
237        if let Some(url) = config.relay {
238            let port = url.port().unwrap_or(DEFAULT_STUN_PORT);
239            network_builder = network_builder.relay(url, false, port)
240        }
241
242        network_builder
243    }
244
245    /// Sets or overwrites the local IP for IPv4 sockets.
246    ///
247    /// Default is 0.0.0.0 (`UNSPECIFIED`).
248    pub fn bind_ip_v4(mut self, ip: Ipv4Addr) -> Self {
249        self.bind_ip_v4.replace(ip);
250        self
251    }
252
253    /// Sets or overwrites the local bind port for IPv4 sockets.
254    ///
255    /// Default is 2022.
256    pub fn bind_port_v4(mut self, port: u16) -> Self {
257        self.bind_port_v4.replace(port);
258        self
259    }
260
261    /// Sets or overwrites the local IP for IPv6 sockets.
262    ///
263    /// Default is :: (`UNSPECIFIED`).
264    pub fn bind_ip_v6(mut self, ip: Ipv6Addr) -> Self {
265        self.bind_ip_v6.replace(ip);
266        self
267    }
268
269    /// Sets or overwrites the local bind port for IPv6 sockets.
270    ///
271    /// Default is 2023.
272    pub fn bind_port_v6(mut self, port: u16) -> Self {
273        self.bind_port_v6.replace(port);
274        self
275    }
276
277    /// Sets the bootstrap flag.
278    ///
279    /// A bootstrap node is one which is not aware of any other peers at start-up and is intended
280    /// to serve as an entry node into the network for other peers.
281    pub fn bootstrap(mut self) -> Self {
282        self.bootstrap = true;
283        self
284    }
285
286    /// Sets or overwrites the private key.
287    ///
288    /// If this value is not set, the `NetworkBuilder` will generate a new, random key when
289    /// building the network.
290    pub fn private_key(mut self, private_key: PrivateKey) -> Self {
291        self.private_key = Some(private_key);
292        self
293    }
294
295    /// Sets the relay used by the local network to facilitate the establishment of direct
296    /// connections.
297    ///
298    /// Relay nodes are STUN servers which help in establishing a peer-to-peer connection if one or
299    /// both of the peers are behind a NAT. The relay node might offer proxy functionality on top
300    /// (via the Tailscale DERP protocol which is very similar to TURN) if the connection attempt
301    /// fails, which will serve to relay the data in that case.
302    pub fn relay(mut self, url: RelayUrl, stun_only: bool, stun_port: u16) -> Self {
303        self.relay_mode = RelayMode::Custom(RelayNode {
304            url: url.into(),
305            stun_only,
306            stun_port,
307            quic: None,
308        });
309        self
310    }
311
312    /// Sets the direct address of a peer, identified by their public key (node id).
313    ///
314    /// The direct address should be reachable without the aid of a STUN or TURN-based relay node.
315    /// However, if the direct connection attempt might fail (for example, because of a NAT or
316    /// Firewall), the relay node of that peer can be supplied to allow connecting to it via a
317    /// fallback connection.
318    ///
319    /// If no relay address is given but turns out to be required, we optimistically try to use our
320    /// own relay node instead (if specified). This might still fail, as we can't know if the peer
321    /// is using the same relay node.
322    pub fn direct_address(
323        mut self,
324        public_key: PublicKey,
325        direct_addresses: Vec<SocketAddr>,
326        relay_url: Option<RelayUrl>,
327    ) -> Self {
328        self.direct_node_addresses.push(NodeAddress {
329            public_key,
330            direct_addresses,
331            relay_url,
332        });
333        self
334    }
335
336    /// Adds one or more discovery strategy, such as mDNS.
337    pub fn discovery(mut self, handler: impl Discovery + 'static) -> Self {
338        self.discovery.add(handler);
339        self
340    }
341
342    /// Sets the sync protocol and configuration.
343    ///
344    /// Sync sessions will be automatically initiated with any known peers with whom we share
345    /// topics of interest.
346    pub fn sync(mut self, config: SyncConfiguration<T>) -> Self {
347        self.sync_config = Some(config);
348        self
349    }
350
351    /// Sets the gossip configuration.
352    ///
353    /// Configuration parameters define the behavior of the swarm membership (HyParView) and gossip
354    /// broadcast (Plumtree) layers, as well as the maximum message size.
355    pub fn gossip(mut self, config: GossipConfig) -> Self {
356        self.gossip_config = Some(config);
357        self
358    }
359
360    /// Adds additional, custom protocols for communication between two peers.
361    pub fn protocol(
362        mut self,
363        protocol_name: &'static [u8],
364        handler: impl ProtocolHandler + 'static,
365    ) -> Self {
366        self.protocols.insert(protocol_name, Arc::new(handler));
367        self
368    }
369
370    /// Returns a handle to a newly-spawned instance of `Network`.
371    ///
372    /// A peer-to-peer endpoint is created and bound to a QUIC socket, after which the gossip,
373    /// engine and connection handlers are instantiated. A sync handler is also instantiated if a
374    /// sync protocol is provided. Direct addresses for network peers are added to the engine from
375    /// the address book and core protocols are registered.
376    ///
377    /// After configuration and registration processes are complete, the network is spawned and an
378    /// attempt is made to retrieve a direct address for a network peer so that a connection may be
379    /// made. If no address is retrieved within the timeout limit, the network is shut down and an
380    /// error is returned.
381    pub async fn build(mut self) -> Result<Network<T>>
382    where
383        T: TopicQuery + TopicId + 'static,
384    {
385        let private_key = self.private_key.unwrap_or_default();
386
387        let relay: Option<RelayNode> = match self.relay_mode {
388            RelayMode::Disabled => None,
389            RelayMode::Custom(ref node) => Some(node.clone()),
390        };
391
392        // Build p2p endpoint and bind the QUIC socket.
393        let endpoint = {
394            let mut transport_config = TransportConfig::default();
395            transport_config
396                .max_concurrent_bidi_streams(MAX_STREAMS.into())
397                .max_concurrent_uni_streams(0u32.into());
398
399            let relay_mode = match self.relay_mode {
400                RelayMode::Disabled => iroh::RelayMode::Disabled,
401                RelayMode::Custom(node) => iroh::RelayMode::Custom(
402                    RelayMap::from_nodes(vec![node])
403                        .expect("relay list can not contain duplicates"),
404                ),
405            };
406
407            let bind_ip_v4 = self.bind_ip_v4.unwrap_or(Ipv4Addr::UNSPECIFIED);
408            let bind_port_v4 = self.bind_port_v4.unwrap_or(DEFAULT_BIND_PORT);
409            let bind_ip_v6 = self.bind_ip_v6.unwrap_or(Ipv6Addr::UNSPECIFIED);
410            let bind_port_v6 = self.bind_port_v6.unwrap_or(DEFAULT_BIND_PORT + 1);
411            let socket_address_v4 = SocketAddrV4::new(bind_ip_v4, bind_port_v4);
412            let socket_address_v6 = SocketAddrV6::new(bind_ip_v6, bind_port_v6, 0, 0);
413
414            Endpoint::builder()
415                .transport_config(transport_config)
416                .secret_key(from_private_key(private_key.clone()))
417                .relay_mode(relay_mode)
418                .bind_addr_v4(socket_address_v4)
419                .bind_addr_v6(socket_address_v6)
420                .bind()
421                .await?
422        };
423
424        let node_addr = endpoint.node_addr().await?;
425
426        let gossip = Gossip::builder()
427            .max_message_size(self.gossip_config.unwrap_or_default().max_message_size)
428            .spawn(endpoint.clone())
429            .await?;
430
431        let engine = Engine::new(
432            self.bootstrap,
433            private_key.clone(),
434            self.network_id,
435            endpoint.clone(),
436            gossip.clone(),
437            self.sync_config,
438        );
439
440        let sync_handler = engine.sync_handler();
441
442        let inner = Arc::new(NetworkInner {
443            cancel_token: CancellationToken::new(),
444            relay: relay.clone(),
445            discovery: self.discovery,
446            endpoint: endpoint.clone(),
447            engine,
448            gossip: gossip.clone(),
449            network_id: self.network_id,
450            private_key,
451        });
452
453        self.protocols.insert(GOSSIP_ALPN, Arc::new(gossip.clone()));
454        if let Some(sync_handler) = sync_handler {
455            self.protocols
456                .insert(SYNC_CONNECTION_ALPN, Arc::new(sync_handler));
457        };
458        let protocols = Arc::new(self.protocols.clone());
459        let alpns = self.protocols.alpns();
460        if let Err(err) = inner.endpoint.set_alpns(alpns) {
461            inner.shutdown(protocols.clone()).await;
462            return Err(err);
463        }
464
465        let fut = inner
466            .clone()
467            .spawn(protocols.clone())
468            .instrument(error_span!("node", me=%node_addr.node_id.fmt_short()));
469        let task = tokio::task::spawn(fut);
470        let task_handle = AbortOnDropHandle::new(task)
471            .map_err(Box::new(|err: JoinError| err.to_string()) as JoinErrToStr)
472            .shared();
473
474        let network = Network {
475            inner,
476            task: task_handle,
477            protocols,
478        };
479
480        // Wait for a single direct address update, to make sure we found at least one direct
481        // address.
482        let wait_for_endpoints = {
483            async move {
484                tokio::time::timeout(
485                    DIRECT_ADDRESSES_WAIT,
486                    endpoint.direct_addresses().initialized(),
487                )
488                .await
489                .context("waiting for endpoint")?
490                .context("no endpoints given to establish at least one connection")?;
491                Ok(())
492            }
493        };
494
495        if let Err(err) = wait_for_endpoints.await {
496            network.shutdown().await.ok();
497            return Err(err);
498        }
499
500        for mut direct_addr in self.direct_node_addresses {
501            if direct_addr.relay_url.is_none() {
502                // If given address does not hold any relay information we optimistically add ours
503                // (if we have one). It's not guaranteed that this address will have the same relay
504                // url as we have, but it's better than nothing!
505                if let Some(ref relay_node) = relay {
506                    direct_addr.relay_url = Some(to_relay_url(relay_node.url.clone()))
507                }
508            }
509
510            network.add_peer(direct_addr.clone()).await?;
511        }
512
513        Ok(network)
514    }
515}
516
517#[derive(Debug)]
518struct NetworkInner<T> {
519    cancel_token: CancellationToken,
520    relay: Option<RelayNode>,
521    discovery: DiscoveryMap,
522    endpoint: Endpoint,
523    engine: Engine<T>,
524    #[allow(dead_code)]
525    gossip: Gossip,
526    network_id: NetworkId,
527    #[allow(dead_code)]
528    private_key: PrivateKey,
529}
530
531impl<T> NetworkInner<T>
532where
533    T: TopicQuery + TopicId + 'static,
534{
535    /// Spawns a network.
536    ///
537    /// Local network sockets are bound and a task is started to listen for direct addresses
538    /// changes for the local endpoint. Inbound connection attempts to these endpoints are passed
539    /// to a handler.
540    ///
541    /// Any registered discovery services are subscribed to so that the identifiers and addresses
542    /// of peers operating on the same network may be learned. Discovered peers are added to the
543    /// local address book so they may be involved in connection and gossip activites.
544    async fn spawn(self: Arc<Self>, protocols: Arc<ProtocolMap>) {
545        let (ipv4, ipv6) = self.endpoint.bound_sockets();
546        debug!(
547            "listening at: {}{}",
548            ipv4,
549            ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default()
550        );
551
552        let mut join_set = JoinSet::<Result<()>>::new();
553
554        // Spawn a task that updates the gossip endpoints and discovery services.
555        {
556            let inner = self.clone();
557            join_set.spawn(async move {
558                let mut addrs_stream = inner.endpoint.direct_addresses().stream();
559                let mut my_node_addr = iroh::NodeAddr::from(inner.endpoint.node_id());
560                if let Some(relay) = &inner.relay {
561                    my_node_addr = my_node_addr.with_relay_url(relay.url.clone());
562                }
563
564                while let Some(endpoints) = addrs_stream.next().await {
565                    // Learn about our direct addresses and changes to them.
566                    let direct_addresses: Option<Vec<SocketAddr>> = endpoints
567                        .map(|endpoints| endpoints.iter().map(|endpoint| endpoint.addr).collect());
568                    if let Some(addresses) = direct_addresses {
569                        my_node_addr = my_node_addr.with_direct_addresses(addresses);
570                        if let Err(err) = inner.discovery.update_local_address(&my_node_addr) {
571                            warn!("failed to update direct addresses for discovery: {err:?}");
572                        }
573                    }
574                }
575
576                Ok(())
577            });
578        }
579
580        // Subscribe to all discovery channels where we might find new peers.
581        let mut discovery_stream = self
582            .discovery
583            .subscribe(self.network_id)
584            .expect("discovery map needs to be given");
585
586        loop {
587            tokio::select! {
588                // Do not let tokio select futures randomly but with top-to-bottom priority.
589                biased;
590                // Exit loop when shutdown was signalled somewhere else.
591                _ = self.cancel_token.cancelled() => {
592                    break;
593                },
594                // Handle incoming p2p connections.
595                Some(incoming) = self.endpoint.accept() => {
596                    // @TODO: This is the point at which we can reject the connection if limits
597                    // have been reached.
598                    let connecting = match incoming.accept() {
599                        Ok(connecting) => connecting,
600                        Err(err) => {
601                            warn!("incoming connection failed: {err:#}");
602                            // This may be caused by retransmitted datagrams so we continue.
603                            continue;
604                        },
605                    };
606                    let protocols = protocols.clone();
607                    join_set.spawn(async move {
608                        handle_connection(connecting, protocols).await;
609                        Ok(())
610                    });
611                },
612                // Handle discovered peers.
613                Some(event) = discovery_stream.next() => {
614                    match event {
615                        Ok(event) => {
616                            if let Err(err) = self.engine.add_peer(to_node_addr(event.node_addr)).await {
617                                error!("engine failed on add_peer: {err:?}");
618                                break;
619                            }
620                        }
621                        Err(err) => {
622                            error!("discovery service failed: {err:?}");
623                            break;
624                        },
625                    }
626                },
627                // Handle task terminations and quit on panics.
628                res = join_set.join_next(), if !join_set.is_empty() => {
629                    match res {
630                        Some(Err(outer)) => {
631                            if outer.is_panic() {
632                                error!("task panicked: {outer:?}");
633                                break;
634                            } else if outer.is_cancelled() {
635                                debug!("task cancelled: {outer:?}");
636                            } else {
637                                error!("task failed: {outer:?}");
638                                break;
639                            }
640                        }
641                        Some(Ok(Err(inner))) => {
642                            debug!("task errored: {inner:?}");
643                        }
644                        _ => {}
645                    }
646                },
647                else => break,
648            }
649        }
650
651        self.shutdown(protocols).await;
652
653        // Abort remaining tasks.
654        join_set.shutdown().await;
655    }
656
657    /// Closes all connections and shuts down the network engine.
658    async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
659        // We ignore all errors during shutdown.
660        debug!("close all connections and shutdown the node");
661        let _ = tokio::join!(
662            // Closing the Endpoint is the equivalent of calling `Connection::close` on all
663            // connections: Operations will immediately fail with `ConnectionError::LocallyClosed`.
664            // All streams are interrupted, this is not graceful.
665            self.endpoint.close(),
666            self.engine.shutdown(),
667            protocols.shutdown(),
668        );
669    }
670}
671
672/// Running peer-to-peer node.
673///
674/// The primary feature of the `Network` is the ability to subscribe to one or more topics and
675/// exchange messages over those topics with remote peers. Replication can be conducted exclusively
676/// in "live-mode" or may include the synchronisation of past state, thereby ensuring eventual
677/// consistency among all peers for a given topic. Replication and discovery strategies are defined
678/// in the `NetworkBuilder`.
679///
680/// In addition to topic subscription, `Network` offers a way to access information about the local
681/// network such as the node ID and direct addresses. It also provides a convenient means to add the
682/// address of a remote peer and to query the addresses of all known peers.
683#[derive(Clone, Debug)]
684pub struct Network<T> {
685    inner: Arc<NetworkInner<T>>,
686    #[allow(dead_code)]
687    protocols: Arc<ProtocolMap>,
688    // `Network` needs to be `Clone + Send` and we need to `task.await` in its `shutdown()` impl.
689    // - `Shared` allows us to `task.await` from all `Network` clones
690    //   - Acts like an `Arc` around the inner future
691    // - `MapErr` is needed to map the `JoinError` to a `String`, since `JoinError` is `!Clone`
692    // - `AbortOnDropHandle` ensures the `task` is cancelled when all `Network`s are dropped
693    task: Shared<MapErr<AbortOnDropHandle<()>, JoinErrToStr>>,
694}
695
696impl<T> Network<T>
697where
698    T: TopicQuery + TopicId + 'static,
699{
700    /// Adds a peer to the address book.
701    pub async fn add_peer(&self, node_addr: NodeAddress) -> Result<()> {
702        self.inner.engine.add_peer(node_addr).await
703    }
704
705    /// Returns a receiver of system events.
706    ///
707    /// This method can be called repeatedly if multiple event receivers are required. Each
708    /// receiver will receive all emitted events.
709    pub async fn events(&self) -> Result<broadcast::Receiver<SystemEvent<T>>> {
710        self.inner.engine.events().await
711    }
712
713    /// Returns the addresses of all known peers.
714    pub async fn known_peers(&self) -> Result<Vec<NodeAddress>> {
715        self.inner.engine.known_peers().await
716    }
717
718    /// Returns the direct addresses of this node.
719    pub async fn direct_addresses(&self) -> Option<Vec<SocketAddr>> {
720        match self
721            .inner
722            .endpoint
723            .direct_addresses()
724            .initialized()
725            .await
726            .map(|addrs| addrs.into_iter().map(|direct| direct.addr).collect())
727        {
728            Ok(result) => Some(result),
729            Err(_) => None,
730        }
731    }
732
733    /// Returns a handle to the network endpoint.
734    ///
735    /// The `Endpoint` exposes low-level networking functionality such as the ability to connect to
736    /// specific peers, accept connections, query local socket addresses and more.
737    ///
738    /// This level of control is unlikely to be required in most cases but has been exposed for the
739    /// convenience of advanced users.
740    pub fn endpoint(&self) -> &Endpoint {
741        &self.inner.endpoint
742    }
743
744    /// Returns the public key of the node.
745    pub fn node_id(&self) -> PublicKey {
746        PublicKey::from_bytes(self.inner.endpoint.node_id().as_bytes())
747            .expect("public key already checked")
748    }
749
750    /// Terminates all internal tasks and shuts down the node.
751    pub async fn shutdown(self) -> Result<()> {
752        // Trigger shutdown of the main run task by activating the cancel token.
753        self.inner.cancel_token.cancel();
754
755        // Wait for the main task to terminate.
756        self.task.await.map_err(|err| anyhow!(err))?;
757
758        Ok(())
759    }
760
761    /// Subscribes to a topic and returns a bi-directional stream that can be read from and written
762    /// to, along with a oneshot receiver to be informed when the gossip overlay has been joined.
763    pub async fn subscribe(
764        &self,
765        topic: T,
766    ) -> Result<(
767        mpsc::Sender<ToNetwork>,
768        mpsc::Receiver<FromNetwork>,
769        oneshot::Receiver<()>,
770    )> {
771        let (to_network_tx, to_network_rx) = mpsc::channel::<ToNetwork>(128);
772        let (from_network_tx, from_network_rx) = mpsc::channel::<FromNetwork>(128);
773        let (gossip_ready_tx, gossip_ready_rx) = oneshot::channel();
774
775        self.inner
776            .engine
777            .subscribe(topic, from_network_tx, to_network_rx, gossip_ready_tx)
778            .await?;
779
780        Ok((to_network_tx, from_network_rx, gossip_ready_rx))
781    }
782}
783
784/// An event to be broadcast to the network.
785#[derive(Clone, Debug)]
786pub enum ToNetwork {
787    Message { bytes: Vec<u8> },
788}
789
790/// An event received from the network.
791#[allow(clippy::large_enum_variant)]
792#[derive(Clone, Debug, Eq, PartialEq)]
793pub enum FromNetwork {
794    GossipMessage {
795        bytes: Vec<u8>,
796        delivered_from: PublicKey,
797    },
798    SyncMessage {
799        header: Vec<u8>,
800        payload: Option<Vec<u8>>,
801        delivered_from: PublicKey,
802    },
803}
804
805/// Handle an inbound connection on the local network endpoint.
806///
807/// The connection is accepted if the handshake is successful and the peer is operating with
808/// a supported ALPN protocol.
809async fn handle_connection(
810    mut connecting: iroh::endpoint::Connecting,
811    protocols: Arc<ProtocolMap>,
812) {
813    let alpn = match connecting.alpn().await {
814        Ok(alpn) => alpn,
815        Err(err) => {
816            warn!("ignoring connection: invalid handshake: {:?}", err);
817            return;
818        }
819    };
820    let Some(handler) = protocols.get(&alpn) else {
821        warn!("ignoring connection: unsupported alpn protocol");
822        return;
823    };
824    if let Err(err) = handler.accept(connecting).await {
825        warn!("handling incoming connection ended with error: {err}");
826    }
827}
828
829/// Helper to construct shared `AbortOnDropHandle` coming from tokio crate.
830pub(crate) type JoinErrToStr =
831    Box<dyn Fn(tokio::task::JoinError) -> String + Send + Sync + 'static>;
832
833#[cfg(test)]
834mod tests {
835    use std::collections::HashMap;
836    use std::net::{Ipv4Addr, Ipv6Addr};
837    use std::path::PathBuf;
838    use std::time::Duration;
839
840    use async_trait::async_trait;
841    use iroh::{RelayNode, RelayUrl as IrohRelayUrl};
842    use p2panda_core::{Body, Extensions, Hash, Header, PrivateKey, PublicKey};
843    use p2panda_discovery::mdns::LocalDiscovery;
844    use p2panda_store::{MemoryStore, OperationStore};
845    use p2panda_sync::TopicQuery;
846    use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap};
847    use p2panda_sync::test_protocols::{
848        FailingProtocol, PingPongProtocol, SyncTestTopic as TestTopic,
849    };
850    use tokio::task::JoinHandle;
851
852    use crate::addrs::{DEFAULT_STUN_PORT, to_node_addr};
853    use crate::bytes::ToBytes;
854    use crate::config::Config;
855    use crate::events::SystemEvent;
856    use crate::sync::SyncConfiguration;
857    use crate::{NetworkBuilder, NodeAddress, RelayMode, RelayUrl, TopicId, to_public_key};
858
859    use super::{FromNetwork, Network, ToNetwork};
860
861    impl TopicId for TestTopic {
862        fn id(&self) -> [u8; 32] {
863            self.1
864        }
865    }
866
867    fn create_operation<E: Extensions>(
868        private_key: &PrivateKey,
869        body: &Body,
870        seq_num: u64,
871        timestamp: u64,
872        backlink: Option<Hash>,
873        extensions: Option<E>,
874    ) -> (Hash, Header<E>, Vec<u8>) {
875        let mut header = Header {
876            version: 1,
877            public_key: private_key.public_key(),
878            signature: None,
879            payload_size: body.size(),
880            payload_hash: Some(body.hash()),
881            timestamp,
882            seq_num,
883            backlink,
884            previous: vec![],
885            extensions,
886        };
887        header.sign(private_key);
888        let header_bytes = header.to_bytes();
889        (header.hash(), header, header_bytes)
890    }
891
892    fn run_node<T: TopicId + TopicQuery + 'static>(node: Network<T>, topic: T) -> JoinHandle<()> {
893        tokio::spawn(async move {
894            let (_tx, mut rx, ready) = node.subscribe(topic).await.unwrap();
895
896            // Await the ready signal so we know the gossip overlay has been joined.
897            assert!(ready.await.is_ok());
898
899            // Await at least one message received via sync.
900            loop {
901                let msg = rx.recv().await.unwrap();
902                println!("{msg:?}");
903                match msg {
904                    FromNetwork::SyncMessage { .. } => break,
905                    _ => (),
906                }
907            }
908
909            // Give other nodes enough time to complete sync sessions.
910            tokio::time::sleep(Duration::from_secs(3)).await;
911            node.shutdown().await.unwrap();
912        })
913    }
914
915    #[tokio::test]
916    async fn config() {
917        let direct_node_public_key = PrivateKey::new().public_key();
918        let relay_address: RelayUrl = "https://example.net".parse().unwrap();
919
920        let config = Config {
921            bind_ip_v4: Ipv4Addr::new(7, 7, 7, 7),
922            bind_port_v4: 2024,
923            bind_ip_v6: Ipv6Addr::new(8, 8, 8, 8, 8, 8, 8, 8),
924            bind_port_v6: 2025,
925            network_id: [1; 32],
926            private_key: Some(PathBuf::new().join("secret-key.txt")),
927            direct_node_addresses: vec![NodeAddress {
928                public_key: direct_node_public_key,
929                direct_addresses: vec!["0.0.0.0:2026".parse().unwrap()],
930                relay_url: None,
931            }],
932            relay: Some(relay_address.clone()),
933        };
934
935        let builder = NetworkBuilder::<TestTopic>::from_config(config);
936
937        assert_eq!(builder.bind_ip_v4, Some(Ipv4Addr::new(7, 7, 7, 7)));
938        assert_eq!(builder.bind_port_v4, Some(2024));
939        assert_eq!(
940            builder.bind_ip_v6,
941            Some(Ipv6Addr::new(8, 8, 8, 8, 8, 8, 8, 8))
942        );
943        assert_eq!(builder.bind_port_v6, Some(2025));
944        assert_eq!(builder.network_id, [1; 32]);
945        assert!(builder.private_key.is_none());
946        assert_eq!(builder.direct_node_addresses.len(), 1);
947        let relay_node = RelayNode {
948            url: IrohRelayUrl::from(relay_address),
949            stun_only: false,
950            stun_port: DEFAULT_STUN_PORT,
951            quic: None,
952        };
953        assert_eq!(builder.relay_mode, RelayMode::Custom(relay_node));
954    }
955
956    #[tokio::test]
957    async fn join_gossip_overlay() {
958        let network_id = [1; 32];
959        let topic = TestTopic::new("chat");
960
961        let node_1 = NetworkBuilder::new(network_id).build().await.unwrap();
962        let node_2 = NetworkBuilder::new(network_id).build().await.unwrap();
963
964        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
965        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
966
967        node_1.add_peer(to_node_addr(node_2_addr)).await.unwrap();
968        node_2.add_peer(to_node_addr(node_1_addr)).await.unwrap();
969
970        // Subscribe to the same topic from both nodes
971        let (tx_1, _rx_1, ready_1) = node_1.subscribe(topic.clone()).await.unwrap();
972        let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(topic).await.unwrap();
973
974        // Ensure the gossip-overlay has been joined by both nodes for the given topic
975        assert!(ready_2.await.is_ok());
976        assert!(ready_1.await.is_ok());
977
978        // Broadcast a message and make sure it's received by the other node
979        tx_1.send(ToNetwork::Message {
980            bytes: "Hello, Node".to_bytes(),
981        })
982        .await
983        .unwrap();
984
985        let rx_2_msg = rx_2.recv().await.unwrap();
986        assert_eq!(
987            rx_2_msg,
988            FromNetwork::GossipMessage {
989                bytes: "Hello, Node".to_bytes(),
990                delivered_from: node_1.node_id(),
991            }
992        );
993
994        node_1.shutdown().await.unwrap();
995        node_2.shutdown().await.unwrap();
996    }
997
998    #[tokio::test]
999    async fn join_gossip_overlay_with_local_discovery() {
1000        let network_id = [1; 32];
1001        let topic = TestTopic::new("chat");
1002
1003        // Build two nodes with local discovery (mDNS) enabled.
1004        let node_1 = NetworkBuilder::new(network_id)
1005            .discovery(LocalDiscovery::new())
1006            .build()
1007            .await
1008            .unwrap();
1009        let node_2 = NetworkBuilder::new(network_id)
1010            .discovery(LocalDiscovery::new())
1011            .build()
1012            .await
1013            .unwrap();
1014
1015        // Subscribe to the same topic from both nodes
1016        let (tx_1, _rx_1, ready_1) = node_1.subscribe(topic.clone()).await.unwrap();
1017        let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(topic).await.unwrap();
1018
1019        // Ensure the gossip-overlay has been joined by both nodes for the given topic
1020        assert!(ready_2.await.is_ok());
1021        assert!(ready_1.await.is_ok());
1022
1023        // Broadcast a message and make sure it's received by the other node
1024        tx_1.send(ToNetwork::Message {
1025            bytes: "Hello, Node".to_bytes(),
1026        })
1027        .await
1028        .unwrap();
1029
1030        let rx_2_msg = rx_2.recv().await.unwrap();
1031        assert_eq!(
1032            rx_2_msg,
1033            FromNetwork::GossipMessage {
1034                bytes: "Hello, Node".to_bytes(),
1035                delivered_from: node_1.node_id(),
1036            }
1037        );
1038
1039        node_1.shutdown().await.unwrap();
1040        node_2.shutdown().await.unwrap();
1041    }
1042
1043    #[tokio::test]
1044    async fn join_gossip_overlay_with_relay() {
1045        let network_id = [1; 32];
1046        let topic = TestTopic::new("chat");
1047
1048        // @NOTE(glyph): I tried using the iroh test relay (`iroh::test_utils::run_relay_server()`)
1049        // but it fails (the peers never find one another via the network-wide gossip overlay).
1050        // For now we use the p2panda relay instead.
1051        let relay_url: RelayUrl = "https://wasser.liebechaos.org/".parse().unwrap();
1052
1053        // Build the bootstrap node.
1054        let node_1 = NetworkBuilder::new(network_id)
1055            .bootstrap()
1056            .relay(relay_url.clone(), false, 0)
1057            .build()
1058            .await
1059            .unwrap();
1060        // Ensure the connection to the relay has been initialized.
1061        node_1.endpoint().home_relay().initialized().await.unwrap();
1062
1063        // Build the second node.
1064        let node_2 = NetworkBuilder::new(network_id)
1065            .relay(relay_url, false, 0)
1066            .direct_address(node_1.node_id(), vec![], None)
1067            .build()
1068            .await
1069            .unwrap();
1070
1071        // Subscribe to the same topic from both nodes
1072        let (tx_1, _rx_1, ready_1) = node_1.subscribe(topic.clone()).await.unwrap();
1073        let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(topic).await.unwrap();
1074
1075        // Ensure the gossip-overlay has been joined by both nodes for the given topic
1076        assert!(ready_1.await.is_ok());
1077        assert!(ready_2.await.is_ok());
1078
1079        // Broadcast a message and make sure it's received by the other node
1080        tx_1.send(ToNetwork::Message {
1081            bytes: "Hello, Node".to_bytes(),
1082        })
1083        .await
1084        .unwrap();
1085
1086        let rx_2_msg = rx_2.recv().await.unwrap();
1087        assert_eq!(
1088            rx_2_msg,
1089            FromNetwork::GossipMessage {
1090                bytes: "Hello, Node".to_bytes(),
1091                delivered_from: node_1.node_id(),
1092            }
1093        );
1094
1095        node_1.shutdown().await.unwrap();
1096        node_2.shutdown().await.unwrap();
1097    }
1098
1099    #[tokio::test]
1100    async fn ping_pong() {
1101        let network_id = [1; 32];
1102        let topic = TestTopic::new("ping_pong");
1103        let ping_pong = PingPongProtocol {};
1104        let sync_config = SyncConfiguration::new(ping_pong);
1105
1106        let node_1 = NetworkBuilder::new(network_id)
1107            .sync(sync_config.clone())
1108            .build()
1109            .await
1110            .unwrap();
1111        let node_2 = NetworkBuilder::new(network_id)
1112            .sync(sync_config)
1113            .build()
1114            .await
1115            .unwrap();
1116
1117        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1118        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1119
1120        node_1.add_peer(to_node_addr(node_1_addr)).await.unwrap();
1121        node_2.add_peer(to_node_addr(node_2_addr)).await.unwrap();
1122
1123        // Subscribe to the same topic from both nodes which should kick off sync
1124        let topic_clone = topic.clone();
1125        let handle1 = tokio::spawn(async move {
1126            let (_tx, _rx, _ready) = node_1.subscribe(topic_clone).await.unwrap();
1127            tokio::time::sleep(Duration::from_secs(2)).await;
1128            node_1.shutdown().await.unwrap();
1129        });
1130        let handle2 = tokio::spawn(async move {
1131            let (_tx, _rx, _ready) = node_2.subscribe(topic).await.unwrap();
1132            tokio::time::sleep(Duration::from_secs(2)).await;
1133            node_2.shutdown().await.unwrap();
1134        });
1135
1136        let (result1, result2) = tokio::join!(handle1, handle2);
1137        assert!(result1.is_ok());
1138        assert!(result2.is_ok());
1139    }
1140
1141    type Logs<T> = HashMap<PublicKey, Vec<T>>;
1142
1143    #[derive(Clone, Debug)]
1144    struct LogIdTopicMap<T>(HashMap<T, Logs<u64>>);
1145
1146    impl<T> LogIdTopicMap<T>
1147    where
1148        T: TopicQuery,
1149    {
1150        pub fn new() -> Self {
1151            LogIdTopicMap(HashMap::new())
1152        }
1153
1154        fn insert(&mut self, topic: T, logs: Logs<u64>) -> Option<Logs<u64>> {
1155            self.0.insert(topic, logs)
1156        }
1157    }
1158
1159    #[async_trait]
1160    impl<T> TopicLogMap<T, u64> for LogIdTopicMap<T>
1161    where
1162        T: TopicQuery,
1163    {
1164        async fn get(&self, topic: &T) -> Option<Logs<u64>> {
1165            self.0.get(topic).cloned()
1166        }
1167    }
1168
1169    #[tokio::test]
1170    async fn e2e_log_height_sync() {
1171        const NETWORK_ID: [u8; 32] = [1; 32];
1172
1173        let peer_a_private_key = PrivateKey::new();
1174        let peer_b_private_key = PrivateKey::new();
1175
1176        let topic = TestTopic::new("event_logs");
1177        let log_id = 0;
1178        let logs = HashMap::from([(peer_a_private_key.public_key(), vec![log_id])]);
1179
1180        let mut topic_map = LogIdTopicMap::new();
1181        topic_map.insert(topic.clone(), logs);
1182
1183        // Construct a store and log height protocol for peer a.
1184        let store_a = MemoryStore::default();
1185        let protocol_a = LogSyncProtocol::new(topic_map.clone(), store_a);
1186        let sync_config_a = SyncConfiguration::new(protocol_a);
1187
1188        // Create some operations.
1189        let body = Body::new("Hello, Sloth!".as_bytes());
1190        let (hash_0, header_0, header_bytes_0) =
1191            create_operation(&peer_a_private_key, &body, 0, 0, None, None);
1192        let (hash_1, header_1, header_bytes_1) =
1193            create_operation(&peer_a_private_key, &body, 1, 100, Some(hash_0), None);
1194        let (hash_2, header_2, header_bytes_2) =
1195            create_operation(&peer_a_private_key, &body, 2, 200, Some(hash_1), None);
1196
1197        // Create store for peer b and populate with operations.
1198        let mut store_b = MemoryStore::default();
1199        store_b
1200            .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
1201            .await
1202            .unwrap();
1203        store_b
1204            .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
1205            .await
1206            .unwrap();
1207        store_b
1208            .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
1209            .await
1210            .unwrap();
1211
1212        // Construct log height protocol for peer b.
1213        let protocol_b = LogSyncProtocol::new(topic_map, store_b);
1214        let sync_config_b = SyncConfiguration::new(protocol_b);
1215
1216        // Build peer a's node
1217        let node_a = NetworkBuilder::new(NETWORK_ID)
1218            .sync(sync_config_a)
1219            .private_key(peer_a_private_key)
1220            .build()
1221            .await
1222            .unwrap();
1223
1224        // Build peer b's node
1225        let node_b = NetworkBuilder::new(NETWORK_ID)
1226            .sync(sync_config_b)
1227            .private_key(peer_b_private_key.clone())
1228            .build()
1229            .await
1230            .unwrap();
1231
1232        let node_a_addr = node_a.endpoint().node_addr().await.unwrap();
1233        let node_b_addr = node_b.endpoint().node_addr().await.unwrap();
1234
1235        node_a.add_peer(to_node_addr(node_b_addr)).await.unwrap();
1236        node_b.add_peer(to_node_addr(node_a_addr)).await.unwrap();
1237
1238        // Subscribe to the same topic from both nodes which should kick off sync.
1239        let topic_clone = topic.clone();
1240        let handle1 = tokio::spawn(async move {
1241            let (_tx, mut from_sync_rx, ready) = node_a.subscribe(topic_clone).await.unwrap();
1242
1243            // Wait until the gossip overlay has been joined for TOPIC_ID.
1244            assert!(ready.await.is_ok());
1245
1246            let mut from_sync_messages = Vec::new();
1247            while let Some(message) = from_sync_rx.recv().await {
1248                from_sync_messages.push(message);
1249                if from_sync_messages.len() == 3 {
1250                    break;
1251                }
1252            }
1253
1254            // Construct the messages we expect to receive on the from_sync channel based on the
1255            // operations we created earlier.
1256            let peer_a_expected_messages = vec![
1257                FromNetwork::SyncMessage {
1258                    header: header_bytes_0.to_vec(),
1259                    payload: Some(body.to_bytes()),
1260                    delivered_from: peer_b_private_key.public_key(),
1261                },
1262                FromNetwork::SyncMessage {
1263                    header: header_bytes_1.to_vec(),
1264                    payload: Some(body.to_bytes()),
1265                    delivered_from: peer_b_private_key.public_key(),
1266                },
1267                FromNetwork::SyncMessage {
1268                    header: header_bytes_2.to_vec(),
1269                    payload: Some(body.to_bytes()),
1270                    delivered_from: peer_b_private_key.public_key(),
1271                },
1272            ];
1273
1274            // Assert we receive the expected messages
1275            assert_eq!(from_sync_messages, peer_a_expected_messages);
1276
1277            node_a.shutdown().await.unwrap();
1278        });
1279
1280        let handle2 = tokio::spawn(async move {
1281            let (_tx, _from_sync_rx, ready) = node_b.subscribe(topic).await.unwrap();
1282
1283            // Wait until the gossip overlay has been joined for TOPIC_ID
1284            assert!(ready.await.is_ok());
1285
1286            // Sleep for a moment to ensure sync has time to complete
1287            tokio::time::sleep(Duration::from_secs(2)).await;
1288
1289            node_b.shutdown().await.unwrap();
1290        });
1291
1292        // Wait on both to complete
1293        let (result1, result2) = tokio::join!(handle1, handle2);
1294
1295        assert!(result1.is_ok());
1296        assert!(result2.is_ok())
1297    }
1298
1299    #[tokio::test]
1300    async fn multi_hop_join_gossip_overlay() {
1301        let network_id = [1; 32];
1302        let chat_topic = TestTopic::new("chat");
1303
1304        let node_1 = NetworkBuilder::new(network_id).build().await.unwrap();
1305        let node_2 = NetworkBuilder::new(network_id).build().await.unwrap();
1306        let node_3 = NetworkBuilder::new(network_id).build().await.unwrap();
1307
1308        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1309        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1310
1311        node_1
1312            .add_peer(to_node_addr(node_2_addr.clone()))
1313            .await
1314            .unwrap();
1315        node_2.add_peer(to_node_addr(node_1_addr)).await.unwrap();
1316        node_3.add_peer(to_node_addr(node_2_addr)).await.unwrap();
1317
1318        // Subscribe to the same topic from all nodes
1319        let (tx_1, _rx_1, ready_1) = node_1.subscribe(chat_topic.clone()).await.unwrap();
1320        let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(chat_topic.clone()).await.unwrap();
1321        let (_tx_3, mut rx_3, ready_3) = node_3.subscribe(chat_topic).await.unwrap();
1322
1323        // Ensure the gossip-overlay has been joined by all three nodes for the given topic
1324        assert!(ready_3.await.is_ok());
1325        assert!(ready_2.await.is_ok());
1326        assert!(ready_1.await.is_ok());
1327
1328        // Broadcast a message and make sure it's received by the other nodes
1329        tx_1.send(ToNetwork::Message {
1330            bytes: "Hello, Node".to_bytes(),
1331        })
1332        .await
1333        .unwrap();
1334
1335        let rx_2_msg = rx_2.recv().await.unwrap();
1336        assert_eq!(
1337            rx_2_msg,
1338            FromNetwork::GossipMessage {
1339                bytes: "Hello, Node".to_bytes(),
1340                // Node 2 receives the message and it is delivered by node 1
1341                delivered_from: node_1.node_id(),
1342            }
1343        );
1344
1345        let rx_3_msg = rx_3.recv().await.unwrap();
1346        assert_eq!(
1347            rx_3_msg,
1348            FromNetwork::GossipMessage {
1349                bytes: "Hello, Node".to_bytes(),
1350                // Node 3 receives the message and it is also delivered by node 1
1351                delivered_from: node_1.node_id(),
1352            }
1353        );
1354
1355        node_1.shutdown().await.unwrap();
1356        node_2.shutdown().await.unwrap();
1357        node_3.shutdown().await.unwrap();
1358    }
1359
1360    #[tokio::test]
1361    async fn multi_hop_topic_discovery_and_sync() {
1362        let network_id = [1; 32];
1363        let topic = TestTopic::new("chat");
1364        let sync_config = SyncConfiguration::new(PingPongProtocol {});
1365
1366        // Create 4 nodes.
1367        let node_1 = NetworkBuilder::new(network_id)
1368            .sync(sync_config.clone())
1369            .build()
1370            .await
1371            .unwrap();
1372        let node_2 = NetworkBuilder::new(network_id)
1373            .sync(sync_config.clone())
1374            .build()
1375            .await
1376            .unwrap();
1377        let node_3 = NetworkBuilder::new(network_id)
1378            .bootstrap()
1379            .sync(sync_config.clone())
1380            .build()
1381            .await
1382            .unwrap();
1383        let node_4 = NetworkBuilder::new(network_id)
1384            .bootstrap()
1385            .sync(sync_config.clone())
1386            .build()
1387            .await
1388            .unwrap();
1389
1390        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1391        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1392        let node_3_addr = node_3.endpoint().node_addr().await.unwrap();
1393
1394        // All peers know about only one other peer.
1395        node_1
1396            .add_peer(to_node_addr(node_2_addr.clone()))
1397            .await
1398            .unwrap();
1399        node_2.add_peer(to_node_addr(node_1_addr)).await.unwrap();
1400        node_3
1401            .add_peer(to_node_addr(node_2_addr.clone()))
1402            .await
1403            .unwrap();
1404        node_4
1405            .add_peer(to_node_addr(node_3_addr.clone()))
1406            .await
1407            .unwrap();
1408
1409        // Run all nodes. We are testing that peers gracefully handle starting a sync session while
1410        // not knowing the other peer's address yet. Eventually all peers complete at least one
1411        // sync session.
1412        let handle1 = run_node(node_1, topic.clone());
1413        let handle2 = run_node(node_2, topic.clone());
1414        let handle3 = run_node(node_3, topic.clone());
1415        let handle4 = run_node(node_4, topic.clone());
1416
1417        let (result1, result2, result3, result4) = tokio::join!(handle1, handle2, handle3, handle4);
1418
1419        assert!(result1.is_ok());
1420        assert!(result2.is_ok());
1421        assert!(result3.is_ok());
1422        assert!(result4.is_ok());
1423    }
1424
1425    #[tokio::test]
1426    async fn gossip_and_sync_events() {
1427        let network_id = [17; 32];
1428        let chat_topic = TestTopic::new("chat");
1429        let chat_topic_id = chat_topic.clone().id();
1430        let sync_config = SyncConfiguration::new(PingPongProtocol {});
1431
1432        let node_1 = NetworkBuilder::new(network_id)
1433            .sync(sync_config.clone())
1434            .build()
1435            .await
1436            .unwrap();
1437        let node_2 = NetworkBuilder::new(network_id)
1438            .sync(sync_config.clone())
1439            .build()
1440            .await
1441            .unwrap();
1442
1443        let node_2_id = node_2.endpoint().node_id();
1444
1445        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1446        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1447
1448        node_1
1449            .add_peer(to_node_addr(node_2_addr.clone()))
1450            .await
1451            .unwrap();
1452        node_2
1453            .add_peer(to_node_addr(node_1_addr.clone()))
1454            .await
1455            .unwrap();
1456
1457        // Subscribe to network events for each node.
1458        let mut event_rx_1 = node_1.events().await.unwrap();
1459
1460        // Subscribe to the same topic from all nodes.
1461        let (_tx_1, _rx_1, ready_1) = node_1.subscribe(chat_topic.clone()).await.unwrap();
1462        let (_tx_2, _rx_2, ready_2) = node_2.subscribe(chat_topic.clone()).await.unwrap();
1463
1464        // Ensure the gossip-overlay has been joined by all both nodes for the given topic.
1465        assert!(ready_2.await.is_ok());
1466        assert!(ready_1.await.is_ok());
1467
1468        // Start a third node.
1469        let node_3 = NetworkBuilder::new(network_id).build().await.unwrap();
1470        let node_3_id = node_3.endpoint().node_id();
1471        node_3.add_peer(to_node_addr(node_1_addr)).await.unwrap();
1472        let (_tx_3, _rx_3, ready_3) = node_3.subscribe(chat_topic.clone()).await.unwrap();
1473        assert!(ready_3.await.is_ok());
1474
1475        // Events we expect to receive on node 1.
1476        let mut expected_events = vec![
1477            // Join the network-wide gossip overlay by connecting to node 2.
1478            SystemEvent::GossipJoined {
1479                topic_id: network_id,
1480                peers: vec![to_public_key(node_2_id)],
1481            },
1482            // Discover node 2 via an announcement on the network-wide gossip overlay.
1483            SystemEvent::PeerDiscovered {
1484                peer: to_public_key(node_2_id),
1485            },
1486            // Start sync (part one) with node 2.
1487            SystemEvent::SyncStarted {
1488                topic: None,
1489                peer: to_public_key(node_2_id),
1490            },
1491            // Complete sync (part one) with node 2.
1492            SystemEvent::SyncDone {
1493                topic: chat_topic.clone(),
1494                peer: to_public_key(node_2_id),
1495            },
1496            // Start sync (part two) with node 2.
1497            SystemEvent::SyncStarted {
1498                topic: Some(chat_topic.clone()),
1499                peer: to_public_key(node_2_id),
1500            },
1501            // Join the topic gossip overlay by connecting to node 2.
1502            SystemEvent::GossipJoined {
1503                topic_id: chat_topic_id,
1504                peers: vec![to_public_key(node_2_id)],
1505            },
1506            // Complete sync (part two) with node 2.
1507            SystemEvent::SyncDone {
1508                topic: chat_topic.clone(),
1509                peer: to_public_key(node_2_id),
1510            },
1511            // Gain a direct neighbor in the network-wide gossip overlay by connecting to node 3.
1512            SystemEvent::GossipNeighborUp {
1513                topic_id: network_id,
1514                peer: to_public_key(node_3_id),
1515            },
1516            // Discover node 2 (again) via an announcement on the network-wide gossip overlay.
1517            SystemEvent::PeerDiscovered {
1518                peer: to_public_key(node_2_id),
1519            },
1520            // Discover node 3 via an announcement on the network-wide gossip overlay.
1521            SystemEvent::PeerDiscovered {
1522                peer: to_public_key(node_3_id),
1523            },
1524        ];
1525
1526        // Receive events on the node one receiver.
1527        let mut received_events = Vec::new();
1528        while let Ok(event) = event_rx_1.recv().await {
1529            assert!(expected_events.contains(&event));
1530            let index = expected_events.iter().position(|ev| *ev == event).unwrap();
1531            received_events.push(expected_events.remove(index));
1532            if received_events.len() == 10 {
1533                break;
1534            }
1535        }
1536
1537        node_1.shutdown().await.unwrap();
1538        node_2.shutdown().await.unwrap();
1539    }
1540
1541    #[tokio::test]
1542    async fn resync_after_error() {
1543        let network_id = [17; 32];
1544        let chat_topic = TestTopic::new("chat");
1545        let sync_config = SyncConfiguration::new(FailingProtocol::InitiatorFailsUnexpected);
1546
1547        let node_1 = NetworkBuilder::new(network_id)
1548            .sync(sync_config.clone())
1549            .build()
1550            .await
1551            .unwrap();
1552        let node_2 = NetworkBuilder::new(network_id)
1553            .sync(sync_config.clone())
1554            .build()
1555            .await
1556            .unwrap();
1557
1558        let node_2_id = node_2.endpoint().node_id();
1559
1560        let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1561        let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1562
1563        node_1
1564            .add_peer(to_node_addr(node_2_addr.clone()))
1565            .await
1566            .unwrap();
1567        node_2
1568            .add_peer(to_node_addr(node_1_addr.clone()))
1569            .await
1570            .unwrap();
1571
1572        // Subscribe to network events for the first node.
1573        let mut event_rx_1 = node_1.events().await.unwrap();
1574
1575        // Subscribe to the same topic from all nodes.
1576        let (_tx_1, _rx_1, ready_1) = node_1.subscribe(chat_topic.clone()).await.unwrap();
1577        let (_tx_2, _rx_2, ready_2) = node_2.subscribe(chat_topic.clone()).await.unwrap();
1578
1579        // Ensure the gossip-overlay has been joined by all both nodes for the given topic.
1580        assert!(ready_2.await.is_ok());
1581        assert!(ready_1.await.is_ok());
1582
1583        // Events we expect to receive on node 1.
1584        let expected_events = vec![
1585            // Start sync (first attempt) as acceptor with node 2.
1586            SystemEvent::SyncStarted {
1587                topic: None,
1588                peer: to_public_key(node_2_id),
1589            },
1590            // Fail sync (first attempt) as acceptor with node 2.
1591            SystemEvent::SyncFailed {
1592                topic: None,
1593                peer: to_public_key(node_2_id),
1594            },
1595            // Start sync (second attempt) as acceptor with node 2.
1596            SystemEvent::SyncStarted {
1597                topic: None,
1598                peer: to_public_key(node_2_id),
1599            },
1600            // Start sync (first attempt) as initiator with node 2.
1601            //
1602            // The initiator side of the sync protocol fails before the `HandshakeSuccess` message
1603            // is sent, so we never lock the gossip buffer and therefore never send `SyncFailed`.
1604            SystemEvent::SyncStarted {
1605                topic: Some(chat_topic.clone()),
1606                peer: to_public_key(node_2_id),
1607            },
1608            // Start sync (second attempt) as initiator with node 2.
1609            SystemEvent::SyncStarted {
1610                topic: Some(chat_topic.clone()),
1611                peer: to_public_key(node_2_id),
1612            },
1613        ];
1614
1615        // Receive events on the node one receiver.
1616        let mut received_events = Vec::new();
1617        while let Ok(event) = event_rx_1.recv().await {
1618            received_events.push(event);
1619
1620            // Fourteen events should be enough to detect the subset we're looking for.
1621            if received_events.len() == 14 {
1622                break;
1623            }
1624        }
1625
1626        // Iterate through the expected events, making sure that each one appears in the received
1627        // events.
1628        expected_events.into_iter().for_each(|event| {
1629            assert!(received_events.contains(&event));
1630
1631            // Remove the event from received events list so we can ensure that the expected
1632            // duplicate events have been received.
1633            let index = received_events.iter().position(|ev| *ev == event).unwrap();
1634            received_events.remove(index);
1635        });
1636
1637        node_1.shutdown().await.unwrap();
1638        node_2.shutdown().await.unwrap();
1639    }
1640}