1use std::fmt::Debug;
120use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
121use std::sync::Arc;
122use std::time::Duration;
123
124use anyhow::{anyhow, Context, Result};
125use futures_lite::StreamExt;
126use futures_util::future::{MapErr, Shared};
127use futures_util::{FutureExt, TryFutureExt};
128use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
129use iroh_net::endpoint::TransportConfig;
130use iroh_net::key::SecretKey;
131use iroh_net::relay::{RelayMap, RelayNode};
132use iroh_net::{Endpoint, NodeAddr, NodeId};
133use p2panda_core::{PrivateKey, PublicKey};
134use p2panda_discovery::{Discovery, DiscoveryMap};
135use p2panda_sync::TopicQuery;
136use tokio::sync::{mpsc, oneshot};
137use tokio::task::{JoinError, JoinSet};
138use tokio_util::sync::CancellationToken;
139use tokio_util::task::AbortOnDropHandle;
140use tracing::{debug, error, error_span, warn, Instrument};
141
142use crate::addrs::DEFAULT_STUN_PORT;
143use crate::config::{Config, GossipConfig, DEFAULT_BIND_PORT};
144use crate::engine::Engine;
145use crate::protocols::{ProtocolHandler, ProtocolMap};
146use crate::sync::{SyncConfiguration, SYNC_CONNECTION_ALPN};
147use crate::{NetworkId, RelayUrl, TopicId};
148
/// Upper bound on concurrently open bidirectional streams, applied to the
/// endpoint's transport configuration when the network is built.
const MAX_STREAMS: u32 = 1024;
151
/// How long `NetworkBuilder::build` waits for the endpoint to report its first
/// set of direct addresses before failing.
const DIRECT_ADDRESSES_WAIT: Duration = Duration::from_secs(5);
154
/// Relay configuration of a node.
#[derive(Debug, PartialEq)]
pub enum RelayMode {
    /// No relay is used; the endpoint is built with relaying disabled.
    Disabled,

