1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13 gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder,
14};
15use tokio::sync::mpsc;
16use tracing::{debug, info, warn};
17
/// Selects how the node locates its peers.
///
/// `start_network_with` consults this value when it assembles the swarm
/// behaviour: `Mdns` switches the mDNS behaviour on, `Direct` leaves it off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// mDNS discovery is enabled.
    Mdns,
    /// mDNS discovery is disabled.
    Direct,
}

impl NetworkMode {
    /// Returns the canonical lowercase name of the mode (`"mdns"` or `"direct"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Parses a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Accepted spellings are `mdns`/`lan`/`open` for [`NetworkMode::Mdns`]
    /// and `direct`/`dial`/`private` for [`NetworkMode::Direct`]. Anything
    /// else yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        const MDNS_NAMES: [&str; 3] = ["mdns", "lan", "open"];
        const DIRECT_NAMES: [&str; 3] = ["direct", "dial", "private"];

        let wanted = s.trim();
        let is_one_of =
            |names: &[&str]| names.iter().any(|name| wanted.eq_ignore_ascii_case(name));

        if is_one_of(&MDNS_NAMES) {
            Some(Self::Mdns)
        } else if is_one_of(&DIRECT_NAMES) {
            Some(Self::Direct)
        } else {
            None
        }
    }
}
44
45use crate::identity::Identity;
46use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
47use crate::network::events::NetworkEvent;
48use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
49
/// Requests sent from a [`NetworkHandle`] to the background network task.
#[derive(Debug)]
pub enum NetworkCommand {
    /// Subscribe gossipsub to the topic derived from `room_id`.
    SubscribeRoom { room_id: String },
    /// Unsubscribe gossipsub from the topic derived from `room_id`.
    UnsubscribeRoom { room_id: String },
    /// Publish `payload` on the topic derived from `room_id`.
    PublishRoomMessage { room_id: String, payload: Vec<u8> },
    /// JSON-encode the announcement and publish it on the shared rooms topic.
    AnnounceRoom(RoomAnnouncement),
    /// Dial `address`; the outcome is reported back as a
    /// `NetworkEvent::DialSucceeded` or `NetworkEvent::DialFailed` event.
    Dial { address: Multiaddr },
    /// Stop the network task's event loop.
    Shutdown,
}
65
/// Cloneable front-end for the background network task.
///
/// Each method on the handle wraps its arguments in a [`NetworkCommand`] and
/// pushes it onto the task's command channel.
#[derive(Clone)]
pub struct NetworkHandle {
    /// Sending half of the command channel consumed by the network task.
    cmd_tx: mpsc::Sender<NetworkCommand>,
}
70
71impl NetworkHandle {
72 pub async fn subscribe_room(&self, room_id: String) {
73 let _ = self
74 .cmd_tx
75 .send(NetworkCommand::SubscribeRoom { room_id })
76 .await;
77 }
78
79 pub async fn unsubscribe_room(&self, room_id: String) {
80 let _ = self
81 .cmd_tx
82 .send(NetworkCommand::UnsubscribeRoom { room_id })
83 .await;
84 }
85
86 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
87 let _ = self
88 .cmd_tx
89 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
90 .await;
91 }
92
93 pub async fn announce_room(&self, ann: RoomAnnouncement) {
94 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
95 }
96
97 pub async fn dial(&self, address: Multiaddr) {
98 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
99 }
100
101 pub async fn shutdown(&self) {
102 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
103 }
104}
105
/// State owned by the background network task spawned by `start_network_with`.
struct NetworkTask {
    /// The libp2p swarm driven by the task's event loop.
    swarm: Swarm<HuddleBehavior>,
    /// Commands arriving from `NetworkHandle` clones.
    cmd_rx: mpsc::Receiver<NetworkCommand>,
    /// Outbound channel on which `NetworkEvent`s are reported.
    event_tx: mpsc::Sender<NetworkEvent>,
    /// Peers recorded as known: inserted on first mDNS discovery and when a
    /// user-requested dial connects; removed when mDNS reports expiry.
    discovered_peers: HashSet<PeerId>,
    /// In-flight user-requested dials, keyed by connection id, so the outcome
    /// can be reported together with the address that was dialed.
    dial_attempts: HashMap<ConnectionId, Multiaddr>,
}
116
117pub fn start_network(
118 identity: &Identity,
119 event_tx: mpsc::Sender<NetworkEvent>,
120) -> crate::error::Result<NetworkHandle> {
121 start_network_with(identity, event_tx, NetworkMode::Mdns, 0)
122}
123
124pub fn start_network_with(
127 identity: &Identity,
128 event_tx: mpsc::Sender<NetworkEvent>,
129 mode: NetworkMode,
130 listen_port: u16,
131) -> crate::error::Result<NetworkHandle> {
132 let keypair = identity.keypair().clone();
133 let local_peer_id = identity.peer_id();
134
135 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
136 .with_tokio()
137 .with_tcp(
138 tcp::Config::default(),
139 noise::Config::new,
140 yamux::Config::default,
141 )
142 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
143 .with_behaviour(|key| {
144 let mdns_opt = match mode {
145 NetworkMode::Mdns => Some(
146 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
147 .expect("mDNS init failed"),
148 ),
149 NetworkMode::Direct => None,
150 };
151 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
152
153 let identify = identify::Behaviour::new(
154 identify::Config::new("/huddle/1.0.0".into(), key.public())
155 .with_agent_version("huddle/0.2".into()),
156 );
157
158 let ping = ping::Behaviour::default();
159
160 let gossipsub_config = gossipsub::ConfigBuilder::default()
161 .heartbeat_interval(Duration::from_secs(1))
162 .validation_mode(gossipsub::ValidationMode::Strict)
163 .build()
164 .expect("valid gossipsub config");
165
166 let mut gossipsub = gossipsub::Behaviour::new(
167 gossipsub::MessageAuthenticity::Signed(key.clone()),
168 gossipsub_config,
169 )
170 .expect("valid gossipsub init");
171
172 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
175 gossipsub
176 .subscribe(&rooms_topic)
177 .expect("subscribe rooms topic");
178
179 HuddleBehavior {
180 mdns,
181 identify,
182 ping,
183 gossipsub,
184 }
185 })
186 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
187 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
188 .build();
189
190 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
191 .parse()
192 .expect("valid listen addr");
193 swarm
194 .listen_on(listen_addr)
195 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
196 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
198 .parse()
199 .expect("valid ipv6 listen addr");
200 if let Err(e) = swarm.listen_on(listen_addr6) {
201 debug!(%e, "ipv6 listen skipped");
202 }
203
204 let (cmd_tx, cmd_rx) = mpsc::channel(256);
205 let task = NetworkTask {
206 swarm,
207 cmd_rx,
208 event_tx,
209 discovered_peers: HashSet::new(),
210 dial_attempts: HashMap::new(),
211 };
212 tokio::spawn(task.run());
213
214 Ok(NetworkHandle { cmd_tx })
215}
216
217impl NetworkTask {
218 async fn run(mut self) {
219 loop {
220 tokio::select! {
221 event = self.swarm.select_next_some() => {
222 self.handle_swarm_event(event).await;
223 }
224 Some(cmd) = self.cmd_rx.recv() => {
225 if matches!(cmd, NetworkCommand::Shutdown) {
226 info!("network task shutting down");
227 break;
228 }
229 self.handle_command(cmd);
230 }
231 }
232 }
233 }
234
235 async fn handle_swarm_event(
236 &mut self,
237 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
238 ) {
239 match event {
240 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
241 info!(%address, "listening");
242 let _ = self
243 .event_tx
244 .send(NetworkEvent::ListeningOn { address })
245 .await;
246 }
247 libp2p::swarm::SwarmEvent::ConnectionEstablished {
248 peer_id,
249 connection_id,
250 endpoint,
251 ..
252 } => {
253 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
254 info!(%peer_id, %addr, "user-dialed peer connected");
255 self.swarm
258 .behaviour_mut()
259 .gossipsub
260 .add_explicit_peer(&peer_id);
261 self.discovered_peers.insert(peer_id);
262 let _ = self
263 .event_tx
264 .send(NetworkEvent::DialSucceeded {
265 peer_id,
266 address: addr,
267 })
268 .await;
269 } else if let ConnectedPoint::Dialer { .. } = endpoint {
270 self.swarm
273 .behaviour_mut()
274 .gossipsub
275 .add_explicit_peer(&peer_id);
276 }
277 }
278 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
279 connection_id,
280 error,
281 ..
282 } => {
283 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
284 warn!(%addr, %error, "user-dialed peer failed");
285 let _ = self
286 .event_tx
287 .send(NetworkEvent::DialFailed {
288 address: addr,
289 error: error.to_string(),
290 })
291 .await;
292 }
293 }
294 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
295 _ => {}
296 }
297 }
298
299 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
300 match event {
301 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
302 for (peer_id, addr) in peers {
303 if self.discovered_peers.insert(peer_id) {
304 info!(%peer_id, %addr, "mDNS discovered");
305 self.swarm.add_peer_address(peer_id, addr);
306 self.swarm
308 .behaviour_mut()
309 .gossipsub
310 .add_explicit_peer(&peer_id);
311 let _ = self
312 .event_tx
313 .send(NetworkEvent::PeerDiscovered { peer_id })
314 .await;
315 }
316 }
317 }
318 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
319 for (peer_id, _) in peers {
320 if self.discovered_peers.remove(&peer_id) {
321 info!(%peer_id, "mDNS peer expired");
322 self.swarm
323 .behaviour_mut()
324 .gossipsub
325 .remove_explicit_peer(&peer_id);
326 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
327 }
328 }
329 }
330 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
331 propagation_source,
332 message,
333 ..
334 }) => {
335 self.handle_gossipsub_message(propagation_source, message).await;
336 }
337 HuddleBehaviorEvent::Identify(identify::Event::Received {
338 peer_id, info, ..
339 }) => {
340 debug!(%peer_id, agent = %info.agent_version, "identify received");
341 }
342 _ => {}
343 }
344 }
345
346 async fn handle_gossipsub_message(
347 &mut self,
348 from_peer: PeerId,
349 message: gossipsub::Message,
350 ) {
351 let topic = message.topic.to_string();
352 if topic == ROOMS_TOPIC {
353 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
354 Ok(ann) => {
355 let _ = self
356 .event_tx
357 .send(NetworkEvent::RoomAnnouncementReceived(ann))
358 .await;
359 }
360 Err(e) => {
361 warn!(%e, "bad room announcement");
362 }
363 }
364 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
365 let _ = self
366 .event_tx
367 .send(NetworkEvent::RoomMessageReceived {
368 room_id: room_id.to_string(),
369 payload: message.data,
370 from_peer,
371 })
372 .await;
373 }
374 }
375
376 fn handle_command(&mut self, cmd: NetworkCommand) {
377 match cmd {
378 NetworkCommand::SubscribeRoom { room_id } => {
379 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
380 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
381 warn!(%e, %room_id, "subscribe room failed");
382 }
383 }
384 NetworkCommand::UnsubscribeRoom { room_id } => {
385 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
386 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
387 }
388 NetworkCommand::PublishRoomMessage { room_id, payload } => {
389 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
390 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
391 debug!(%e, %room_id, "publish room message failed (no peers yet?)");
392 }
393 }
394 NetworkCommand::AnnounceRoom(ann) => {
395 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
396 match serde_json::to_vec(&ann) {
397 Ok(payload) => {
398 if let Err(e) =
399 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
400 {
401 debug!(%e, "publish room announcement failed");
402 }
403 }
404 Err(e) => warn!(%e, "encode room announcement"),
405 }
406 }
407 NetworkCommand::Dial { address } => {
408 let opts: DialOpts = address.clone().into();
409 let conn_id = opts.connection_id();
410 match self.swarm.dial(opts) {
411 Ok(()) => {
412 self.dial_attempts.insert(conn_id, address);
413 }
414 Err(e) => {
415 let tx = self.event_tx.clone();
417 let err = e.to_string();
418 tokio::spawn(async move {
419 let _ = tx
420 .send(NetworkEvent::DialFailed {
421 address,
422 error: err,
423 })
424 .await;
425 });
426 }
427 }
428 }
429 NetworkCommand::Shutdown => unreachable!(),
430 }
431 }
432}