1use std::fmt::Debug;
120use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
121use std::sync::Arc;
122use std::time::Duration;
123
124use anyhow::{Context, Result, anyhow};
125use futures_lite::StreamExt;
126use futures_util::future::{MapErr, Shared};
127use futures_util::{FutureExt, TryFutureExt};
128use iroh::{Endpoint, RelayMap, RelayNode};
129use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
130use iroh_quinn::TransportConfig;
131use p2panda_core::{PrivateKey, PublicKey};
132use p2panda_discovery::{Discovery, DiscoveryMap};
133use p2panda_sync::TopicQuery;
134use tokio::sync::{broadcast, mpsc, oneshot};
135use tokio::task::{JoinError, JoinSet};
136use tokio_util::sync::CancellationToken;
137use tokio_util::task::AbortOnDropHandle;
138use tracing::{Instrument, debug, error, error_span, warn};
139
140use crate::addrs::{DEFAULT_STUN_PORT, to_node_addr, to_relay_url};
141use crate::config::{Config, DEFAULT_BIND_PORT, GossipConfig};
142use crate::engine::Engine;
143use crate::events::SystemEvent;
144use crate::protocols::{ProtocolHandler, ProtocolMap};
145use crate::sync::{SYNC_CONNECTION_ALPN, SyncConfiguration};
146use crate::{NetworkId, NodeAddress, RelayUrl, TopicId, from_private_key};
147
/// Limit on concurrently open bidirectional streams, handed to the transport configuration in
/// `NetworkBuilder::build`.
const MAX_STREAMS: u32 = 1024;

/// Upper bound on how long `NetworkBuilder::build` waits for the endpoint's direct addresses to
/// become initialized before it gives up with an error.
const DIRECT_ADDRESSES_WAIT: Duration = Duration::from_secs(5);
153
/// Relay setting stored on the [`NetworkBuilder`] and translated into an `iroh::RelayMode` when
/// the network is built.
#[derive(Debug, PartialEq)]
pub enum RelayMode {
    /// No relay is configured; the endpoint is built with `iroh::RelayMode::Disabled`.
    Disabled,