    /// A single, explicitly configured relay node is used.
    Custom(RelayNode),
}
174
/// Builder collecting every setting required to create a [`Network`].
///
/// Values are set through the chained builder methods and consumed by
/// `build`. `T` is the topic query type used by the optional sync
/// configuration.
#[derive(Debug)]
pub struct NetworkBuilder<T> {
    /// IPv4 address to bind to; the unspecified address is used when unset.
    bind_ip_v4: Option<Ipv4Addr>,
    /// IPv4 port to bind to; `DEFAULT_BIND_PORT` is used when unset.
    bind_port_v4: Option<u16>,
    /// IPv6 address to bind to; the unspecified address is used when unset.
    bind_ip_v6: Option<Ipv6Addr>,
    /// IPv6 port to bind to; `DEFAULT_BIND_PORT + 1` is used when unset.
    bind_port_v6: Option<u16>,
    /// Peers which are registered with the network right after it was built.
    direct_node_addresses: Vec<NodeAddr>,
    /// Registered discovery services.
    discovery: DiscoveryMap,
    /// Gossip settings; defaults are used when unset.
    gossip_config: Option<GossipConfig>,
    /// Identifier of the network this node takes part in.
    network_id: NetworkId,
    /// Custom protocol handlers keyed by their ALPN identifier.
    protocols: ProtocolMap,
    /// Relay configuration, disabled by default.
    relay_mode: RelayMode,
    /// Secret key of the node; a new one is generated on build when unset.
    secret_key: Option<SecretKey>,
    /// Optional sync protocol configuration.
    sync_config: Option<SyncConfiguration<T>>,
}
194
195impl<T> NetworkBuilder<T>
196where
197 T: TopicQuery,
198{
199 pub fn new(network_id: NetworkId) -> Self {
204 Self {
205 bind_ip_v4: None,
206 bind_port_v4: None,
207 bind_ip_v6: None,
208 bind_port_v6: None,
209 direct_node_addresses: Vec::new(),
210 discovery: DiscoveryMap::default(),
211 gossip_config: None,
212 network_id,
213 protocols: Default::default(),
214 relay_mode: RelayMode::Disabled,
215 secret_key: None,
216 sync_config: None,
217 }
218 }
219
220 pub fn from_config(config: Config) -> Self {
222 let mut network_builder = Self::new(config.network_id)
223 .bind_ip_v4(config.bind_ip_v4)
224 .bind_port_v4(config.bind_port_v4)
225 .bind_ip_v6(config.bind_ip_v6)
226 .bind_port_v6(config.bind_port_v6);
227
228 for (public_key, addresses, relay_addr) in config.direct_node_addresses {
229 network_builder = network_builder.direct_address(public_key, addresses, relay_addr)
230 }
231
232 if let Some(url) = config.relay {
233 let port = url.port().unwrap_or(DEFAULT_STUN_PORT);
234 network_builder = network_builder.relay(url, false, port)
235 }
236
237 network_builder
238 }
239
240 pub fn bind_ip_v4(mut self, ip: Ipv4Addr) -> Self {
244 self.bind_ip_v4.replace(ip);
245 self
246 }
247
248 pub fn bind_port_v4(mut self, port: u16) -> Self {
252 self.bind_port_v4.replace(port);
253 self
254 }
255
256 pub fn bind_ip_v6(mut self, ip: Ipv6Addr) -> Self {
260 self.bind_ip_v6.replace(ip);
261 self
262 }
263
264 pub fn bind_port_v6(mut self, port: u16) -> Self {
268 self.bind_port_v6.replace(port);
269 self
270 }
271
272 pub fn private_key(mut self, private_key: PrivateKey) -> Self {
277 self.secret_key = Some(SecretKey::from_bytes(private_key.as_bytes()));
278 self
279 }
280
281 pub fn relay(mut self, url: RelayUrl, stun_only: bool, stun_port: u16) -> Self {
289 self.relay_mode = RelayMode::Custom(RelayNode {
290 url: url.into(),
291 stun_only,
292 stun_port,
293 });
294 self
295 }
296
297 pub fn direct_address(
308 mut self,
309 node_id: PublicKey,
310 addresses: Vec<SocketAddr>,
311 relay_addr: Option<RelayUrl>,
312 ) -> Self {
313 let node_id = NodeId::from_bytes(node_id.as_bytes()).expect("invalid public key");
314 let mut node_addr = NodeAddr::new(node_id).with_direct_addresses(addresses);
315 if let Some(url) = relay_addr {
316 node_addr = node_addr.with_relay_url(url.into());
317 }
318 self.direct_node_addresses.push(node_addr);
319 self
320 }
321
322 pub fn discovery(mut self, handler: impl Discovery + 'static) -> Self {
324 self.discovery.add(handler);
325 self
326 }
327
328 pub fn sync(mut self, config: SyncConfiguration<T>) -> Self {
333 self.sync_config = Some(config);
334 self
335 }
336
337 pub fn gossip(mut self, config: GossipConfig) -> Self {
342 self.gossip_config = Some(config);
343 self
344 }
345
346 pub fn protocol(
348 mut self,
349 protocol_name: &'static [u8],
350 handler: impl ProtocolHandler + 'static,
351 ) -> Self {
352 self.protocols.insert(protocol_name, Arc::new(handler));
353 self
354 }
355
356 pub async fn build(mut self) -> Result<Network<T>>
368 where
369 T: TopicQuery + TopicId + 'static,
370 {
371 let secret_key = self.secret_key.unwrap_or(SecretKey::generate());
372
373 let relay: Option<RelayNode> = match self.relay_mode {
374 RelayMode::Disabled => None,
375 RelayMode::Custom(ref node) => Some(node.clone()),
376 };
377
378 let endpoint = {
380 let mut transport_config = TransportConfig::default();
381 transport_config
382 .max_concurrent_bidi_streams(MAX_STREAMS.into())
383 .max_concurrent_uni_streams(0u32.into());
384
385 let relay_mode = match self.relay_mode {
386 RelayMode::Disabled => iroh_net::relay::RelayMode::Disabled,
387 RelayMode::Custom(node) => iroh_net::relay::RelayMode::Custom(
388 RelayMap::from_nodes(vec![node])
389 .expect("relay list can not contain duplicates"),
390 ),
391 };
392
393 let bind_ip_v4 = self.bind_ip_v4.unwrap_or(Ipv4Addr::UNSPECIFIED);
394 let bind_port_v4 = self.bind_port_v4.unwrap_or(DEFAULT_BIND_PORT);
395 let bind_ip_v6 = self.bind_ip_v6.unwrap_or(Ipv6Addr::UNSPECIFIED);
396 let bind_port_v6 = self.bind_port_v6.unwrap_or(DEFAULT_BIND_PORT + 1);
397 let socket_address_v4 = SocketAddrV4::new(bind_ip_v4, bind_port_v4);
398 let socket_address_v6 = SocketAddrV6::new(bind_ip_v6, bind_port_v6, 0, 0);
399
400 Endpoint::builder()
401 .transport_config(transport_config)
402 .secret_key(secret_key.clone())
403 .relay_mode(relay_mode)
404 .bind_addr_v4(socket_address_v4)
405 .bind_addr_v6(socket_address_v6)
406 .bind()
407 .await?
408 };
409
410 let node_addr = endpoint.node_addr().await?;
411
412 let gossip = Gossip::from_endpoint(
413 endpoint.clone(),
414 self.gossip_config.unwrap_or_default(),
415 &node_addr.info,
416 );
417
418 let engine = Engine::new(
419 secret_key.clone(),
420 self.network_id,
421 endpoint.clone(),
422 gossip.clone(),
423 self.sync_config,
424 );
425
426 let sync_handler = engine.sync_handler();
427
428 let inner = Arc::new(NetworkInner {
429 cancel_token: CancellationToken::new(),
430 relay: relay.clone(),
431 discovery: self.discovery,
432 endpoint: endpoint.clone(),
433 engine,
434 gossip: gossip.clone(),
435 network_id: self.network_id,
436 secret_key,
437 });
438
439 self.protocols.insert(GOSSIP_ALPN, Arc::new(gossip.clone()));
440 if let Some(sync_handler) = sync_handler {
441 self.protocols
442 .insert(SYNC_CONNECTION_ALPN, Arc::new(sync_handler));
443 };
444 let protocols = Arc::new(self.protocols.clone());
445 let alpns = self.protocols.alpns();
446 if let Err(err) = inner.endpoint.set_alpns(alpns) {
447 inner.shutdown(protocols.clone()).await;
448 return Err(err);
449 }
450
451 let fut = inner
452 .clone()
453 .spawn(protocols.clone())
454 .instrument(error_span!("node", me=%node_addr.node_id.fmt_short()));
455 let task = tokio::task::spawn(fut);
456 let task_handle = AbortOnDropHandle::new(task)
457 .map_err(Box::new(|err: JoinError| err.to_string()) as JoinErrToStr)
458 .shared();
459
460 let network = Network {
461 inner,
462 task: task_handle,
463 protocols,
464 };
465
466 let wait_for_endpoints = {
469 async move {
470 tokio::time::timeout(DIRECT_ADDRESSES_WAIT, endpoint.direct_addresses().next())
471 .await
472 .context("waiting for endpoint")?
473 .context("no endpoints given to establish at least one connection")?;
474 Ok(())
475 }
476 };
477
478 if let Err(err) = wait_for_endpoints.await {
479 network.shutdown().await.ok();
480 return Err(err);
481 }
482
483 for mut direct_addr in self.direct_node_addresses {
484 if direct_addr.relay_url().is_none() {
485 if let Some(ref relay_node) = relay {
489 direct_addr = direct_addr.with_relay_url(relay_node.url.clone());
490 }
491 }
492
493 network.add_peer(direct_addr.clone()).await?;
494 }
495
496 Ok(network)
497 }
498}
499
/// Shared state of a running network, referenced by the [`Network`] handle and
/// its background task.
#[derive(Debug)]
struct NetworkInner<T> {
    /// Cancelled to ask the background task to leave its main loop.
    cancel_token: CancellationToken,
    /// Relay node of this node, if one was configured.
    relay: Option<RelayNode>,
    /// Discovery services, informed about address changes and subscribed to
    /// for peer events.
    discovery: DiscoveryMap,
    /// Endpoint accepting incoming connections.
    endpoint: Endpoint,
    /// Engine receiving peers and topic subscriptions.
    engine: Engine<T>,
    #[allow(dead_code)]
    gossip: Gossip,
    /// Identifier of the network, used to subscribe to discovery events.
    network_id: NetworkId,
    #[allow(dead_code)]
    secret_key: SecretKey,
}
513
514impl<T> NetworkInner<T>
515where
516 T: TopicQuery + TopicId + 'static,
517{
518 async fn spawn(self: Arc<Self>, protocols: Arc<ProtocolMap>) {
528 let (ipv4, ipv6) = self.endpoint.bound_sockets();
529 debug!(
530 "listening at: {}{}",
531 ipv4,
532 ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default()
533 );
534
535 let mut join_set = JoinSet::<Result<()>>::new();
536
537 {
539 let inner = self.clone();
540 join_set.spawn(async move {
541 let mut addrs_stream = inner.endpoint.direct_addresses();
542 let mut my_node_addr = NodeAddr::new(inner.endpoint.node_id());
543 if let Some(node) = &inner.relay {
544 my_node_addr = my_node_addr.with_relay_url(node.url.to_owned());
545 }
546
547 loop {
548 tokio::select! {
549 Some(endpoints) = addrs_stream.next() => {
551 let direct_addresses = endpoints.iter().map(|endpoint| endpoint.addr).collect();
552 my_node_addr.info.direct_addresses = direct_addresses;
553 if let Err(err) = inner.discovery.update_local_address(&my_node_addr) {
554 warn!("failed to update direct addresses for discovery: {err:?}");
555 }
556 },
557 else => break,
558 }
559 }
560
561 Ok(())
562 });
563 }
564
565 let mut discovery_stream = self
567 .discovery
568 .subscribe(self.network_id)
569 .expect("discovery map needs to be given");
570
571 loop {
572 tokio::select! {
573 biased;
575 _ = self.cancel_token.cancelled() => {
577 break;
578 },
579 Some(incoming) = self.endpoint.accept() => {
581 let connecting = match incoming.accept() {
584 Ok(connecting) => connecting,
585 Err(err) => {
586 warn!("incoming connection failed: {err:#}");
587 continue;
589 },
590 };
591 let protocols = protocols.clone();
592 join_set.spawn(async move {
593 handle_connection(connecting, protocols).await;
594 Ok(())
595 });
596 },
597 Some(event) = discovery_stream.next() => {
599 match event {
600 Ok(event) => {
601 if let Err(err) = self.engine.add_peer(event.node_addr).await {
602 error!("engine failed on add_peer: {err:?}");
603 break;
604 }
605 }
606 Err(err) => {
607 error!("discovery service failed: {err:?}");
608 break;
609 },
610 }
611 },
612 res = join_set.join_next(), if !join_set.is_empty() => {
614 match res {
615 Some(Err(outer)) => {
616 if outer.is_panic() {
617 error!("task panicked: {outer:?}");
618 break;
619 } else if outer.is_cancelled() {
620 debug!("task cancelled: {outer:?}");
621 } else {
622 error!("task failed: {outer:?}");
623 break;
624 }
625 }
626 Some(Ok(Err(inner))) => {
627 debug!("task errored: {inner:?}");
628 }
629 _ => {}
630 }
631 },
632 else => break,
633 }
634 }
635
636 self.shutdown(protocols).await;
637
638 join_set.shutdown().await;
640 }
641
642 async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
644 debug!("close all connections and shutdown the node");
646 let _ = tokio::join!(
647 self.endpoint
651 .clone()
652 .close(1u32.into(), b"provider terminating"),
653 self.engine.shutdown(),
654 protocols.shutdown(),
655 );
656 }
657}
658
/// Handle to a running network node.
///
/// Created via `NetworkBuilder::build`. The handle can be cloned; all clones
/// refer to the same inner state and background task.
#[derive(Clone, Debug)]
pub struct Network<T> {
    /// State shared with the background task.
    inner: Arc<NetworkInner<T>>,
    #[allow(dead_code)]
    protocols: Arc<ProtocolMap>,
    /// Shared handle of the background task. Join errors are mapped to
    /// strings so that the result can be shared between clones.
    // NOTE(review): `AbortOnDropHandle` presumably aborts the task once the
    // last clone is dropped; confirm against tokio-util.
    task: Shared<MapErr<AbortOnDropHandle<()>, JoinErrToStr>>,
}
682
683impl<T> Network<T>
684where
685 T: TopicQuery + TopicId + 'static,
686{
687 pub async fn add_peer(&self, node_addr: NodeAddr) -> Result<()> {
689 self.inner.engine.add_peer(node_addr).await
690 }
691
692 pub async fn known_peers(&self) -> Result<Vec<NodeAddr>> {
694 self.inner.engine.known_peers().await
695 }
696
697 pub async fn direct_addresses(&self) -> Option<Vec<SocketAddr>> {
699 self.inner
700 .endpoint
701 .direct_addresses()
702 .next()
703 .await
704 .map(|addrs| addrs.into_iter().map(|direct| direct.addr).collect())
705 }
706
707 pub fn endpoint(&self) -> &Endpoint {
715 &self.inner.endpoint
716 }
717
718 pub fn node_id(&self) -> PublicKey {
720 PublicKey::from_bytes(self.inner.endpoint.node_id().as_bytes())
721 .expect("public key already checked")
722 }
723
724 pub async fn shutdown(self) -> Result<()> {
726 self.inner.cancel_token.cancel();
728
729 self.task.await.map_err(|err| anyhow!(err))?;
731
732 Ok(())
733 }
734
735 pub async fn subscribe(
738 &self,
739 topic: T,
740 ) -> Result<(
741 mpsc::Sender<ToNetwork>,
742 mpsc::Receiver<FromNetwork>,
743 oneshot::Receiver<()>,
744 )> {
745 let (to_network_tx, to_network_rx) = mpsc::channel::<ToNetwork>(128);
746 let (from_network_tx, from_network_rx) = mpsc::channel::<FromNetwork>(128);
747 let (gossip_ready_tx, gossip_ready_rx) = oneshot::channel();
748
749 self.inner
750 .engine
751 .subscribe(topic, from_network_tx, to_network_rx, gossip_ready_tx)
752 .await?;
753
754 Ok((to_network_tx, from_network_rx, gossip_ready_rx))
755 }
756}
757
/// Message sent from the application to the network.
#[derive(Clone, Debug)]
pub enum ToNetwork {
    /// Raw bytes to be sent for the subscribed topic.
    Message { bytes: Vec<u8> },
}
763
/// Message received from the network for a subscribed topic.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FromNetwork {
    /// Message received via gossip.
    GossipMessage {
        /// Raw message bytes.
        bytes: Vec<u8>,
        /// Public key of the node the message was delivered from.
        delivered_from: PublicKey,
    },
    /// Message received during a sync session.
    SyncMessage {
        /// Header bytes.
        header: Vec<u8>,
        /// Optional payload bytes.
        payload: Option<Vec<u8>>,
        /// Public key of the node the message was delivered from.
        delivered_from: PublicKey,
    },
}
778
779async fn handle_connection(
784 mut connecting: iroh_net::endpoint::Connecting,
785 protocols: Arc<ProtocolMap>,
786) {
787 let alpn = match connecting.alpn().await {
788 Ok(alpn) => alpn,
789 Err(err) => {
790 warn!("ignoring connection: invalid handshake: {:?}", err);
791 return;
792 }
793 };
794 let Some(handler) = protocols.get(&alpn) else {
795 warn!("ignoring connection: unsupported alpn protocol");
796 return;
797 };
798 if let Err(err) = handler.accept(connecting).await {
799 warn!("handling incoming connection ended with error: {err}");
800 }
801}
802
/// Boxed function turning a task join error into a string, which allows the
/// result of the background task to be shared between handles.
pub(crate) type JoinErrToStr =
    Box<dyn Fn(tokio::task::JoinError) -> String + Send + Sync + 'static>;
