Skip to main content

vyn_core/
p2p.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::time::Duration;
4
5use libp2p::futures::StreamExt;
6use libp2p::kad::store::MemoryStore;
7use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
8use libp2p::{
9    Multiaddr, PeerId, Swarm, SwarmBuilder, gossipsub, identify, kad, mdns, noise, ping, tcp, yamux,
10};
11
12pub const SYNC_TOPIC: &str = "vyn-sync";
13
14#[derive(Debug, thiserror::Error)]
15pub enum P2PError {
16    #[error("failed to build gossipsub config")]
17    GossipsubConfig,
18    #[error("failed to initialize gossipsub behaviour")]
19    GossipsubInit,
20    #[error("failed to initialize mDNS behaviour")]
21    MdnsInit,
22    #[error("failed to build libp2p swarm: {0}")]
23    SwarmBuild(String),
24    #[error("failed to listen on address: {0}")]
25    Listen(String),
26    #[error("failed to subscribe to sync topic")]
27    Subscribe,
28    #[error("timed out waiting for peer connection")]
29    Timeout,
30}
31
32#[derive(NetworkBehaviour)]
33pub struct VynP2PBehaviour {
34    pub gossipsub: gossipsub::Behaviour,
35    pub kademlia: kad::Behaviour<MemoryStore>,
36    pub identify: identify::Behaviour,
37    pub mdns: mdns::tokio::Behaviour,
38    pub ping: ping::Behaviour,
39}
40
41pub type VynSwarm = Swarm<VynP2PBehaviour>;
42
43pub fn build_swarm() -> Result<(PeerId, VynSwarm), P2PError> {
44    let local_key = libp2p::identity::Keypair::generate_ed25519();
45    let local_peer_id = local_key.public().to_peer_id();
46
47    let swarm = SwarmBuilder::with_existing_identity(local_key)
48        .with_tokio()
49        .with_tcp(
50            tcp::Config::default(),
51            noise::Config::new,
52            yamux::Config::default,
53        )
54        .map_err(|err| P2PError::SwarmBuild(err.to_string()))?
55        .with_behaviour(|key| {
56            let peer_id = key.public().to_peer_id();
57            let mut hasher = DefaultHasher::new();
58            SYNC_TOPIC.hash(&mut hasher);
59            let message_id_fn = move |message: &gossipsub::Message| {
60                let mut h = DefaultHasher::new();
61                message.data.hash(&mut h);
62                gossipsub::MessageId::from(h.finish().to_string())
63            };
64
65            let gossipsub_config = gossipsub::ConfigBuilder::default()
66                .heartbeat_interval(Duration::from_secs(10))
67                .validation_mode(gossipsub::ValidationMode::Strict)
68                .message_id_fn(message_id_fn)
69                .build()
70                .map_err(|_| P2PError::GossipsubConfig)?;
71
72            let gossipsub = gossipsub::Behaviour::new(
73                gossipsub::MessageAuthenticity::Signed(key.clone()),
74                gossipsub_config,
75            )
76            .map_err(|_| P2PError::GossipsubInit)?;
77
78            let mut kademlia = kad::Behaviour::new(peer_id, MemoryStore::new(peer_id));
79            kademlia.set_mode(Some(kad::Mode::Server));
80
81            let identify = identify::Behaviour::new(identify::Config::new(
82                "/vyn/1.0.0".to_string(),
83                key.public(),
84            ));
85
86            let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), peer_id)
87                .map_err(|_| P2PError::MdnsInit)?;
88
89            let ping = ping::Behaviour::new(ping::Config::new());
90
91            Ok(VynP2PBehaviour {
92                gossipsub,
93                kademlia,
94                identify,
95                mdns,
96                ping,
97            })
98        })
99        .map_err(|err| P2PError::SwarmBuild(err.to_string()))?
100        .build();
101
102    Ok((local_peer_id, swarm))
103}
104
105pub fn subscribe_sync_topic(swarm: &mut VynSwarm) -> Result<gossipsub::IdentTopic, P2PError> {
106    let topic = gossipsub::IdentTopic::new(SYNC_TOPIC);
107    swarm
108        .behaviour_mut()
109        .gossipsub
110        .subscribe(&topic)
111        .map_err(|_| P2PError::Subscribe)?;
112    Ok(topic)
113}
114
115pub fn listen_localhost(swarm: &mut VynSwarm) -> Result<Multiaddr, P2PError> {
116    let addr: Multiaddr = "/ip4/127.0.0.1/tcp/0"
117        .parse()
118        .map_err(|err: libp2p::multiaddr::Error| P2PError::Listen(err.to_string()))?;
119    swarm
120        .listen_on(addr.clone())
121        .map_err(|err| P2PError::Listen(err.to_string()))?;
122    Ok(addr)
123}
124
125pub fn handle_discovery_event(swarm: &mut VynSwarm, event: &VynP2PBehaviourEvent) {
126    if let VynP2PBehaviourEvent::Mdns(mdns::Event::Discovered(list)) = event {
127        for (peer_id, addr) in list {
128            swarm.behaviour_mut().gossipsub.add_explicit_peer(peer_id);
129            swarm
130                .behaviour_mut()
131                .kademlia
132                .add_address(peer_id, addr.clone());
133        }
134    }
135}
136
137pub async fn wait_for_connection(
138    swarm: &mut VynSwarm,
139    timeout: Duration,
140) -> Result<PeerId, P2PError> {
141    let fut = async {
142        loop {
143            if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
144                swarm.select_next_some().await
145            {
146                return Ok(peer_id);
147            }
148        }
149    };
150
151    tokio::time::timeout(timeout, fut)
152        .await
153        .map_err(|_| P2PError::Timeout)?
154}
155
156#[cfg(test)]
157mod tests {
158    use super::{build_swarm, listen_localhost, subscribe_sync_topic, wait_for_connection};
159    use libp2p::futures::StreamExt;
160    use libp2p::swarm::SwarmEvent;
161    use std::time::Duration;
162
163    #[tokio::test]
164    async fn libp2p_local_discovery() {
165        let (_peer_a, mut swarm_a) = build_swarm().expect("swarm A should build");
166        let (_peer_b, mut swarm_b) = build_swarm().expect("swarm B should build");
167
168        subscribe_sync_topic(&mut swarm_a).expect("swarm A topic subscription should work");
169        subscribe_sync_topic(&mut swarm_b).expect("swarm B topic subscription should work");
170
171        listen_localhost(&mut swarm_a).expect("swarm A should listen");
172
173        let addr = loop {
174            if let SwarmEvent::NewListenAddr { address, .. } = swarm_a.select_next_some().await {
175                break address;
176            }
177        };
178
179        swarm_b
180            .dial(addr)
181            .expect("swarm B should dial swarm A listen address");
182
183        let connected_a = wait_for_connection(&mut swarm_a, Duration::from_secs(10));
184        let connected_b = wait_for_connection(&mut swarm_b, Duration::from_secs(10));
185        let (a_peer, b_peer) = tokio::join!(connected_a, connected_b);
186
187        assert!(a_peer.is_ok());
188        assert!(b_peer.is_ok());
189    }
190}