1use futures::stream::StreamExt;
2use libp2p::{
3 gossipsub::{self, Message, MessageId},
4 kad, noise,
5 swarm::{NetworkBehaviour, Swarm, SwarmEvent},
6 tcp, yamux, Multiaddr, PeerId, TransportError,
7};
8use std::{
9 collections::hash_map::DefaultHasher,
10 future::Future,
11 hash::{Hash, Hasher},
12 time::Duration,
13};
14use thiserror::Error;
15use tokio::{
16 select,
17 sync::{mpsc, oneshot},
18};
19
20#[derive(Debug, Error)]
21pub enum KarakP2PError {
22 #[error("Failed to create swarm")]
23 SwarmCreationError,
24 #[error("libp2p noise failed")]
25 NoiseError(#[from] noise::Error),
26 #[error("libp2p dns failed")]
27 TransportError(#[from] std::io::Error),
28 #[error("libp2p behaviour failed")]
29 BehaviourError,
30 #[error("libp2p subscription failed")]
31 SubscriptionError(#[from] libp2p::gossipsub::SubscriptionError),
32 #[error("libp2p listen failed")]
33 ListenError(#[from] TransportError<std::io::Error>),
34 #[error("libp2p publish failed")]
35 PublishError(#[from] libp2p::gossipsub::PublishError),
36 #[error("builder error")]
37 BuilderError,
38}
39
40#[derive(NetworkBehaviour)]
42pub struct KarakP2PBehaviour {
43 gossipsub: gossipsub::Behaviour,
44 kademlia: kad::Behaviour<kad::store::MemoryStore>,
45}
46
47pub struct P2PNode {
48 pub peer_id: PeerId,
49 pub address: Multiaddr,
50}
51
52pub struct GossipMessage<M: AsRef<[u8]>> {
53 topic: String,
54 message: M,
55}
56
57impl<M: AsRef<[u8]>> GossipMessage<M> {
58 pub fn new(topic: String, message: M) -> Self {
59 GossipMessage { topic, message }
60 }
61}
62
63pub struct KarakP2P<M: AsRef<[u8]>> {
64 swarm: Swarm<KarakP2PBehaviour>,
65 termination_receiver: oneshot::Receiver<()>,
66 message_receiver: mpsc::Receiver<GossipMessage<M>>,
67}
68
69impl<M: AsRef<[u8]>> KarakP2P<M> {
70 pub fn create_and_start_swarm(
71 topic: &str,
72 listen_addr: Multiaddr,
73 bootstrap_addrs: Vec<P2PNode>,
74 termination_receiver: oneshot::Receiver<()>,
75 message_receiver: mpsc::Receiver<GossipMessage<M>>,
76 idle_timeout_duration: u64,
77 p2p_keypair: Option<libp2p::identity::Keypair>,
78 ) -> Result<Self, KarakP2PError> {
79 let swarm_with_key = match p2p_keypair {
80 Some(keypair) => libp2p::SwarmBuilder::with_existing_identity(keypair),
81 None => libp2p::SwarmBuilder::with_new_identity(),
82 };
83 let mut swarm = swarm_with_key
84 .with_tokio()
85 .with_tcp(
86 tcp::Config::default(),
87 noise::Config::new,
88 yamux::Config::default,
89 )?
90 .with_dns()?
91 .with_behaviour(|key| {
92 let message_id_fn = |message: &gossipsub::Message| {
94 let mut s = DefaultHasher::new();
95 message.data.hash(&mut s);
96 gossipsub::MessageId::from(s.finish().to_string())
97 };
98
99 let gossipsub_config = gossipsub::ConfigBuilder::default()
101 .heartbeat_interval(Duration::from_secs(1)) .validation_mode(gossipsub::ValidationMode::Strict)
103 .message_id_fn(message_id_fn)
104 .mesh_n_low(2) .mesh_n(6) .mesh_n_high(12) .mesh_outbound_min(2) .flood_publish(true) .history_length(10) .history_gossip(3) .build()
112 .map_err(|_| KarakP2PError::BuilderError)?;
113
114 let gossipsub = gossipsub::Behaviour::new(
116 gossipsub::MessageAuthenticity::Signed(key.clone()),
117 gossipsub_config,
118 )?;
119
120 let store = kad::store::MemoryStore::new(key.public().to_peer_id());
121 let kademlia = kad::Behaviour::new(key.public().to_peer_id(), store);
122
123 Ok(KarakP2PBehaviour {
124 gossipsub,
125 kademlia,
126 })
127 })
128 .map_err(|_| KarakP2PError::BehaviourError)?
129 .with_swarm_config(|c| {
130 c.with_idle_connection_timeout(Duration::from_secs(idle_timeout_duration))
131 })
132 .build();
133
134 let topic = gossipsub::IdentTopic::new(topic);
136 swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
138
139 swarm.listen_on(listen_addr)?;
141
142 for peer in &bootstrap_addrs {
143 tracing::info!("Adding peer: {:?}, {:?}", peer.peer_id, peer.address);
144 swarm
145 .behaviour_mut()
146 .kademlia
147 .add_address(&peer.peer_id, peer.address.clone());
148 }
149
150 tracing::info!("Swarm peer id: {:?}", swarm.local_peer_id());
151
152 Ok(KarakP2P {
153 swarm,
154 termination_receiver,
155 message_receiver,
156 })
157 }
158
159 pub async fn start_listening<F, Fut>(
160 &mut self,
161 on_incoming_message: F,
162 ) -> Result<(), KarakP2PError>
163 where
164 F: Fn(PeerId, MessageId, Message) -> Fut + Send + Sync + 'static,
165 Fut: Future<Output = ()> + Send,
166 {
167 loop {
168 select! {
169 Ok(_) = &mut self.termination_receiver => {
170 tracing::info!("Termination message received");
171 return Ok(());
172 }
173 Some(gossip_message) = self.message_receiver.recv()=> {
174 self.publish_message(&gossip_message.topic, gossip_message.message).unwrap_or_else(|e| {
175 tracing::error!("Failed to publish message: {:?}", e);
176 });
177 }
178 event = self.swarm.select_next_some() => match event {
179 SwarmEvent::Behaviour(KarakP2PBehaviourEvent::Gossipsub(gossipsub::Event::Message {
180 propagation_source: peer_id,
181 message_id: id,
182 message,
183 })) => on_incoming_message(peer_id, id, message).await,
184 SwarmEvent::NewListenAddr { address, .. } => {
185 tracing::info!("Local node is listening on {address}");
186 }
187 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
188 tracing::info!("Connection established with peer: {:?}", peer_id);
189 }
190 _ => {}
191 }
192 }
193 }
194 }
195
196 pub fn publish_message(&mut self, topic: &str, message: M) -> Result<(), KarakP2PError> {
197 let topic_hash = gossipsub::IdentTopic::new(topic);
198 self.swarm
199 .behaviour_mut()
200 .gossipsub
201 .publish(topic_hash, message.as_ref())?;
202 Ok(())
203 }
204
205 pub fn peer_id(&mut self) -> PeerId {
206 self.swarm.local_peer_id().to_owned()
207 }
208}