806
807#[cfg(test)]
808pub(crate) mod sync_protocols {
809 use std::sync::Arc;
810
811 use async_trait::async_trait;
812 use futures_lite::{AsyncRead, AsyncWrite, StreamExt};
813 use futures_util::{Sink, SinkExt};
814 use p2panda_sync::cbor::{into_cbor_sink, into_cbor_stream};
815 use p2panda_sync::{FromSync, SyncError, SyncProtocol};
816 use serde::{Deserialize, Serialize};
817 use tracing::debug;
818
819 use super::tests::TestTopic;
820
    /// Messages exchanged by [`DummyProtocol`].
    #[derive(Debug, Serialize, Deserialize)]
    enum DummyProtocolMessage {
        /// Topic the initiator wants to sync.
        TopicQuery(TestTopic),
        /// Signals that the sending side has finished.
        Done,
    }
826
    /// Minimal sync protocol for tests: both sides exchange the topic and a
    /// `Done` message, no application data is transferred.
    #[derive(Debug)]
    pub struct DummyProtocol {}
830
831 #[async_trait]
832 impl<'a> SyncProtocol<'a, TestTopic> for DummyProtocol {
833 fn name(&self) -> &'static str {
834 static DUMMY_PROTOCOL_NAME: &str = "dummy_protocol";
835 DUMMY_PROTOCOL_NAME
836 }
837 async fn initiate(
838 self: Arc<Self>,
839 topic_query: TestTopic,
840 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
841 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
842 mut app_tx: Box<
843 &'a mut (dyn Sink<FromSync<TestTopic>, Error = SyncError> + Send + Unpin),
844 >,
845 ) -> Result<(), SyncError> {
846 debug!("DummyProtocol: initiate sync session");
847
848 let mut sink = into_cbor_sink(tx);
849 let mut stream = into_cbor_stream(rx);
850
851 sink.send(DummyProtocolMessage::TopicQuery(topic_query.clone()))
852 .await?;
853 sink.send(DummyProtocolMessage::Done).await?;
854 app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;
855
856 while let Some(result) = stream.next().await {
857 let message: DummyProtocolMessage = result?;
858 debug!("message received: {:?}", message);
859
860 match &message {
861 DummyProtocolMessage::TopicQuery(_) => panic!(),
862 DummyProtocolMessage::Done => break,
863 }
864 }
865
866 sink.flush().await?;
867 app_tx.flush().await?;
868
869 Ok(())
870 }
871
872 async fn accept(
873 self: Arc<Self>,
874 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
875 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
876 mut app_tx: Box<
877 &'a mut (dyn Sink<FromSync<TestTopic>, Error = SyncError> + Send + Unpin),
878 >,
879 ) -> Result<(), SyncError> {
880 debug!("DummyProtocol: accept sync session");
881
882 let mut sink = into_cbor_sink(tx);
883 let mut stream = into_cbor_stream(rx);
884
885 while let Some(result) = stream.next().await {
886 let message: DummyProtocolMessage = result?;
887 debug!("message received: {:?}", message);
888
889 match &message {
890 DummyProtocolMessage::TopicQuery(topic_query) => {
891 app_tx
892 .send(FromSync::HandshakeSuccess(topic_query.clone()))
893 .await?
894 }
895 DummyProtocolMessage::Done => break,
896 }
897 }
898
899 sink.send(DummyProtocolMessage::Done).await?;
900
901 sink.flush().await?;
902 app_tx.flush().await?;
903
904 Ok(())
905 }
906 }
907
    /// Messages exchanged by [`PingPongProtocol`].
    #[derive(Serialize, Deserialize)]
    enum Message {
        /// Topic the initiator wants to sync.
        TopicQuery(TestTopic),
        /// Sent by the initiator.
        Ping,
        /// Answer of the acceptor to a `Ping`.
        Pong,
    }