    /// Use exactly this relay node; it becomes the single entry of the endpoint's relay map.
    Custom(RelayNode),
}
173
/// Collects all settings needed to construct a [`Network`].
///
/// Values are set through the chained builder methods and consumed by `build`.
#[derive(Debug)]
pub struct NetworkBuilder<T> {
    /// IPv4 address to bind to; `build` uses `Ipv4Addr::UNSPECIFIED` when unset.
    bind_ip_v4: Option<Ipv4Addr>,
    /// IPv4 port to bind to; `build` uses `DEFAULT_BIND_PORT` when unset.
    bind_port_v4: Option<u16>,
    /// IPv6 address to bind to; `build` uses `Ipv6Addr::UNSPECIFIED` when unset.
    bind_ip_v6: Option<Ipv6Addr>,
    /// IPv6 port to bind to; `build` uses `DEFAULT_BIND_PORT + 1` when unset.
    bind_port_v6: Option<u16>,
    /// Bootstrap flag, forwarded unchanged to `Engine::new`.
    bootstrap: bool,
    /// Peers which are registered via `Network::add_peer` at the end of `build`.
    direct_node_addresses: Vec<NodeAddress>,
    /// Discovery handlers registered through `discovery`.
    discovery: DiscoveryMap,
    /// Gossip settings; `build` only reads `max_message_size` and uses the default configuration
    /// when unset.
    gossip_config: Option<GossipConfig>,
    /// Identifier of the network this node takes part in.
    network_id: NetworkId,
    /// Protocol handlers keyed by protocol name; `build` adds the gossip handler and, when the
    /// engine provides one, the sync handler.
    protocols: ProtocolMap,
    /// Relay setting, see [`RelayMode`].
    relay_mode: RelayMode,
    /// Private key of the node; `build` falls back to `PrivateKey::default()` when unset.
    private_key: Option<PrivateKey>,
    /// Optional sync configuration, forwarded to `Engine::new`.
    sync_config: Option<SyncConfiguration<T>>,
}
194
195impl<T> NetworkBuilder<T>
196where
197 T: TopicQuery,
198{
199 pub fn new(network_id: NetworkId) -> Self {
204 Self {
205 bind_ip_v4: None,
206 bind_port_v4: None,
207 bind_ip_v6: None,
208 bind_port_v6: None,
209 bootstrap: false,
210 direct_node_addresses: Vec::new(),
211 discovery: DiscoveryMap::default(),
212 gossip_config: None,
213 network_id,
214 protocols: Default::default(),
215 relay_mode: RelayMode::Disabled,
216 private_key: None,
217 sync_config: None,
218 }
219 }
220
221 pub fn from_config(config: Config) -> Self {
223 let mut network_builder = Self::new(config.network_id)
224 .bind_ip_v4(config.bind_ip_v4)
225 .bind_port_v4(config.bind_port_v4)
226 .bind_ip_v6(config.bind_ip_v6)
227 .bind_port_v6(config.bind_port_v6);
228
229 for addr in config.direct_node_addresses {
230 network_builder = network_builder.direct_address(
231 addr.public_key,
232 addr.direct_addresses,
233 addr.relay_url,
234 )
235 }
236
237 if let Some(url) = config.relay {
238 let port = url.port().unwrap_or(DEFAULT_STUN_PORT);
239 network_builder = network_builder.relay(url, false, port)
240 }
241
242 network_builder
243 }
244
245 pub fn bind_ip_v4(mut self, ip: Ipv4Addr) -> Self {
249 self.bind_ip_v4.replace(ip);
250 self
251 }
252
253 pub fn bind_port_v4(mut self, port: u16) -> Self {
257 self.bind_port_v4.replace(port);
258 self
259 }
260
261 pub fn bind_ip_v6(mut self, ip: Ipv6Addr) -> Self {
265 self.bind_ip_v6.replace(ip);
266 self
267 }
268
269 pub fn bind_port_v6(mut self, port: u16) -> Self {
273 self.bind_port_v6.replace(port);
274 self
275 }
276
277 pub fn bootstrap(mut self) -> Self {
282 self.bootstrap = true;
283 self
284 }
285
286 pub fn private_key(mut self, private_key: PrivateKey) -> Self {
291 self.private_key = Some(private_key);
292 self
293 }
294
295 pub fn relay(mut self, url: RelayUrl, stun_only: bool, stun_port: u16) -> Self {
303 self.relay_mode = RelayMode::Custom(RelayNode {
304 url: url.into(),
305 stun_only,
306 stun_port,
307 quic: None,
308 });
309 self
310 }
311
312 pub fn direct_address(
323 mut self,
324 public_key: PublicKey,
325 direct_addresses: Vec<SocketAddr>,
326 relay_url: Option<RelayUrl>,
327 ) -> Self {
328 self.direct_node_addresses.push(NodeAddress {
329 public_key,
330 direct_addresses,
331 relay_url,
332 });
333 self
334 }
335
336 pub fn discovery(mut self, handler: impl Discovery + 'static) -> Self {
338 self.discovery.add(handler);
339 self
340 }
341
342 pub fn sync(mut self, config: SyncConfiguration<T>) -> Self {
347 self.sync_config = Some(config);
348 self
349 }
350
351 pub fn gossip(mut self, config: GossipConfig) -> Self {
356 self.gossip_config = Some(config);
357 self
358 }
359
360 pub fn protocol(
362 mut self,
363 protocol_name: &'static [u8],
364 handler: impl ProtocolHandler + 'static,
365 ) -> Self {
366 self.protocols.insert(protocol_name, Arc::new(handler));
367 self
368 }
369
370 pub async fn build(mut self) -> Result<Network<T>>
382 where
383 T: TopicQuery + TopicId + 'static,
384 {
385 let private_key = self.private_key.unwrap_or_default();
386
387 let relay: Option<RelayNode> = match self.relay_mode {
388 RelayMode::Disabled => None,
389 RelayMode::Custom(ref node) => Some(node.clone()),
390 };
391
392 let endpoint = {
394 let mut transport_config = TransportConfig::default();
395 transport_config
396 .max_concurrent_bidi_streams(MAX_STREAMS.into())
397 .max_concurrent_uni_streams(0u32.into());
398
399 let relay_mode = match self.relay_mode {
400 RelayMode::Disabled => iroh::RelayMode::Disabled,
401 RelayMode::Custom(node) => iroh::RelayMode::Custom(
402 RelayMap::from_nodes(vec![node])
403 .expect("relay list can not contain duplicates"),
404 ),
405 };
406
407 let bind_ip_v4 = self.bind_ip_v4.unwrap_or(Ipv4Addr::UNSPECIFIED);
408 let bind_port_v4 = self.bind_port_v4.unwrap_or(DEFAULT_BIND_PORT);
409 let bind_ip_v6 = self.bind_ip_v6.unwrap_or(Ipv6Addr::UNSPECIFIED);
410 let bind_port_v6 = self.bind_port_v6.unwrap_or(DEFAULT_BIND_PORT + 1);
411 let socket_address_v4 = SocketAddrV4::new(bind_ip_v4, bind_port_v4);
412 let socket_address_v6 = SocketAddrV6::new(bind_ip_v6, bind_port_v6, 0, 0);
413
414 Endpoint::builder()
415 .transport_config(transport_config)
416 .secret_key(from_private_key(private_key.clone()))
417 .relay_mode(relay_mode)
418 .bind_addr_v4(socket_address_v4)
419 .bind_addr_v6(socket_address_v6)
420 .bind()
421 .await?
422 };
423
424 let node_addr = endpoint.node_addr().await?;
425
426 let gossip = Gossip::builder()
427 .max_message_size(self.gossip_config.unwrap_or_default().max_message_size)
428 .spawn(endpoint.clone())
429 .await?;
430
431 let engine = Engine::new(
432 self.bootstrap,
433 private_key.clone(),
434 self.network_id,
435 endpoint.clone(),
436 gossip.clone(),
437 self.sync_config,
438 );
439
440 let sync_handler = engine.sync_handler();
441
442 let inner = Arc::new(NetworkInner {
443 cancel_token: CancellationToken::new(),
444 relay: relay.clone(),
445 discovery: self.discovery,
446 endpoint: endpoint.clone(),
447 engine,
448 gossip: gossip.clone(),
449 network_id: self.network_id,
450 private_key,
451 });
452
453 self.protocols.insert(GOSSIP_ALPN, Arc::new(gossip.clone()));
454 if let Some(sync_handler) = sync_handler {
455 self.protocols
456 .insert(SYNC_CONNECTION_ALPN, Arc::new(sync_handler));
457 };
458 let protocols = Arc::new(self.protocols.clone());
459 let alpns = self.protocols.alpns();
460 if let Err(err) = inner.endpoint.set_alpns(alpns) {
461 inner.shutdown(protocols.clone()).await;
462 return Err(err);
463 }
464
465 let fut = inner
466 .clone()
467 .spawn(protocols.clone())
468 .instrument(error_span!("node", me=%node_addr.node_id.fmt_short()));
469 let task = tokio::task::spawn(fut);
470 let task_handle = AbortOnDropHandle::new(task)
471 .map_err(Box::new(|err: JoinError| err.to_string()) as JoinErrToStr)
472 .shared();
473
474 let network = Network {
475 inner,
476 task: task_handle,
477 protocols,
478 };
479
480 let wait_for_endpoints = {
483 async move {
484 tokio::time::timeout(
485 DIRECT_ADDRESSES_WAIT,
486 endpoint.direct_addresses().initialized(),
487 )
488 .await
489 .context("waiting for endpoint")?
490 .context("no endpoints given to establish at least one connection")?;
491 Ok(())
492 }
493 };
494
495 if let Err(err) = wait_for_endpoints.await {
496 network.shutdown().await.ok();
497 return Err(err);
498 }
499
500 for mut direct_addr in self.direct_node_addresses {
501 if direct_addr.relay_url.is_none() {
502 if let Some(ref relay_node) = relay {
506 direct_addr.relay_url = Some(to_relay_url(relay_node.url.clone()))
507 }
508 }
509
510 network.add_peer(direct_addr.clone()).await?;
511 }
512
513 Ok(network)
514 }
515}
516
/// State shared between the public [`Network`] handle and the main task of the node.
#[derive(Debug)]
struct NetworkInner<T> {
    /// Cancelled by `Network::shutdown` to stop the main loop in `spawn`.
    cancel_token: CancellationToken,
    /// Relay node of this node, if one was configured; its URL is added to the node address
    /// announced to discovery.
    relay: Option<RelayNode>,
    /// Discovery handlers; subscribed to in `spawn` and informed about address changes.
    discovery: DiscoveryMap,
    /// Endpoint used to accept incoming connections.
    endpoint: Endpoint,
    /// Engine receiving peers, subscriptions and the shutdown request.
    engine: Engine<T>,
    /// Gossip instance spawned in `NetworkBuilder::build`.
    #[allow(dead_code)]
    gossip: Gossip,
    /// Identifier of the network, used when subscribing to discovery.
    network_id: NetworkId,
    /// Private key of this node.
    #[allow(dead_code)]
    private_key: PrivateKey,
}
530
impl<T> NetworkInner<T>
where
    T: TopicQuery + TopicId + 'static,
{
    /// Main task of the node.
    ///
    /// Starts a background task which forwards changes of our direct addresses to discovery,
    /// then loops over incoming connections, discovery events and finished tasks until the
    /// cancel token fires or a fatal error occurs. Afterwards the node is shut down and all
    /// remaining tasks are stopped.
    ///
    /// # Panics
    ///
    /// Panics if subscribing to the discovery map fails.
    async fn spawn(self: Arc<Self>, protocols: Arc<ProtocolMap>) {
        let (ipv4, ipv6) = self.endpoint.bound_sockets();
        debug!(
            "listening at: {}{}",
            ipv4,
            ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default()
        );

        // Holds the address watcher below and one task per accepted connection.
        let mut join_set = JoinSet::<Result<()>>::new();

        {
            // Keep discovery informed about our own node address whenever the endpoint reports
            // a new set of direct addresses.
            let inner = self.clone();
            join_set.spawn(async move {
                let mut addrs_stream = inner.endpoint.direct_addresses().stream();
                let mut my_node_addr = iroh::NodeAddr::from(inner.endpoint.node_id());
                if let Some(relay) = &inner.relay {
                    my_node_addr = my_node_addr.with_relay_url(relay.url.clone());
                }

                while let Some(endpoints) = addrs_stream.next().await {
                    let direct_addresses: Option<Vec<SocketAddr>> = endpoints
                        .map(|endpoints| endpoints.iter().map(|endpoint| endpoint.addr).collect());
                    if let Some(addresses) = direct_addresses {
                        my_node_addr = my_node_addr.with_direct_addresses(addresses);
                        // A failing update is only logged, the watcher keeps running.
                        if let Err(err) = inner.discovery.update_local_address(&my_node_addr) {
                            warn!("failed to update direct addresses for discovery: {err:?}");
                        }
                    }
                }

                Ok(())
            });
        }

        let mut discovery_stream = self
            .discovery
            .subscribe(self.network_id)
            .expect("discovery map needs to be given");

        loop {
            tokio::select! {
                // `biased` makes `select!` check the branches in the order they are written, so
                // a cancellation is noticed before any other event.
                biased;
                _ = self.cancel_token.cancelled() => {
                    break;
                },
                // Incoming connection: each one is handled in its own task.
                Some(incoming) = self.endpoint.accept() => {
                    let connecting = match incoming.accept() {
                        Ok(connecting) => connecting,
                        Err(err) => {
                            warn!("incoming connection failed: {err:#}");
                            // A single failed connection does not stop the node.
                            continue;
                        },
                    };
                    let protocols = protocols.clone();
                    join_set.spawn(async move {
                        handle_connection(connecting, protocols).await;
                        Ok(())
                    });
                },
                // Peer found by discovery: hand it to the engine. Errors on either side are
                // treated as fatal and end the main loop.
                Some(event) = discovery_stream.next() => {
                    match event {
                        Ok(event) => {
                            if let Err(err) = self.engine.add_peer(to_node_addr(event.node_addr)).await {
                                error!("engine failed on add_peer: {err:?}");
                                break;
                            }
                        }
                        Err(err) => {
                            error!("discovery service failed: {err:?}");
                            break;
                        },
                    }
                },
                // A spawned task finished: panics and other join errors end the main loop,
                // cancelled tasks and tasks returning an error are only logged.
                res = join_set.join_next(), if !join_set.is_empty() => {
                    match res {
                        Some(Err(outer)) => {
                            if outer.is_panic() {
                                error!("task panicked: {outer:?}");
                                break;
                            } else if outer.is_cancelled() {
                                debug!("task cancelled: {outer:?}");
                            } else {
                                error!("task failed: {outer:?}");
                                break;
                            }
                        }
                        Some(Ok(Err(inner))) => {
                            debug!("task errored: {inner:?}");
                        }
                        _ => {}
                    }
                },
                else => break,
            }
        }

        self.shutdown(protocols).await;

        // Stop whatever is still running in the join set (address watcher, connections).
        join_set.shutdown().await;
    }

    /// Closes the endpoint and shuts down the engine and all protocol handlers.
    ///
    /// The three operations are awaited together and their results are discarded.
    async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
        debug!("close all connections and shutdown the node");
        let _ = tokio::join!(
            self.endpoint.close(),
            self.engine.shutdown(),
            protocols.shutdown(),
        );
    }
}
671
/// Handle to a running network node, created by `NetworkBuilder::build`.
///
/// The handle can be cloned; all clones share the same inner state and the same main task.
#[derive(Clone, Debug)]
pub struct Network<T> {
    /// State shared with the main task of the node.
    inner: Arc<NetworkInner<T>>,
    /// All protocol handlers registered for this node.
    #[allow(dead_code)]
    protocols: Arc<ProtocolMap>,
    /// Shared handle of the main task; awaited in `shutdown`. The join handle is wrapped in an
    /// `AbortOnDropHandle` and join errors are mapped to their string representation.
    task: Shared<MapErr<AbortOnDropHandle<()>, JoinErrToStr>>,
}
695
696impl<T> Network<T>
697where
698 T: TopicQuery + TopicId + 'static,
699{
700 pub async fn add_peer(&self, node_addr: NodeAddress) -> Result<()> {
702 self.inner.engine.add_peer(node_addr).await
703 }
704
705 pub async fn events(&self) -> Result<broadcast::Receiver<SystemEvent<T>>> {
710 self.inner.engine.events().await
711 }
712
713 pub async fn known_peers(&self) -> Result<Vec<NodeAddress>> {
715 self.inner.engine.known_peers().await
716 }
717
718 pub async fn direct_addresses(&self) -> Option<Vec<SocketAddr>> {
720 match self
721 .inner
722 .endpoint
723 .direct_addresses()
724 .initialized()
725 .await
726 .map(|addrs| addrs.into_iter().map(|direct| direct.addr).collect())
727 {
728 Ok(result) => Some(result),
729 Err(_) => None,
730 }
731 }
732
733 pub fn endpoint(&self) -> &Endpoint {
741 &self.inner.endpoint
742 }
743
744 pub fn node_id(&self) -> PublicKey {
746 PublicKey::from_bytes(self.inner.endpoint.node_id().as_bytes())
747 .expect("public key already checked")
748 }
749
750 pub async fn shutdown(self) -> Result<()> {
752 self.inner.cancel_token.cancel();
754
755 self.task.await.map_err(|err| anyhow!(err))?;
757
758 Ok(())
759 }
760
761 pub async fn subscribe(
764 &self,
765 topic: T,
766 ) -> Result<(
767 mpsc::Sender<ToNetwork>,
768 mpsc::Receiver<FromNetwork>,
769 oneshot::Receiver<()>,
770 )> {
771 let (to_network_tx, to_network_rx) = mpsc::channel::<ToNetwork>(128);
772 let (from_network_tx, from_network_rx) = mpsc::channel::<FromNetwork>(128);
773 let (gossip_ready_tx, gossip_ready_rx) = oneshot::channel();
774
775 self.inner
776 .engine
777 .subscribe(topic, from_network_tx, to_network_rx, gossip_ready_tx)
778 .await?;
779
780 Ok((to_network_tx, from_network_rx, gossip_ready_rx))
781 }
782}
783
/// Messages an application hands to the network through the sender returned by
/// `Network::subscribe`.
#[derive(Clone, Debug)]
pub enum ToNetwork {
    /// Message carrying raw bytes.
    Message { bytes: Vec<u8> },
}
789
/// Messages the network delivers to an application through the receiver returned by
/// `Network::subscribe`.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FromNetwork {
    /// Message which arrived via gossip.
    GossipMessage {
        /// Raw bytes of the message.
        bytes: Vec<u8>,
        /// Public key of the peer this message was delivered from.
        delivered_from: PublicKey,
    },
    /// Message which arrived via sync.
    SyncMessage {
        /// Header bytes of the message.
        header: Vec<u8>,
        /// Payload bytes of the message, if there are any.
        payload: Option<Vec<u8>>,
        /// Public key of the peer this message was delivered from.
        delivered_from: PublicKey,
    },
}
804
805async fn handle_connection(
810 mut connecting: iroh::endpoint::Connecting,
811 protocols: Arc<ProtocolMap>,
812) {
813 let alpn = match connecting.alpn().await {
814 Ok(alpn) => alpn,
815 Err(err) => {
816 warn!("ignoring connection: invalid handshake: {:?}", err);
817 return;
818 }
819 };
820 let Some(handler) = protocols.get(&alpn) else {
821 warn!("ignoring connection: unsupported alpn protocol");
822 return;
823 };
824 if let Err(err) = handler.accept(connecting).await {
825 warn!("handling incoming connection ended with error: {err}");
826 }
827}
828
/// Boxed function turning a task's `JoinError` into a `String`.
///
/// Used as the error mapper of the main task handle stored in [`Network`].
pub(crate) type JoinErrToStr =
    Box<dyn Fn(tokio::task::JoinError) -> String + Send + Sync + 'static>;
