karak_p2p/
lib.rs

1use futures::stream::StreamExt;
2use libp2p::{
3    gossipsub::{self, Message, MessageId},
4    kad, noise,
5    swarm::{NetworkBehaviour, Swarm, SwarmEvent},
6    tcp, yamux, Multiaddr, PeerId, TransportError,
7};
8use std::{
9    collections::hash_map::DefaultHasher,
10    future::Future,
11    hash::{Hash, Hasher},
12    time::Duration,
13};
14use thiserror::Error;
15use tokio::{
16    select,
17    sync::{mpsc, oneshot},
18};
19
20#[derive(Debug, Error)]
21pub enum KarakP2PError {
22    #[error("Failed to create swarm")]
23    SwarmCreationError,
24    #[error("libp2p noise failed")]
25    NoiseError(#[from] noise::Error),
26    #[error("libp2p dns failed")]
27    TransportError(#[from] std::io::Error),
28    #[error("libp2p behaviour failed")]
29    BehaviourError,
30    #[error("libp2p subscription failed")]
31    SubscriptionError(#[from] libp2p::gossipsub::SubscriptionError),
32    #[error("libp2p listen failed")]
33    ListenError(#[from] TransportError<std::io::Error>),
34    #[error("libp2p publish failed")]
35    PublishError(#[from] libp2p::gossipsub::PublishError),
36    #[error("builder error")]
37    BuilderError,
38}
39
40// We create a custom network behaviour that combines Gossipsub and Kademlia.
41#[derive(NetworkBehaviour)]
42pub struct KarakP2PBehaviour {
43    gossipsub: gossipsub::Behaviour,
44    kademlia: kad::Behaviour<kad::store::MemoryStore>,
45}
46
47pub struct P2PNode {
48    pub peer_id: PeerId,
49    pub address: Multiaddr,
50}
51
/// An outbound message paired with the name of the topic it is published on.
///
/// `M` is any payload that can be viewed as a byte slice.
#[derive(Debug, Clone)]
pub struct GossipMessage<M: AsRef<[u8]>> {
    /// Name of the gossipsub topic to publish on.
    topic: String,
    /// Payload; its bytes are what gets published.
    message: M,
}

impl<M: AsRef<[u8]>> GossipMessage<M> {
    /// Creates a message destined for `topic` carrying `message` as payload.
    pub fn new(topic: String, message: M) -> Self {
        Self { topic, message }
    }
}
62
63pub struct KarakP2P<M: AsRef<[u8]>> {
64    swarm: Swarm<KarakP2PBehaviour>,
65    termination_receiver: oneshot::Receiver<()>,
66    message_receiver: mpsc::Receiver<GossipMessage<M>>,
67}
68
69impl<M: AsRef<[u8]>> KarakP2P<M> {
70    pub fn create_and_start_swarm(
71        topic: &str,
72        listen_addr: Multiaddr,
73        bootstrap_addrs: Vec<P2PNode>,
74        termination_receiver: oneshot::Receiver<()>,
75        message_receiver: mpsc::Receiver<GossipMessage<M>>,
76        idle_timeout_duration: u64,
77        p2p_keypair: Option<libp2p::identity::Keypair>,
78    ) -> Result<Self, KarakP2PError> {
79        let swarm_with_key = match p2p_keypair {
80            Some(keypair) => libp2p::SwarmBuilder::with_existing_identity(keypair),
81            None => libp2p::SwarmBuilder::with_new_identity(),
82        };
83        let mut swarm = swarm_with_key
84            .with_tokio()
85            .with_tcp(
86                tcp::Config::default(),
87                noise::Config::new,
88                yamux::Config::default,
89            )?
90            .with_dns()?
91            .with_behaviour(|key| {
92                // To content-address message, we can take the hash of message and use it as an ID.
93                let message_id_fn = |message: &gossipsub::Message| {
94                    let mut s = DefaultHasher::new();
95                    message.data.hash(&mut s);
96                    gossipsub::MessageId::from(s.finish().to_string())
97                };
98
99                // Set a custom gossipsub configuration
100                let gossipsub_config = gossipsub::ConfigBuilder::default()
101                    .heartbeat_interval(Duration::from_secs(1)) // More frequent heartbeats
102                    .validation_mode(gossipsub::ValidationMode::Strict)
103                    .message_id_fn(message_id_fn)
104                    .mesh_n_low(2) // Minimum number of peers to maintain in mesh
105                    .mesh_n(6) // Target number of peers to keep in mesh
106                    .mesh_n_high(12) // Maximum number of peers in mesh
107                    .mesh_outbound_min(2) // Minimum outbound peers in mesh
108                    .flood_publish(true) // Ensure messages are flooded to all peers
109                    .history_length(10) // Keep track of more messages
110                    .history_gossip(3) // Gossip about more messages
111                    .build()
112                    .map_err(|_| KarakP2PError::BuilderError)?;
113
114                // build a gossipsub network behaviour
115                let gossipsub = gossipsub::Behaviour::new(
116                    gossipsub::MessageAuthenticity::Signed(key.clone()),
117                    gossipsub_config,
118                )?;
119
120                let store = kad::store::MemoryStore::new(key.public().to_peer_id());
121                let kademlia = kad::Behaviour::new(key.public().to_peer_id(), store);
122
123                Ok(KarakP2PBehaviour {
124                    gossipsub,
125                    kademlia,
126                })
127            })
128            .map_err(|_| KarakP2PError::BehaviourError)?
129            .with_swarm_config(|c| {
130                c.with_idle_connection_timeout(Duration::from_secs(idle_timeout_duration))
131            })
132            .build();
133
134        // Create a Gossipsub topic
135        let topic = gossipsub::IdentTopic::new(topic);
136        // subscribes to our topic
137        swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
138
139        // Listen on all interfaces and whatever port the OS assigns
140        swarm.listen_on(listen_addr)?;
141
142        for peer in &bootstrap_addrs {
143            tracing::info!("Adding peer: {:?}, {:?}", peer.peer_id, peer.address);
144            swarm
145                .behaviour_mut()
146                .kademlia
147                .add_address(&peer.peer_id, peer.address.clone());
148        }
149
150        tracing::info!("Swarm peer id: {:?}", swarm.local_peer_id());
151
152        Ok(KarakP2P {
153            swarm,
154            termination_receiver,
155            message_receiver,
156        })
157    }
158
159    pub async fn start_listening<F, Fut>(
160        &mut self,
161        on_incoming_message: F,
162    ) -> Result<(), KarakP2PError>
163    where
164        F: Fn(PeerId, MessageId, Message) -> Fut + Send + Sync + 'static,
165        Fut: Future<Output = ()> + Send,
166    {
167        loop {
168            select! {
169                Ok(_) = &mut self.termination_receiver => {
170                    tracing::info!("Termination message received");
171                    return Ok(());
172                }
173                Some(gossip_message) = self.message_receiver.recv()=> {
174                    self.publish_message(&gossip_message.topic, gossip_message.message).unwrap_or_else(|e| {
175                        tracing::error!("Failed to publish message: {:?}", e);
176                    });
177                }
178                event = self.swarm.select_next_some() => match event {
179                    SwarmEvent::Behaviour(KarakP2PBehaviourEvent::Gossipsub(gossipsub::Event::Message {
180                        propagation_source: peer_id,
181                        message_id: id,
182                        message,
183                    })) => on_incoming_message(peer_id, id, message).await,
184                    SwarmEvent::NewListenAddr { address, .. } => {
185                        tracing::info!("Local node is listening on {address}");
186                    }
187                    SwarmEvent::ConnectionEstablished { peer_id, .. } => {
188                        tracing::info!("Connection established with peer: {:?}", peer_id);
189                    }
190                    _ => {}
191                }
192            }
193        }
194    }
195
196    pub fn publish_message(&mut self, topic: &str, message: M) -> Result<(), KarakP2PError> {
197        let topic_hash = gossipsub::IdentTopic::new(topic);
198        self.swarm
199            .behaviour_mut()
200            .gossipsub
201            .publish(topic_hash, message.as_ref())?;
202        Ok(())
203    }
204
205    pub fn peer_id(&mut self) -> PeerId {
206        self.swarm.local_peer_id().to_owned()
207    }
208}