915
    /// Sync protocol for tests: the initiator sends a `Ping`, the acceptor
    /// answers with a `Pong`. Both sides forward what they received to the
    /// application as sync data.
    #[derive(Debug, Clone)]
    pub struct PingPongProtocol {}
918
919 #[async_trait]
921 impl<'a> SyncProtocol<'a, TestTopic> for PingPongProtocol {
922 fn name(&self) -> &'static str {
923 static SIMPLE_PROTOCOL_NAME: &str = "simple_protocol";
924 SIMPLE_PROTOCOL_NAME
925 }
926
927 async fn initiate(
928 self: Arc<Self>,
929 topic_query: TestTopic,
930 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
931 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
932 mut app_tx: Box<
933 &'a mut (dyn Sink<FromSync<TestTopic>, Error = SyncError> + Send + Unpin),
934 >,
935 ) -> Result<(), SyncError> {
936 debug!("initiate sync session");
937 let mut sink = into_cbor_sink(tx);
938 let mut stream = into_cbor_stream(rx);
939
940 sink.send(Message::TopicQuery(topic_query.clone())).await?;
941 sink.send(Message::Ping).await?;
942 debug!("ping message sent");
943
944 app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;
945
946 while let Some(result) = stream.next().await {
947 let message = result?;
948
949 match message {
950 Message::TopicQuery(_) => panic!(),
951 Message::Ping => {
952 return Err(SyncError::UnexpectedBehaviour(
953 "unexpected Ping message received".to_string(),
954 ));
955 }
956 Message::Pong => {
957 debug!("pong message received");
958 app_tx
959 .send(FromSync::Data {
960 header: "PONG".as_bytes().to_owned(),
961 payload: None,
962 })
963 .await
964 .unwrap();
965 break;
966 }
967 }
968 }
969
970 sink.flush().await?;
972 app_tx.flush().await?;
973
974 Ok(())
975 }
976
977 async fn accept(
978 self: Arc<Self>,
979 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
980 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
981 mut app_tx: Box<
982 &'a mut (dyn Sink<FromSync<TestTopic>, Error = SyncError> + Send + Unpin),
983 >,
984 ) -> Result<(), SyncError> {
985 debug!("accept sync session");
986 let mut sink = into_cbor_sink(tx);
987 let mut stream = into_cbor_stream(rx);
988
989 while let Some(result) = stream.next().await {
990 let message = result?;
991
992 match message {
993 Message::TopicQuery(topic_query) => {
994 app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?
995 }
996 Message::Ping => {
997 debug!("ping message received");
998 app_tx
999 .send(FromSync::Data {
1000 header: "PING".as_bytes().to_owned(),
1001 payload: None,
1002 })
1003 .await
1004 .unwrap();
1005
1006 sink.send(Message::Pong).await?;
1007 debug!("pong message sent");
1008 break;
1009 }
1010 Message::Pong => {
1011 return Err(SyncError::UnexpectedBehaviour(
1012 "unexpected Pong message received".to_string(),
1013 ));
1014 }
1015 }
1016 }
1017
1018 sink.flush().await?;
1019 app_tx.flush().await?;
1020
1021 Ok(())
1022 }
1023 }
1024}
1025
1026#[cfg(test)]
1027pub(crate) mod tests {
1028 use std::collections::HashMap;
1029 use std::net::{Ipv4Addr, Ipv6Addr};
1030 use std::path::PathBuf;
1031 use std::time::Duration;
1032
1033 use async_trait::async_trait;
1034 use iroh_net::relay::{RelayNode, RelayUrl as IrohRelayUrl};
1035 use p2panda_core::{Body, Extensions, Hash, Header, PrivateKey, PublicKey};
1036 use p2panda_store::{MemoryStore, OperationStore};
1037 use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap};
1038 use p2panda_sync::TopicQuery;
1039 use serde::{Deserialize, Serialize};
1040 use tokio::task::JoinHandle;
1041 use tracing_subscriber::layer::SubscriberExt;
1042 use tracing_subscriber::util::SubscriberInitExt;
1043 use tracing_subscriber::EnvFilter;
1044
1045 use crate::addrs::DEFAULT_STUN_PORT;
1046 use crate::bytes::ToBytes;
1047 use crate::config::Config;
1048 use crate::network::sync_protocols::PingPongProtocol;
1049 use crate::sync::SyncConfiguration;
1050 use crate::{NetworkBuilder, RelayMode, RelayUrl, TopicId};
1051
1052 use super::{FromNetwork, Network, ToNetwork};
1053
1054 fn setup_logging() {
1055 tracing_subscriber::registry()
1056 .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
1057 .with(EnvFilter::from_default_env())
1058 .try_init()
1059 .ok();
1060 }
1061
1062 fn create_operation<E: Extensions>(
1063 private_key: &PrivateKey,
1064 body: &Body,
1065 seq_num: u64,
1066 timestamp: u64,
1067 backlink: Option<Hash>,
1068 extensions: Option<E>,
1069 ) -> (Hash, Header<E>, Vec<u8>) {
1070 let mut header = Header {
1071 version: 1,
1072 public_key: private_key.public_key(),
1073 signature: None,
1074 payload_size: body.size(),
1075 payload_hash: Some(body.hash()),
1076 timestamp,
1077 seq_num,
1078 backlink,
1079 previous: vec![],
1080 extensions,
1081 };
1082 header.sign(private_key);
1083 let header_bytes = header.to_bytes();
1084 (header.hash(), header, header_bytes)
1085 }
1086
    /// Topic used in tests, consisting of a name and a 32 byte topic id.
    #[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
    pub struct TestTopic(String, [u8; 32]);