832
833#[cfg(test)]
834mod tests {
835 use std::collections::HashMap;
836 use std::net::{Ipv4Addr, Ipv6Addr};
837 use std::path::PathBuf;
838 use std::time::Duration;
839
840 use async_trait::async_trait;
841 use iroh::{RelayNode, RelayUrl as IrohRelayUrl};
842 use p2panda_core::{Body, Extensions, Hash, Header, PrivateKey, PublicKey};
843 use p2panda_discovery::mdns::LocalDiscovery;
844 use p2panda_store::{MemoryStore, OperationStore};
845 use p2panda_sync::TopicQuery;
846 use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap};
847 use p2panda_sync::test_protocols::{
848 FailingProtocol, PingPongProtocol, SyncTestTopic as TestTopic,
849 };
850 use tokio::task::JoinHandle;
851
852 use crate::addrs::{DEFAULT_STUN_PORT, to_node_addr};
853 use crate::bytes::ToBytes;
854 use crate::config::Config;
855 use crate::events::SystemEvent;
856 use crate::sync::SyncConfiguration;
857 use crate::{NetworkBuilder, NodeAddress, RelayMode, RelayUrl, TopicId, to_public_key};
858
859 use super::{FromNetwork, Network, ToNetwork};
860
// The topic id of the test topic is taken directly from the second field of the tuple struct.
impl TopicId for TestTopic {
    fn id(&self) -> [u8; 32] {
        self.1
    }
}
866
867 fn create_operation<E: Extensions>(
868 private_key: &PrivateKey,
869 body: &Body,
870 seq_num: u64,
871 timestamp: u64,
872 backlink: Option<Hash>,
873 extensions: Option<E>,
874 ) -> (Hash, Header<E>, Vec<u8>) {
875 let mut header = Header {
876 version: 1,
877 public_key: private_key.public_key(),
878 signature: None,
879 payload_size: body.size(),
880 payload_hash: Some(body.hash()),
881 timestamp,
882 seq_num,
883 backlink,
884 previous: vec![],
885 extensions,
886 };
887 header.sign(private_key);
888 let header_bytes = header.to_bytes();
889 (header.hash(), header, header_bytes)
890 }
891
892 fn run_node<T: TopicId + TopicQuery + 'static>(node: Network<T>, topic: T) -> JoinHandle<()> {
893 tokio::spawn(async move {
894 let (_tx, mut rx, ready) = node.subscribe(topic).await.unwrap();
895
896 assert!(ready.await.is_ok());
898
899 loop {
901 let msg = rx.recv().await.unwrap();
902 println!("{msg:?}");
903 match msg {
904 FromNetwork::SyncMessage { .. } => break,
905 _ => (),
906 }
907 }
908
909 tokio::time::sleep(Duration::from_secs(3)).await;
911 node.shutdown().await.unwrap();
912 })
913 }
914
915 #[tokio::test]
916 async fn config() {
917 let direct_node_public_key = PrivateKey::new().public_key();
918 let relay_address: RelayUrl = "https://example.net".parse().unwrap();
919
920 let config = Config {
921 bind_ip_v4: Ipv4Addr::new(7, 7, 7, 7),
922 bind_port_v4: 2024,
923 bind_ip_v6: Ipv6Addr::new(8, 8, 8, 8, 8, 8, 8, 8),
924 bind_port_v6: 2025,
925 network_id: [1; 32],
926 private_key: Some(PathBuf::new().join("secret-key.txt")),
927 direct_node_addresses: vec![NodeAddress {
928 public_key: direct_node_public_key,
929 direct_addresses: vec!["0.0.0.0:2026".parse().unwrap()],
930 relay_url: None,
931 }],
932 relay: Some(relay_address.clone()),
933 };
934
935 let builder = NetworkBuilder::<TestTopic>::from_config(config);
936
937 assert_eq!(builder.bind_ip_v4, Some(Ipv4Addr::new(7, 7, 7, 7)));
938 assert_eq!(builder.bind_port_v4, Some(2024));
939 assert_eq!(
940 builder.bind_ip_v6,
941 Some(Ipv6Addr::new(8, 8, 8, 8, 8, 8, 8, 8))
942 );
943 assert_eq!(builder.bind_port_v6, Some(2025));
944 assert_eq!(builder.network_id, [1; 32]);
945 assert!(builder.private_key.is_none());
946 assert_eq!(builder.direct_node_addresses.len(), 1);
947 let relay_node = RelayNode {
948 url: IrohRelayUrl::from(relay_address),
949 stun_only: false,
950 stun_port: DEFAULT_STUN_PORT,
951 quic: None,
952 };
953 assert_eq!(builder.relay_mode, RelayMode::Custom(relay_node));
954 }
955
956 #[tokio::test]
957 async fn join_gossip_overlay() {
958 let network_id = [1; 32];
959 let topic = TestTopic::new("chat");
960
961 let node_1 = NetworkBuilder::new(network_id).build().await.unwrap();
962 let node_2 = NetworkBuilder::new(network_id).build().await.unwrap();
963
964 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
965 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
966
967 node_1.add_peer(to_node_addr(node_2_addr)).await.unwrap();
968 node_2.add_peer(to_node_addr(node_1_addr)).await.unwrap();
969
970 let (tx_1, _rx_1, ready_1) = node_1.subscribe(topic.clone()).await.unwrap();
972 let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(topic).await.unwrap();
973
974 assert!(ready_2.await.is_ok());
976 assert!(ready_1.await.is_ok());
977
978 tx_1.send(ToNetwork::Message {
980 bytes: "Hello, Node".to_bytes(),
981 })
982 .await
983 .unwrap();
984
985 let rx_2_msg = rx_2.recv().await.unwrap();
986 assert_eq!(
987 rx_2_msg,
988 FromNetwork::GossipMessage {
989 bytes: "Hello, Node".to_bytes(),
990 delivered_from: node_1.node_id(),
991 }
992 );
993
994 node_1.shutdown().await.unwrap();
995 node_2.shutdown().await.unwrap();
996 }
997
998 #[tokio::test]
999 async fn join_gossip_overlay_with_local_discovery() {
1000 let network_id = [1; 32];
1001 let topic = TestTopic::new("chat");
1002
1003 let node_1 = NetworkBuilder::new(network_id)
1005 .discovery(LocalDiscovery::new())
1006 .build()
1007 .await
1008 .unwrap();
1009 let node_2 = NetworkBuilder::new(network_id)
1010 .discovery(LocalDiscovery::new())
1011 .build()
1012 .await
1013 .unwrap();
1014
1015 let (tx_1, _rx_1, ready_1) = node_1.subscribe(topic.clone()).await.unwrap();
1017 let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(topic).await.unwrap();
1018
1019 assert!(ready_2.await.is_ok());
1021 assert!(ready_1.await.is_ok());
1022
1023 tx_1.send(ToNetwork::Message {
1025 bytes: "Hello, Node".to_bytes(),
1026 })
1027 .await
1028 .unwrap();
1029
1030 let rx_2_msg = rx_2.recv().await.unwrap();
1031 assert_eq!(
1032 rx_2_msg,
1033 FromNetwork::GossipMessage {
1034 bytes: "Hello, Node".to_bytes(),
1035 delivered_from: node_1.node_id(),
1036 }
1037 );
1038
1039 node_1.shutdown().await.unwrap();
1040 node_2.shutdown().await.unwrap();
1041 }
1042
1043 #[tokio::test]
1044 async fn join_gossip_overlay_with_relay() {
1045 let network_id = [1; 32];
1046 let topic = TestTopic::new("chat");
1047
1048 let relay_url: RelayUrl = "https://wasser.liebechaos.org/".parse().unwrap();
1052
1053 let node_1 = NetworkBuilder::new(network_id)
1055 .bootstrap()
1056 .relay(relay_url.clone(), false, 0)
1057 .build()
1058 .await
1059 .unwrap();
1060 node_1.endpoint().home_relay().initialized().await.unwrap();
1062
1063 let node_2 = NetworkBuilder::new(network_id)
1065 .relay(relay_url, false, 0)
1066 .direct_address(node_1.node_id(), vec![], None)
1067 .build()
1068 .await
1069 .unwrap();
1070
1071 let (tx_1, _rx_1, ready_1) = node_1.subscribe(topic.clone()).await.unwrap();
1073 let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(topic).await.unwrap();
1074
1075 assert!(ready_1.await.is_ok());
1077 assert!(ready_2.await.is_ok());
1078
1079 tx_1.send(ToNetwork::Message {
1081 bytes: "Hello, Node".to_bytes(),
1082 })
1083 .await
1084 .unwrap();
1085
1086 let rx_2_msg = rx_2.recv().await.unwrap();
1087 assert_eq!(
1088 rx_2_msg,
1089 FromNetwork::GossipMessage {
1090 bytes: "Hello, Node".to_bytes(),
1091 delivered_from: node_1.node_id(),
1092 }
1093 );
1094
1095 node_1.shutdown().await.unwrap();
1096 node_2.shutdown().await.unwrap();
1097 }
1098
1099 #[tokio::test]
1100 async fn ping_pong() {
1101 let network_id = [1; 32];
1102 let topic = TestTopic::new("ping_pong");
1103 let ping_pong = PingPongProtocol {};
1104 let sync_config = SyncConfiguration::new(ping_pong);
1105
1106 let node_1 = NetworkBuilder::new(network_id)
1107 .sync(sync_config.clone())
1108 .build()
1109 .await
1110 .unwrap();
1111 let node_2 = NetworkBuilder::new(network_id)
1112 .sync(sync_config)
1113 .build()
1114 .await
1115 .unwrap();
1116
1117 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1118 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1119
1120 node_1.add_peer(to_node_addr(node_1_addr)).await.unwrap();
1121 node_2.add_peer(to_node_addr(node_2_addr)).await.unwrap();
1122
1123 let topic_clone = topic.clone();
1125 let handle1 = tokio::spawn(async move {
1126 let (_tx, _rx, _ready) = node_1.subscribe(topic_clone).await.unwrap();
1127 tokio::time::sleep(Duration::from_secs(2)).await;
1128 node_1.shutdown().await.unwrap();
1129 });
1130 let handle2 = tokio::spawn(async move {
1131 let (_tx, _rx, _ready) = node_2.subscribe(topic).await.unwrap();
1132 tokio::time::sleep(Duration::from_secs(2)).await;
1133 node_2.shutdown().await.unwrap();
1134 });
1135
1136 let (result1, result2) = tokio::join!(handle1, handle2);
1137 assert!(result1.is_ok());
1138 assert!(result2.is_ok());
1139 }
1140
/// Values of type `T` (log ids in these tests), grouped by the public key they belong to.
type Logs<T> = HashMap<PublicKey, Vec<T>>;

