1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::time::Duration;
4
5use libp2p::futures::StreamExt;
6use libp2p::kad::store::MemoryStore;
7use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
8use libp2p::{
9 Multiaddr, PeerId, Swarm, SwarmBuilder, gossipsub, identify, kad, mdns, noise, ping, tcp, yamux,
10};
11
/// Name of the gossipsub topic used by [`subscribe_sync_topic`].
12pub const SYNC_TOPIC: &str = "vyn-sync";
13
14#[derive(Debug, thiserror::Error)]
15pub enum P2PError {
16 #[error("failed to build gossipsub config")]
17 GossipsubConfig,
18 #[error("failed to initialize gossipsub behaviour")]
19 GossipsubInit,
20 #[error("failed to initialize mDNS behaviour")]
21 MdnsInit,
22 #[error("failed to build libp2p swarm: {0}")]
23 SwarmBuild(String),
24 #[error("failed to listen on address: {0}")]
25 Listen(String),
26 #[error("failed to subscribe to sync topic")]
27 Subscribe,
28 #[error("timed out waiting for peer connection")]
29 Timeout,
30}
31
32#[derive(NetworkBehaviour)]
33pub struct VynP2PBehaviour {
34 pub gossipsub: gossipsub::Behaviour,
35 pub kademlia: kad::Behaviour<MemoryStore>,
36 pub identify: identify::Behaviour,
37 pub mdns: mdns::tokio::Behaviour,
38 pub ping: ping::Behaviour,
39}
40
/// Swarm specialised to [`VynP2PBehaviour`].
41pub type VynSwarm = Swarm<VynP2PBehaviour>;
42
43pub fn build_swarm() -> Result<(PeerId, VynSwarm), P2PError> {
44 let local_key = libp2p::identity::Keypair::generate_ed25519();
45 let local_peer_id = local_key.public().to_peer_id();
46
47 let swarm = SwarmBuilder::with_existing_identity(local_key)
48 .with_tokio()
49 .with_tcp(
50 tcp::Config::default(),
51 noise::Config::new,
52 yamux::Config::default,
53 )
54 .map_err(|err| P2PError::SwarmBuild(err.to_string()))?
55 .with_behaviour(|key| {
56 let peer_id = key.public().to_peer_id();
57 let mut hasher = DefaultHasher::new();
58 SYNC_TOPIC.hash(&mut hasher);
59 let message_id_fn = move |message: &gossipsub::Message| {
60 let mut h = DefaultHasher::new();
61 message.data.hash(&mut h);
62 gossipsub::MessageId::from(h.finish().to_string())
63 };
64
65 let gossipsub_config = gossipsub::ConfigBuilder::default()
66 .heartbeat_interval(Duration::from_secs(10))
67 .validation_mode(gossipsub::ValidationMode::Strict)
68 .message_id_fn(message_id_fn)
69 .build()
70 .map_err(|_| P2PError::GossipsubConfig)?;
71
72 let gossipsub = gossipsub::Behaviour::new(
73 gossipsub::MessageAuthenticity::Signed(key.clone()),
74 gossipsub_config,
75 )
76 .map_err(|_| P2PError::GossipsubInit)?;
77
78 let mut kademlia = kad::Behaviour::new(peer_id, MemoryStore::new(peer_id));
79 kademlia.set_mode(Some(kad::Mode::Server));
80
81 let identify = identify::Behaviour::new(identify::Config::new(
82 "/vyn/1.0.0".to_string(),
83 key.public(),
84 ));
85
86 let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), peer_id)
87 .map_err(|_| P2PError::MdnsInit)?;
88
89 let ping = ping::Behaviour::new(ping::Config::new());
90
91 Ok(VynP2PBehaviour {
92 gossipsub,
93 kademlia,
94 identify,
95 mdns,
96 ping,
97 })
98 })
99 .map_err(|err| P2PError::SwarmBuild(err.to_string()))?
100 .build();
101
102 Ok((local_peer_id, swarm))
103}
104
105pub fn subscribe_sync_topic(swarm: &mut VynSwarm) -> Result<gossipsub::IdentTopic, P2PError> {
106 let topic = gossipsub::IdentTopic::new(SYNC_TOPIC);
107 swarm
108 .behaviour_mut()
109 .gossipsub
110 .subscribe(&topic)
111 .map_err(|_| P2PError::Subscribe)?;
112 Ok(topic)
113}
114
115pub fn listen_localhost(swarm: &mut VynSwarm) -> Result<Multiaddr, P2PError> {
116 let addr: Multiaddr = "/ip4/127.0.0.1/tcp/0"
117 .parse()
118 .map_err(|err: libp2p::multiaddr::Error| P2PError::Listen(err.to_string()))?;
119 swarm
120 .listen_on(addr.clone())
121 .map_err(|err| P2PError::Listen(err.to_string()))?;
122 Ok(addr)
123}
124
125pub fn handle_discovery_event(swarm: &mut VynSwarm, event: &VynP2PBehaviourEvent) {
126 if let VynP2PBehaviourEvent::Mdns(mdns::Event::Discovered(list)) = event {
127 for (peer_id, addr) in list {
128 swarm.behaviour_mut().gossipsub.add_explicit_peer(peer_id);
129 swarm
130 .behaviour_mut()
131 .kademlia
132 .add_address(peer_id, addr.clone());
133 }
134 }
135}
136
137pub async fn wait_for_connection(
138 swarm: &mut VynSwarm,
139 timeout: Duration,
140) -> Result<PeerId, P2PError> {
141 let fut = async {
142 loop {
143 if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
144 swarm.select_next_some().await
145 {
146 return Ok(peer_id);
147 }
148 }
149 };
150
151 tokio::time::timeout(timeout, fut)
152 .await
153 .map_err(|_| P2PError::Timeout)?
154}
155
#[cfg(test)]
mod tests {
    use super::{build_swarm, listen_localhost, subscribe_sync_topic, wait_for_connection};
    use libp2p::futures::StreamExt;
    use libp2p::swarm::SwarmEvent;
    use std::time::Duration;