1089
1090 impl TestTopic {
1091 pub fn new(name: &str) -> Self {
1092 Self(name.to_owned(), [0; 32])
1093 }
1094 }
1095
    // Marks `TestTopic` as usable as a topic query; no methods are implemented.
    impl TopicQuery for TestTopic {}
1097
1098 impl TopicId for TestTopic {
1099 fn id(&self) -> [u8; 32] {
1100 self.1
1101 }
1102 }
1103
1104 #[tokio::test]
1105 async fn config() {
1106 let direct_node_public_key = PrivateKey::new().public_key();
1107 let relay_address: RelayUrl = "https://example.net".parse().unwrap();
1108
1109 let config = Config {
1110 bind_ip_v4: Ipv4Addr::new(7, 7, 7, 7),
1111 bind_port_v4: 2024,
1112 bind_ip_v6: Ipv6Addr::new(8, 8, 8, 8, 8, 8, 8, 8),
1113 bind_port_v6: 2025,
1114 network_id: [1; 32],
1115 private_key: Some(PathBuf::new().join("secret-key.txt")),
1116 direct_node_addresses: vec![(
1117 direct_node_public_key,
1118 vec!["0.0.0.0:2026".parse().unwrap()],
1119 None,
1120 )],
1121 relay: Some(relay_address.clone()),
1122 };
1123
1124 let builder = NetworkBuilder::<TestTopic>::from_config(config);
1125
1126 assert_eq!(builder.bind_ip_v4, Some(Ipv4Addr::new(7, 7, 7, 7)));
1127 assert_eq!(builder.bind_port_v4, Some(2024));
1128 assert_eq!(
1129 builder.bind_ip_v6,
1130 Some(Ipv6Addr::new(8, 8, 8, 8, 8, 8, 8, 8))
1131 );
1132 assert_eq!(builder.bind_port_v6, Some(2025));
1133 assert_eq!(builder.network_id, [1; 32]);
1134 assert!(builder.secret_key.is_none());
1135 assert_eq!(builder.direct_node_addresses.len(), 1);
1136 let relay_node = RelayNode {
1137 url: IrohRelayUrl::from(relay_address),
1138 stun_only: false,
1139 stun_port: DEFAULT_STUN_PORT,
1140 };
1141 assert_eq!(builder.relay_mode, RelayMode::Custom(relay_node));
1142 }
1143
1144 #[tokio::test]
1145 async fn join_gossip_overlay() {
1146 setup_logging();
1147
1148 let network_id = [1; 32];
1149 let topic = TestTopic::new("chat");
1150
1151 let node_1 = NetworkBuilder::new(network_id).build().await.unwrap();
1152 let node_2 = NetworkBuilder::new(network_id).build().await.unwrap();
1153
1154 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1155 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1156
1157 node_1.add_peer(node_2_addr).await.unwrap();
1158 node_2.add_peer(node_1_addr).await.unwrap();
1159
1160 let (tx_1, _rx_1, ready_1) = node_1.subscribe(topic.clone()).await.unwrap();
1162 let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(topic).await.unwrap();
1163
1164 assert!(ready_2.await.is_ok());
1166 assert!(ready_1.await.is_ok());
1167
1168 tx_1.send(ToNetwork::Message {
1170 bytes: "Hello, Node".to_bytes(),
1171 })
1172 .await
1173 .unwrap();
1174
1175 let rx_2_msg = rx_2.recv().await.unwrap();
1176 assert_eq!(
1177 rx_2_msg,
1178 FromNetwork::GossipMessage {
1179 bytes: "Hello, Node".to_bytes(),
1180 delivered_from: node_1.node_id(),
1181 }
1182 );
1183
1184 node_1.shutdown().await.unwrap();
1185 node_2.shutdown().await.unwrap();
1186 }
1187
1188 #[tokio::test]
1189 async fn ping_pong() {
1190 setup_logging();
1191
1192 let network_id = [1; 32];
1193 let topic = TestTopic::new("ping_pong");
1194 let ping_pong = PingPongProtocol {};
1195 let sync_config = SyncConfiguration::new(ping_pong);
1196
1197 let node_1 = NetworkBuilder::new(network_id)
1198 .sync(sync_config.clone())
1199 .build()
1200 .await
1201 .unwrap();
1202 let node_2 = NetworkBuilder::new(network_id)
1203 .sync(sync_config)
1204 .build()
1205 .await
1206 .unwrap();
1207
1208 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1209 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1210
1211 node_1.add_peer(node_2_addr).await.unwrap();
1212 node_2.add_peer(node_1_addr).await.unwrap();
1213
1214 let topic_clone = topic.clone();
1216 let handle1 = tokio::spawn(async move {
1217 let (_tx, _rx, _ready) = node_1.subscribe(topic_clone).await.unwrap();
1218 tokio::time::sleep(Duration::from_secs(2)).await;
1219 node_1.shutdown().await.unwrap();
1220 });
1221 let handle2 = tokio::spawn(async move {
1222 let (_tx, _rx, _ready) = node_2.subscribe(topic).await.unwrap();
1223 tokio::time::sleep(Duration::from_secs(2)).await;
1224 node_2.shutdown().await.unwrap();
1225 });
1226
1227 let (result1, result2) = tokio::join!(handle1, handle2);
1228 assert!(result1.is_ok());
1229 assert!(result2.is_ok());
1230 }
1231
    /// Log ids grouped by the public key of their author.
    type Logs<T> = HashMap<PublicKey, Vec<T>>;