/// Test mapping from a topic to the logs (with `u64` log ids) belonging to it.
#[derive(Clone, Debug)]
struct LogIdTopicMap<T>(HashMap<T, Logs<u64>>);
1145
1146 impl<T> LogIdTopicMap<T>
1147 where
1148 T: TopicQuery,
1149 {
1150 pub fn new() -> Self {
1151 LogIdTopicMap(HashMap::new())
1152 }
1153
1154 fn insert(&mut self, topic: T, logs: Logs<u64>) -> Option<Logs<u64>> {
1155 self.0.insert(topic, logs)
1156 }
1157 }
1158
// Lets the test map be used as the topic-to-logs lookup of the log sync protocol.
#[async_trait]
impl<T> TopicLogMap<T, u64> for LogIdTopicMap<T>
where
    T: TopicQuery,
{
    /// Returns a copy of the logs stored for `topic`, or `None` when the topic is unknown.
    async fn get(&self, topic: &T) -> Option<Logs<u64>> {
        self.0.get(topic).cloned()
    }
}
1168
1169 #[tokio::test]
1170 async fn e2e_log_height_sync() {
1171 const NETWORK_ID: [u8; 32] = [1; 32];
1172
1173 let peer_a_private_key = PrivateKey::new();
1174 let peer_b_private_key = PrivateKey::new();
1175
1176 let topic = TestTopic::new("event_logs");
1177 let log_id = 0;
1178 let logs = HashMap::from([(peer_a_private_key.public_key(), vec![log_id])]);
1179
1180 let mut topic_map = LogIdTopicMap::new();
1181 topic_map.insert(topic.clone(), logs);
1182
1183 let store_a = MemoryStore::default();
1185 let protocol_a = LogSyncProtocol::new(topic_map.clone(), store_a);
1186 let sync_config_a = SyncConfiguration::new(protocol_a);
1187
1188 let body = Body::new("Hello, Sloth!".as_bytes());
1190 let (hash_0, header_0, header_bytes_0) =
1191 create_operation(&peer_a_private_key, &body, 0, 0, None, None);
1192 let (hash_1, header_1, header_bytes_1) =
1193 create_operation(&peer_a_private_key, &body, 1, 100, Some(hash_0), None);
1194 let (hash_2, header_2, header_bytes_2) =
1195 create_operation(&peer_a_private_key, &body, 2, 200, Some(hash_1), None);
1196
1197 let mut store_b = MemoryStore::default();
1199 store_b
1200 .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
1201 .await
1202 .unwrap();
1203 store_b
1204 .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
1205 .await
1206 .unwrap();
1207 store_b
1208 .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
1209 .await
1210 .unwrap();
1211
1212 let protocol_b = LogSyncProtocol::new(topic_map, store_b);
1214 let sync_config_b = SyncConfiguration::new(protocol_b);
1215
1216 let node_a = NetworkBuilder::new(NETWORK_ID)
1218 .sync(sync_config_a)
1219 .private_key(peer_a_private_key)
1220 .build()
1221 .await
1222 .unwrap();
1223
1224 let node_b = NetworkBuilder::new(NETWORK_ID)
1226 .sync(sync_config_b)
1227 .private_key(peer_b_private_key.clone())
1228 .build()
1229 .await
1230 .unwrap();
1231
1232 let node_a_addr = node_a.endpoint().node_addr().await.unwrap();
1233 let node_b_addr = node_b.endpoint().node_addr().await.unwrap();
1234
1235 node_a.add_peer(to_node_addr(node_b_addr)).await.unwrap();
1236 node_b.add_peer(to_node_addr(node_a_addr)).await.unwrap();
1237
1238 let topic_clone = topic.clone();
1240 let handle1 = tokio::spawn(async move {
1241 let (_tx, mut from_sync_rx, ready) = node_a.subscribe(topic_clone).await.unwrap();
1242
1243 assert!(ready.await.is_ok());
1245
1246 let mut from_sync_messages = Vec::new();
1247 while let Some(message) = from_sync_rx.recv().await {
1248 from_sync_messages.push(message);
1249 if from_sync_messages.len() == 3 {
1250 break;
1251 }
1252 }
1253
1254 let peer_a_expected_messages = vec![
1257 FromNetwork::SyncMessage {
1258 header: header_bytes_0.to_vec(),
1259 payload: Some(body.to_bytes()),
1260 delivered_from: peer_b_private_key.public_key(),
1261 },
1262 FromNetwork::SyncMessage {
1263 header: header_bytes_1.to_vec(),
1264 payload: Some(body.to_bytes()),
1265 delivered_from: peer_b_private_key.public_key(),
1266 },
1267 FromNetwork::SyncMessage {
1268 header: header_bytes_2.to_vec(),
1269 payload: Some(body.to_bytes()),
1270 delivered_from: peer_b_private_key.public_key(),
1271 },
1272 ];
1273
1274 assert_eq!(from_sync_messages, peer_a_expected_messages);
1276
1277 node_a.shutdown().await.unwrap();
1278 });
1279
1280 let handle2 = tokio::spawn(async move {
1281 let (_tx, _from_sync_rx, ready) = node_b.subscribe(topic).await.unwrap();
1282
1283 assert!(ready.await.is_ok());
1285
1286 tokio::time::sleep(Duration::from_secs(2)).await;
1288
1289 node_b.shutdown().await.unwrap();
1290 });
1291
1292 let (result1, result2) = tokio::join!(handle1, handle2);
1294
1295 assert!(result1.is_ok());
1296 assert!(result2.is_ok())
1297 }
1298
1299 #[tokio::test]
1300 async fn multi_hop_join_gossip_overlay() {
1301 let network_id = [1; 32];
1302 let chat_topic = TestTopic::new("chat");
1303
1304 let node_1 = NetworkBuilder::new(network_id).build().await.unwrap();
1305 let node_2 = NetworkBuilder::new(network_id).build().await.unwrap();
1306 let node_3 = NetworkBuilder::new(network_id).build().await.unwrap();
1307
1308 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1309 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1310
1311 node_1
1312 .add_peer(to_node_addr(node_2_addr.clone()))
1313 .await
1314 .unwrap();
1315 node_2.add_peer(to_node_addr(node_1_addr)).await.unwrap();
1316 node_3.add_peer(to_node_addr(node_2_addr)).await.unwrap();
1317
1318 let (tx_1, _rx_1, ready_1) = node_1.subscribe(chat_topic.clone()).await.unwrap();
1320 let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(chat_topic.clone()).await.unwrap();
1321 let (_tx_3, mut rx_3, ready_3) = node_3.subscribe(chat_topic).await.unwrap();
1322
1323 assert!(ready_3.await.is_ok());
1325 assert!(ready_2.await.is_ok());
1326 assert!(ready_1.await.is_ok());
1327
1328 tx_1.send(ToNetwork::Message {
1330 bytes: "Hello, Node".to_bytes(),
1331 })
1332 .await
1333 .unwrap();
1334
1335 let rx_2_msg = rx_2.recv().await.unwrap();
1336 assert_eq!(
1337 rx_2_msg,
1338 FromNetwork::GossipMessage {
1339 bytes: "Hello, Node".to_bytes(),
1340 delivered_from: node_1.node_id(),
1342 }
1343 );
1344
1345 let rx_3_msg = rx_3.recv().await.unwrap();
1346 assert_eq!(
1347 rx_3_msg,
1348 FromNetwork::GossipMessage {
1349 bytes: "Hello, Node".to_bytes(),
1350 delivered_from: node_1.node_id(),
1352 }
1353 );
1354
1355 node_1.shutdown().await.unwrap();
1356 node_2.shutdown().await.unwrap();
1357 node_3.shutdown().await.unwrap();
1358 }
1359
1360 #[tokio::test]
1361 async fn multi_hop_topic_discovery_and_sync() {
1362 let network_id = [1; 32];
1363 let topic = TestTopic::new("chat");
1364 let sync_config = SyncConfiguration::new(PingPongProtocol {});
1365
1366 let node_1 = NetworkBuilder::new(network_id)
1368 .sync(sync_config.clone())
1369 .build()
1370 .await
1371 .unwrap();
1372 let node_2 = NetworkBuilder::new(network_id)
1373 .sync(sync_config.clone())
1374 .build()
1375 .await
1376 .unwrap();
1377 let node_3 = NetworkBuilder::new(network_id)
1378 .bootstrap()
1379 .sync(sync_config.clone())
1380 .build()
1381 .await
1382 .unwrap();
1383 let node_4 = NetworkBuilder::new(network_id)
1384 .bootstrap()
1385 .sync(sync_config.clone())
1386 .build()
1387 .await
1388 .unwrap();
1389
1390 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1391 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1392 let node_3_addr = node_3.endpoint().node_addr().await.unwrap();
1393
1394 node_1
1396 .add_peer(to_node_addr(node_2_addr.clone()))
1397 .await
1398 .unwrap();
1399 node_2.add_peer(to_node_addr(node_1_addr)).await.unwrap();
1400 node_3
1401 .add_peer(to_node_addr(node_2_addr.clone()))
1402 .await
1403 .unwrap();
1404 node_4
1405 .add_peer(to_node_addr(node_3_addr.clone()))
1406 .await
1407 .unwrap();
1408
1409 let handle1 = run_node(node_1, topic.clone());
1413 let handle2 = run_node(node_2, topic.clone());
1414 let handle3 = run_node(node_3, topic.clone());
1415 let handle4 = run_node(node_4, topic.clone());
1416
1417 let (result1, result2, result3, result4) = tokio::join!(handle1, handle2, handle3, handle4);
1418
1419 assert!(result1.is_ok());
1420 assert!(result2.is_ok());
1421 assert!(result3.is_ok());
1422 assert!(result4.is_ok());
1423 }
1424
1425 #[tokio::test]
1426 async fn gossip_and_sync_events() {
1427 let network_id = [17; 32];
1428 let chat_topic = TestTopic::new("chat");
1429 let chat_topic_id = chat_topic.clone().id();
1430 let sync_config = SyncConfiguration::new(PingPongProtocol {});
1431
1432 let node_1 = NetworkBuilder::new(network_id)
1433 .sync(sync_config.clone())
1434 .build()
1435 .await
1436 .unwrap();
1437 let node_2 = NetworkBuilder::new(network_id)
1438 .sync(sync_config.clone())
1439 .build()
1440 .await
1441 .unwrap();
1442
1443 let node_2_id = node_2.endpoint().node_id();
1444
1445 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1446 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1447
1448 node_1
1449 .add_peer(to_node_addr(node_2_addr.clone()))
1450 .await
1451 .unwrap();
1452 node_2
1453 .add_peer(to_node_addr(node_1_addr.clone()))
1454 .await
1455 .unwrap();
1456
1457 let mut event_rx_1 = node_1.events().await.unwrap();
1459
1460 let (_tx_1, _rx_1, ready_1) = node_1.subscribe(chat_topic.clone()).await.unwrap();
1462 let (_tx_2, _rx_2, ready_2) = node_2.subscribe(chat_topic.clone()).await.unwrap();
1463
1464 assert!(ready_2.await.is_ok());
1466 assert!(ready_1.await.is_ok());
1467
1468 let node_3 = NetworkBuilder::new(network_id).build().await.unwrap();
1470 let node_3_id = node_3.endpoint().node_id();
1471 node_3.add_peer(to_node_addr(node_1_addr)).await.unwrap();
1472 let (_tx_3, _rx_3, ready_3) = node_3.subscribe(chat_topic.clone()).await.unwrap();
1473 assert!(ready_3.await.is_ok());
1474
1475 let mut expected_events = vec![
1477 SystemEvent::GossipJoined {
1479 topic_id: network_id,
1480 peers: vec![to_public_key(node_2_id)],
1481 },
1482 SystemEvent::PeerDiscovered {
1484 peer: to_public_key(node_2_id),
1485 },
1486 SystemEvent::SyncStarted {
1488 topic: None,
1489 peer: to_public_key(node_2_id),
1490 },
1491 SystemEvent::SyncDone {
1493 topic: chat_topic.clone(),
1494 peer: to_public_key(node_2_id),
1495 },
1496 SystemEvent::SyncStarted {
1498 topic: Some(chat_topic.clone()),
1499 peer: to_public_key(node_2_id),
1500 },
1501 SystemEvent::GossipJoined {
1503 topic_id: chat_topic_id,
1504 peers: vec![to_public_key(node_2_id)],
1505 },
1506 SystemEvent::SyncDone {
1508 topic: chat_topic.clone(),
1509 peer: to_public_key(node_2_id),
1510 },
1511 SystemEvent::GossipNeighborUp {
1513 topic_id: network_id,
1514 peer: to_public_key(node_3_id),
1515 },
1516 SystemEvent::PeerDiscovered {
1518 peer: to_public_key(node_2_id),
1519 },
1520 SystemEvent::PeerDiscovered {
1522 peer: to_public_key(node_3_id),
1523 },
1524 ];
1525
1526 let mut received_events = Vec::new();
1528 while let Ok(event) = event_rx_1.recv().await {
1529 assert!(expected_events.contains(&event));
1530 let index = expected_events.iter().position(|ev| *ev == event).unwrap();
1531 received_events.push(expected_events.remove(index));
1532 if received_events.len() == 10 {
1533 break;
1534 }
1535 }
1536
1537 node_1.shutdown().await.unwrap();
1538 node_2.shutdown().await.unwrap();
1539 }
1540
1541 #[tokio::test]
1542 async fn resync_after_error() {
1543 let network_id = [17; 32];
1544 let chat_topic = TestTopic::new("chat");
1545 let sync_config = SyncConfiguration::new(FailingProtocol::InitiatorFailsUnexpected);
1546
1547 let node_1 = NetworkBuilder::new(network_id)
1548 .sync(sync_config.clone())
1549 .build()
1550 .await
1551 .unwrap();
1552 let node_2 = NetworkBuilder::new(network_id)
1553 .sync(sync_config.clone())
1554 .build()
1555 .await
1556 .unwrap();
1557
1558 let node_2_id = node_2.endpoint().node_id();
1559
1560 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1561 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1562
1563 node_1
1564 .add_peer(to_node_addr(node_2_addr.clone()))
1565 .await
1566 .unwrap();
1567 node_2
1568 .add_peer(to_node_addr(node_1_addr.clone()))
1569 .await
1570 .unwrap();
1571
1572 let mut event_rx_1 = node_1.events().await.unwrap();
1574
1575 let (_tx_1, _rx_1, ready_1) = node_1.subscribe(chat_topic.clone()).await.unwrap();
1577 let (_tx_2, _rx_2, ready_2) = node_2.subscribe(chat_topic.clone()).await.unwrap();
1578
1579 assert!(ready_2.await.is_ok());
1581 assert!(ready_1.await.is_ok());
1582
1583 let expected_events = vec![
1585 SystemEvent::SyncStarted {
1587 topic: None,
1588 peer: to_public_key(node_2_id),
1589 },
1590 SystemEvent::SyncFailed {
1592 topic: None,
1593 peer: to_public_key(node_2_id),
1594 },
1595 SystemEvent::SyncStarted {
1597 topic: None,
1598 peer: to_public_key(node_2_id),
1599 },
1600 SystemEvent::SyncStarted {
1605 topic: Some(chat_topic.clone()),
1606 peer: to_public_key(node_2_id),
1607 },
1608 SystemEvent::SyncStarted {
1610 topic: Some(chat_topic.clone()),
1611 peer: to_public_key(node_2_id),
1612 },
1613 ];
1614
1615 let mut received_events = Vec::new();
1617 while let Ok(event) = event_rx_1.recv().await {
1618 received_events.push(event);
1619
1620 if received_events.len() == 14 {
1622 break;
1623 }
1624 }
1625
1626 expected_events.into_iter().for_each(|event| {
1629 assert!(received_events.contains(&event));
1630
1631 let index = received_events.iter().position(|ev| *ev == event).unwrap();
1634 received_events.remove(index);
1635 });
1636
1637 node_1.shutdown().await.unwrap();
1638 node_2.shutdown().await.unwrap();
1639 }
1640}