    /// Upper bound for each asynchronous step, so a stalled swarm fails the
    /// test instead of hanging the whole suite.
    const STEP_TIMEOUT: Duration = Duration::from_secs(10);

    /// Two local swarms connect over loopback and each side reports the other
    /// as the connected peer.
    #[tokio::test]
    async fn libp2p_local_discovery() {
        let (peer_a, mut swarm_a) = build_swarm().expect("swarm A should build");
        let (peer_b, mut swarm_b) = build_swarm().expect("swarm B should build");

        subscribe_sync_topic(&mut swarm_a).expect("swarm A topic subscription should work");
        subscribe_sync_topic(&mut swarm_b).expect("swarm B topic subscription should work");

        listen_localhost(&mut swarm_a).expect("swarm A should listen");

        // `listen_localhost` requests port 0, so the address actually bound
        // has to be taken from the swarm's event stream.
        let addr = tokio::time::timeout(STEP_TIMEOUT, async {
            loop {
                if let SwarmEvent::NewListenAddr { address, .. } =
                    swarm_a.select_next_some().await
                {
                    break address;
                }
            }
        })
        .await
        .expect("swarm A should report a listen address before the timeout");

        swarm_b
            .dial(addr)
            .expect("swarm B should dial swarm A listen address");

        // Both swarms must be polled concurrently for the handshake to finish.
        let (seen_by_a, seen_by_b) = tokio::join!(
            wait_for_connection(&mut swarm_a, STEP_TIMEOUT),
            wait_for_connection(&mut swarm_b, STEP_TIMEOUT),
        );

        assert_eq!(seen_by_a.expect("swarm A should see a connection"), peer_b);
        assert_eq!(seen_by_b.expect("swarm B should see a connection"), peer_a);
    }
}