1233
    /// In-memory mapping from a topic to the logs (with `u64` ids) belonging
    /// to it.
    #[derive(Clone, Debug)]
    struct LogIdTopicMap<T>(HashMap<T, Logs<u64>>);
1236
1237 impl<T> LogIdTopicMap<T>
1238 where
1239 T: TopicQuery,
1240 {
1241 pub fn new() -> Self {
1242 LogIdTopicMap(HashMap::new())
1243 }
1244
1245 fn insert(&mut self, topic: T, logs: Logs<u64>) -> Option<Logs<u64>> {
1246 self.0.insert(topic, logs)
1247 }
1248 }
1249
1250 #[async_trait]
1251 impl<T> TopicLogMap<T, u64> for LogIdTopicMap<T>
1252 where
1253 T: TopicQuery,
1254 {
1255 async fn get(&self, topic: &T) -> Option<Logs<u64>> {
1256 self.0.get(topic).cloned()
1257 }
1258 }
1259
1260 #[tokio::test]
1261 async fn e2e_log_height_sync() {
1262 setup_logging();
1263
1264 const NETWORK_ID: [u8; 32] = [1; 32];
1265
1266 let peer_a_private_key = PrivateKey::new();
1267 let peer_b_private_key = PrivateKey::new();
1268
1269 let topic = TestTopic::new("event_logs");
1270 let log_id = 0;
1271 let logs = HashMap::from([(peer_a_private_key.public_key(), vec![log_id])]);
1272
1273 let mut topic_map = LogIdTopicMap::new();
1274 topic_map.insert(topic.clone(), logs);
1275
1276 let store_a = MemoryStore::default();
1278 let protocol_a = LogSyncProtocol::new(topic_map.clone(), store_a);
1279 let sync_config_a = SyncConfiguration::new(protocol_a);
1280
1281 let body = Body::new("Hello, Sloth!".as_bytes());
1283 let (hash_0, header_0, header_bytes_0) =
1284 create_operation(&peer_a_private_key, &body, 0, 0, None, None);
1285 let (hash_1, header_1, header_bytes_1) =
1286 create_operation(&peer_a_private_key, &body, 1, 100, Some(hash_0), None);
1287 let (hash_2, header_2, header_bytes_2) =
1288 create_operation(&peer_a_private_key, &body, 2, 200, Some(hash_1), None);
1289
1290 let mut store_b = MemoryStore::default();
1292 store_b
1293 .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
1294 .await
1295 .unwrap();
1296 store_b
1297 .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
1298 .await
1299 .unwrap();
1300 store_b
1301 .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
1302 .await
1303 .unwrap();
1304
1305 let protocol_b = LogSyncProtocol::new(topic_map, store_b);
1307 let sync_config_b = SyncConfiguration::new(protocol_b);
1308
1309 let node_a = NetworkBuilder::new(NETWORK_ID)
1311 .sync(sync_config_a)
1312 .private_key(peer_a_private_key)
1313 .build()
1314 .await
1315 .unwrap();
1316
1317 let node_b = NetworkBuilder::new(NETWORK_ID)
1319 .sync(sync_config_b)
1320 .private_key(peer_b_private_key.clone())
1321 .build()
1322 .await
1323 .unwrap();
1324
1325 let node_a_addr = node_a.endpoint().node_addr().await.unwrap();
1326 let node_b_addr = node_b.endpoint().node_addr().await.unwrap();
1327
1328 node_a.add_peer(node_b_addr).await.unwrap();
1329 node_b.add_peer(node_a_addr).await.unwrap();
1330
1331 let topic_clone = topic.clone();
1333 let handle1 = tokio::spawn(async move {
1334 let (_tx, mut from_sync_rx, ready) = node_a.subscribe(topic_clone).await.unwrap();
1335
1336 assert!(ready.await.is_ok());
1338
1339 let mut from_sync_messages = Vec::new();
1340 while let Some(message) = from_sync_rx.recv().await {
1341 from_sync_messages.push(message);
1342 if from_sync_messages.len() == 3 {
1343 break;
1344 }
1345 }
1346
1347 let peer_a_expected_messages = vec![
1350 FromNetwork::SyncMessage {
1351 header: header_bytes_0.to_vec(),
1352 payload: Some(body.to_bytes()),
1353 delivered_from: peer_b_private_key.public_key(),
1354 },
1355 FromNetwork::SyncMessage {
1356 header: header_bytes_1.to_vec(),
1357 payload: Some(body.to_bytes()),
1358 delivered_from: peer_b_private_key.public_key(),
1359 },
1360 FromNetwork::SyncMessage {
1361 header: header_bytes_2.to_vec(),
1362 payload: Some(body.to_bytes()),
1363 delivered_from: peer_b_private_key.public_key(),
1364 },
1365 ];
1366
1367 assert_eq!(from_sync_messages, peer_a_expected_messages);
1369
1370 node_a.shutdown().await.unwrap();
1371 });
1372
1373 let handle2 = tokio::spawn(async move {
1374 let (_tx, _from_sync_rx, ready) = node_b.subscribe(topic).await.unwrap();
1375
1376 assert!(ready.await.is_ok());
1378
1379 tokio::time::sleep(Duration::from_secs(2)).await;
1381
1382 node_b.shutdown().await.unwrap();
1383 });
1384
1385 let (result1, result2) = tokio::join!(handle1, handle2);
1387
1388 assert!(result1.is_ok());
1389 assert!(result2.is_ok())
1390 }
1391
1392 #[tokio::test]
1393 async fn multi_hop_join_gossip_overlay() {
1394 setup_logging();
1395
1396 let network_id = [1; 32];
1397 let chat_topic = TestTopic::new("chat");
1398
1399 let node_1 = NetworkBuilder::new(network_id).build().await.unwrap();
1400 let node_2 = NetworkBuilder::new(network_id).build().await.unwrap();
1401 let node_3 = NetworkBuilder::new(network_id).build().await.unwrap();
1402
1403 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1404 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1405
1406 node_1.add_peer(node_2_addr.clone()).await.unwrap();
1407 node_2.add_peer(node_1_addr).await.unwrap();
1408 node_3.add_peer(node_2_addr).await.unwrap();
1409
1410 let (tx_1, _rx_1, ready_1) = node_1.subscribe(chat_topic.clone()).await.unwrap();
1412 let (_tx_2, mut rx_2, ready_2) = node_2.subscribe(chat_topic.clone()).await.unwrap();
1413 let (_tx_3, mut rx_3, ready_3) = node_3.subscribe(chat_topic).await.unwrap();
1414
1415 assert!(ready_3.await.is_ok());
1417 assert!(ready_2.await.is_ok());
1418 assert!(ready_1.await.is_ok());
1419
1420 tx_1.send(ToNetwork::Message {
1422 bytes: "Hello, Node".to_bytes(),
1423 })
1424 .await
1425 .unwrap();
1426
1427 let rx_2_msg = rx_2.recv().await.unwrap();
1428 assert_eq!(
1429 rx_2_msg,
1430 FromNetwork::GossipMessage {
1431 bytes: "Hello, Node".to_bytes(),
1432 delivered_from: node_1.node_id(),
1434 }
1435 );
1436
1437 let rx_3_msg = rx_3.recv().await.unwrap();
1438 assert_eq!(
1439 rx_3_msg,
1440 FromNetwork::GossipMessage {
1441 bytes: "Hello, Node".to_bytes(),
1442 delivered_from: node_1.node_id(),
1444 }
1445 );
1446
1447 node_1.shutdown().await.unwrap();
1448 node_2.shutdown().await.unwrap();
1449 node_3.shutdown().await.unwrap();
1450 }
1451
1452 fn run_node<T: TopicId + TopicQuery + 'static>(node: Network<T>, topic: T) -> JoinHandle<()> {
1453 tokio::spawn(async move {
1454 let (_tx, mut rx, ready) = node.subscribe(topic).await.unwrap();
1455
1456 assert!(ready.await.is_ok());
1458
1459 loop {
1461 let msg = rx.recv().await.unwrap();
1462 println!("{msg:?}");
1463 match msg {
1464 FromNetwork::SyncMessage { .. } => break,
1465 _ => (),
1466 }
1467 }
1468
1469 tokio::time::sleep(Duration::from_secs(3)).await;
1471 node.shutdown().await.unwrap();
1472 })
1473 }
1474 #[tokio::test]
1475 async fn multi_hop_topic_discovery_and_sync() {
1476 setup_logging();
1477
1478 let network_id = [1; 32];
1479 let topic = TestTopic::new("chat");
1480 let sync_config = SyncConfiguration::new(PingPongProtocol {});
1481
1482 let node_1 = NetworkBuilder::new(network_id)
1484 .sync(sync_config.clone())
1485 .build()
1486 .await
1487 .unwrap();
1488 let node_2 = NetworkBuilder::new(network_id)
1489 .sync(sync_config.clone())
1490 .build()
1491 .await
1492 .unwrap();
1493 let node_3 = NetworkBuilder::new(network_id)
1494 .sync(sync_config.clone())
1495 .build()
1496 .await
1497 .unwrap();
1498 let node_4 = NetworkBuilder::new(network_id)
1499 .sync(sync_config.clone())
1500 .build()
1501 .await
1502 .unwrap();
1503
1504 let node_1_addr = node_1.endpoint().node_addr().await.unwrap();
1505 let node_2_addr = node_2.endpoint().node_addr().await.unwrap();
1506 let node_3_addr = node_3.endpoint().node_addr().await.unwrap();
1507
1508 node_1.add_peer(node_2_addr.clone()).await.unwrap();
1510 node_2.add_peer(node_1_addr).await.unwrap();
1511 node_3.add_peer(node_2_addr.clone()).await.unwrap();
1512 node_4.add_peer(node_3_addr.clone()).await.unwrap();
1513
1514 let handle1 = run_node(node_1, topic.clone());
1518 let handle2 = run_node(node_2, topic.clone());
1519 let handle3 = run_node(node_3, topic.clone());
1520 let handle4 = run_node(node_4, topic.clone());
1521
1522 let (result1, result2, result3, result4) = tokio::join!(handle1, handle2, handle3, handle4);
1523
1524 assert!(result1.is_ok());
1525 assert!(result2.is_ok());
1526 assert!(result3.is_ok());
1527 assert!(result4.is_ok());
1528 }
1529}