1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13 autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
14 SwarmBuilder,
15};
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18
/// Peer-discovery mode selected when the network is started.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// Spelled `"mdns"`; also accepted as `"lan"` or `"open"` when parsing.
    Mdns,
    /// Spelled `"direct"`; also accepted as `"dial"` or `"private"` when parsing.
    Direct,
}

/// Error returned by `str::parse::<NetworkMode>()` for an unrecognised name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseNetworkModeError {
    /// The rejected input, with surrounding whitespace removed.
    input: String,
}

impl std::fmt::Display for ParseNetworkModeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "unknown network mode {:?} (expected \"mdns\" or \"direct\")",
            self.input
        )
    }
}

impl std::error::Error for ParseNetworkModeError {}

impl NetworkMode {
    /// Returns the canonical lowercase name of the mode (`"mdns"` or `"direct"`).
    pub fn as_str(&self) -> &'static str {
        match self {
            NetworkMode::Mdns => "mdns",
            NetworkMode::Direct => "direct",
        }
    }

    /// Parses a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Accepts the canonical names plus the aliases `lan`/`open` (for
    /// [`NetworkMode::Mdns`]) and `dial`/`private` (for [`NetworkMode::Direct`]).
    /// Returns `None` for anything else, including the empty string.
    ///
    /// Kept as an inherent, `Option`-returning method so existing callers keep
    /// working; the `FromStr` impl provides the same parsing via `str::parse`.
    // The name intentionally mirrors `FromStr::from_str`; the trait is implemented too.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        const MDNS_NAMES: [&str; 3] = ["mdns", "lan", "open"];
        const DIRECT_NAMES: [&str; 3] = ["direct", "dial", "private"];

        // Case-insensitive membership test that does not allocate a lowercase copy.
        fn is_one_of(name: &str, aliases: &[&str]) -> bool {
            aliases.iter().any(|alias| name.eq_ignore_ascii_case(alias))
        }

        let name = s.trim();
        if is_one_of(name, &MDNS_NAMES) {
            Some(NetworkMode::Mdns)
        } else if is_one_of(name, &DIRECT_NAMES) {
            Some(NetworkMode::Direct)
        } else {
            None
        }
    }
}

impl std::str::FromStr for NetworkMode {
    type Err = ParseNetworkModeError;

    /// Same rules as the inherent [`NetworkMode::from_str`], but reports the
    /// rejected input instead of returning `None`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Inherent associated functions take precedence over trait ones, so this
        // calls the `Option`-returning parser above rather than recursing.
        NetworkMode::from_str(s).ok_or_else(|| ParseNetworkModeError {
            input: s.trim().to_owned(),
        })
    }
}
45
46use crate::identity::{compute_fingerprint, Identity};
47use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
48use crate::network::events::NetworkEvent;
49use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
50
51#[derive(Debug)]
52pub enum NetworkCommand {
53 SubscribeRoom { room_id: String },
55 UnsubscribeRoom { room_id: String },
57 PublishRoomMessage { room_id: String, payload: Vec<u8> },
59 AnnounceRoom(RoomAnnouncement),
61 Dial { address: Multiaddr },
64 AcceptInbound { peer_id: PeerId },
67 RejectInbound { peer_id: PeerId },
71 Shutdown,
72}
73
74#[derive(Clone)]
75pub struct NetworkHandle {
76 cmd_tx: mpsc::Sender<NetworkCommand>,
77}
78
79impl NetworkHandle {
80 pub async fn subscribe_room(&self, room_id: String) {
81 let _ = self
82 .cmd_tx
83 .send(NetworkCommand::SubscribeRoom { room_id })
84 .await;
85 }
86
87 pub async fn unsubscribe_room(&self, room_id: String) {
88 let _ = self
89 .cmd_tx
90 .send(NetworkCommand::UnsubscribeRoom { room_id })
91 .await;
92 }
93
94 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
95 let _ = self
96 .cmd_tx
97 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
98 .await;
99 }
100
101 pub async fn announce_room(&self, ann: RoomAnnouncement) {
102 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
103 }
104
105 pub async fn dial(&self, address: Multiaddr) {
106 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
107 }
108
109 pub async fn accept_inbound(&self, peer_id: PeerId) {
110 let _ = self
111 .cmd_tx
112 .send(NetworkCommand::AcceptInbound { peer_id })
113 .await;
114 }
115
116 pub async fn reject_inbound(&self, peer_id: PeerId) {
117 let _ = self
118 .cmd_tx
119 .send(NetworkCommand::RejectInbound { peer_id })
120 .await;
121 }
122
123 pub async fn shutdown(&self) {
124 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
125 }
126}
127
128#[derive(Debug)]
134enum PendingPeer {
135 InboundUnknown { address: Multiaddr },
137}
138
139struct NetworkTask {
140 swarm: Swarm<HuddleBehavior>,
141 cmd_rx: mpsc::Receiver<NetworkCommand>,
142 event_tx: mpsc::Sender<NetworkEvent>,
143 discovered_peers: HashSet<PeerId>,
144 dial_attempts: HashMap<ConnectionId, Multiaddr>,
148 pending_inbound: HashMap<PeerId, PendingPeer>,
154 session_blocklist: HashSet<PeerId>,
160 configured_relays: Vec<Multiaddr>,
167 relay_peer_ids: HashSet<PeerId>,
168}
169
170pub fn start_network(
171 identity: &Identity,
172 event_tx: mpsc::Sender<NetworkEvent>,
173) -> crate::error::Result<NetworkHandle> {
174 start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
175}
176
177pub fn start_network_with(
183 identity: &Identity,
184 event_tx: mpsc::Sender<NetworkEvent>,
185 mode: NetworkMode,
186 listen_port: u16,
187 relays: Vec<Multiaddr>,
188) -> crate::error::Result<NetworkHandle> {
189 let keypair = identity.keypair().clone();
190 let local_peer_id = identity.peer_id();
191
192 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
193 .with_tokio()
194 .with_tcp(
195 tcp::Config::default(),
196 noise::Config::new,
197 yamux::Config::default,
198 )
199 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
200 .with_relay_client(noise::Config::new, yamux::Config::default)
205 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
206 .with_behaviour(|key, relay_client| {
207 let mdns_opt = match mode {
208 NetworkMode::Mdns => Some(
209 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
210 .expect("mDNS init failed"),
211 ),
212 NetworkMode::Direct => None,
213 };
214 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
215
216 let identify = identify::Behaviour::new(
217 identify::Config::new("/huddle/1.0.0".into(), key.public())
218 .with_agent_version("huddle/0.3".into()),
219 );
220
221 let ping = ping::Behaviour::default();
222
223 let gossipsub_config = gossipsub::ConfigBuilder::default()
224 .heartbeat_interval(Duration::from_secs(1))
225 .validation_mode(gossipsub::ValidationMode::Strict)
226 .max_transmit_size(256 * 1024)
229 .build()
230 .expect("valid gossipsub config");
231
232 let mut gossipsub = gossipsub::Behaviour::new(
233 gossipsub::MessageAuthenticity::Signed(key.clone()),
234 gossipsub_config,
235 )
236 .expect("valid gossipsub init");
237
238 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
241 gossipsub
242 .subscribe(&rooms_topic)
243 .expect("subscribe rooms topic");
244
245 let autonat = autonat::v1::Behaviour::new(local_peer_id, Default::default());
246 let dcutr = dcutr::Behaviour::new(local_peer_id);
247
248 HuddleBehavior {
249 mdns,
250 identify,
251 ping,
252 gossipsub,
253 relay_client,
254 autonat,
255 dcutr,
256 }
257 })
258 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
259 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
260 .build();
261
262 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
263 .parse()
264 .expect("valid listen addr");
265 swarm
266 .listen_on(listen_addr)
267 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
268 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
270 .parse()
271 .expect("valid ipv6 listen addr");
272 if let Err(e) = swarm.listen_on(listen_addr6) {
273 debug!(%e, "ipv6 listen skipped");
274 }
275
276 let (cmd_tx, cmd_rx) = mpsc::channel(256);
277 let mut task = NetworkTask {
278 swarm,
279 cmd_rx,
280 event_tx,
281 discovered_peers: HashSet::new(),
282 dial_attempts: HashMap::new(),
283 pending_inbound: HashMap::new(),
284 session_blocklist: HashSet::new(),
285 configured_relays: relays.clone(),
286 relay_peer_ids: HashSet::new(),
287 };
288 for relay_addr in relays {
292 info!(addr = %relay_addr, "dialing configured relay");
293 let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
294 let conn_id = opts.connection_id();
295 match task.swarm.dial(opts) {
296 Ok(()) => {
297 task.dial_attempts.insert(conn_id, relay_addr);
298 }
299 Err(e) => warn!(%e, "dial relay failed"),
300 }
301 }
302 tokio::spawn(task.run());
303
304 Ok(NetworkHandle { cmd_tx })
305}
306
307impl NetworkTask {
308 async fn run(mut self) {
309 loop {
310 tokio::select! {
311 event = self.swarm.select_next_some() => {
312 self.handle_swarm_event(event).await;
313 }
314 Some(cmd) = self.cmd_rx.recv() => {
315 if matches!(cmd, NetworkCommand::Shutdown) {
316 info!("network task shutting down");
317 break;
318 }
319 self.handle_command(cmd);
320 }
321 }
322 }
323 }
324
325 async fn handle_swarm_event(
326 &mut self,
327 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
328 ) {
329 match event {
330 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
331 info!(%address, "listening");
332 use libp2p::multiaddr::Protocol;
336 let is_circuit = address
337 .iter()
338 .any(|p| matches!(p, Protocol::P2pCircuit));
339 if is_circuit {
340 let _ = self
341 .event_tx
342 .send(NetworkEvent::RelayReservationEstablished {
343 address: address.clone(),
344 })
345 .await;
346 }
347 let _ = self
348 .event_tx
349 .send(NetworkEvent::ListeningOn { address })
350 .await;
351 }
352 libp2p::swarm::SwarmEvent::ConnectionEstablished {
353 peer_id,
354 connection_id,
355 endpoint,
356 ..
357 } => {
358 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
359 let is_relay = self.configured_relays.iter().any(|r| r == &addr);
365 if is_relay {
366 info!(%peer_id, %addr, "connected to configured relay");
367 self.relay_peer_ids.insert(peer_id);
368 } else {
369 info!(%peer_id, %addr, "user-dialed peer connected");
370 self.swarm
374 .behaviour_mut()
375 .gossipsub
376 .add_explicit_peer(&peer_id);
377 self.discovered_peers.insert(peer_id);
378 let _ = self
379 .event_tx
380 .send(NetworkEvent::DialSucceeded {
381 peer_id,
382 address: addr,
383 })
384 .await;
385 }
386 } else if let ConnectedPoint::Dialer { .. } = endpoint {
387 self.swarm
390 .behaviour_mut()
391 .gossipsub
392 .add_explicit_peer(&peer_id);
393 } else {
394 if self.session_blocklist.contains(&peer_id) {
407 info!(%peer_id, "rejecting inbound from session-blocked peer");
408 let _ = self.swarm.disconnect_peer_id(peer_id);
409 } else {
410 let address = match &endpoint {
411 ConnectedPoint::Listener { send_back_addr, .. } => {
412 send_back_addr.clone()
413 }
414 _ => Multiaddr::empty(),
415 };
416 debug!(%peer_id, %address, "inbound peer pending decision");
417 self.pending_inbound
418 .insert(peer_id, PendingPeer::InboundUnknown { address });
419 }
420 }
421 }
422 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
423 connection_id,
424 error,
425 ..
426 } => {
427 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
428 warn!(%addr, %error, "user-dialed peer failed");
429 let _ = self
430 .event_tx
431 .send(NetworkEvent::DialFailed {
432 address: addr,
433 error: error.to_string(),
434 })
435 .await;
436 }
437 }
438 libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, .. } => {
439 self.pending_inbound.remove(&peer_id);
444 }
445 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
446 _ => {}
447 }
448 }
449
450 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
451 match event {
452 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
453 for (peer_id, addr) in peers {
454 if self.discovered_peers.insert(peer_id) {
455 info!(%peer_id, %addr, "mDNS discovered");
456 self.swarm.add_peer_address(peer_id, addr);
457 self.swarm
459 .behaviour_mut()
460 .gossipsub
461 .add_explicit_peer(&peer_id);
462 let _ = self
463 .event_tx
464 .send(NetworkEvent::PeerDiscovered { peer_id })
465 .await;
466 }
467 }
468 }
469 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
470 for (peer_id, _) in peers {
471 if self.discovered_peers.remove(&peer_id) {
472 info!(%peer_id, "mDNS peer expired");
473 self.swarm
474 .behaviour_mut()
475 .gossipsub
476 .remove_explicit_peer(&peer_id);
477 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
478 }
479 }
480 }
481 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
482 propagation_source,
483 message,
484 ..
485 }) => {
486 self.handle_gossipsub_message(propagation_source, message).await;
487 }
488 HuddleBehaviorEvent::Identify(identify::Event::Received {
489 peer_id, info, ..
490 }) => {
491 debug!(%peer_id, agent = %info.agent_version, "identify received");
492 if self.relay_peer_ids.contains(&peer_id) {
497 use libp2p::multiaddr::Protocol;
498 if let Some(relay_addr) = self
499 .configured_relays
500 .iter()
501 .find(|a| {
502 a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
505 || self.dial_attempts.values().any(|d| d == *a)
506 })
507 .cloned()
508 {
509 let circuit = relay_addr.with(Protocol::P2pCircuit);
510 match self.swarm.listen_on(circuit.clone()) {
511 Ok(_) => info!(%circuit, "listening on relay circuit"),
512 Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
513 }
514 }
515 }
516 let fingerprint = match info.public_key.clone().try_into_ed25519() {
522 Ok(ed_pk) => {
523 let bytes = ed_pk.to_bytes();
524 compute_fingerprint(&bytes)
525 }
526 Err(_) => {
527 warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
528 return;
529 }
530 };
531 let _ = self
535 .event_tx
536 .send(NetworkEvent::PeerIdentified {
537 peer_id,
538 fingerprint: fingerprint.clone(),
539 })
540 .await;
541 if let Some(PendingPeer::InboundUnknown { address }) =
546 self.pending_inbound.get(&peer_id)
547 {
548 let address = address.clone();
549 let _ = self
550 .event_tx
551 .send(NetworkEvent::InboundDial {
552 peer_id,
553 fingerprint,
554 address,
555 })
556 .await;
557 }
558 }
559 _ => {}
560 }
561 }
562
563 async fn handle_gossipsub_message(
564 &mut self,
565 from_peer: PeerId,
566 message: gossipsub::Message,
567 ) {
568 let topic = message.topic.to_string();
569 if topic == ROOMS_TOPIC {
570 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
571 Ok(ann) => {
572 let _ = self
573 .event_tx
574 .send(NetworkEvent::RoomAnnouncementReceived(ann))
575 .await;
576 }
577 Err(e) => {
578 warn!(%e, "bad room announcement");
579 }
580 }
581 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
582 let _ = self
583 .event_tx
584 .send(NetworkEvent::RoomMessageReceived {
585 room_id: room_id.to_string(),
586 payload: message.data,
587 from_peer,
588 })
589 .await;
590 }
591 }
592
593 fn handle_command(&mut self, cmd: NetworkCommand) {
594 match cmd {
595 NetworkCommand::SubscribeRoom { room_id } => {
596 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
597 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
598 warn!(%e, %room_id, "subscribe room failed");
599 }
600 }
601 NetworkCommand::UnsubscribeRoom { room_id } => {
602 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
603 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
604 }
605 NetworkCommand::PublishRoomMessage { room_id, payload } => {
606 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
607 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
608 match e {
612 gossipsub::PublishError::NoPeersSubscribedToTopic => {
613 debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
614 }
615 e => warn!(%e, %room_id, "publish room message failed"),
616 }
617 }
618 }
619 NetworkCommand::AnnounceRoom(ann) => {
620 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
621 match serde_json::to_vec(&ann) {
622 Ok(payload) => {
623 if let Err(e) =
624 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
625 {
626 debug!(%e, "publish room announcement failed");
627 }
628 }
629 Err(e) => warn!(%e, "encode room announcement"),
630 }
631 }
632 NetworkCommand::AcceptInbound { peer_id } => {
633 if self.pending_inbound.remove(&peer_id).is_some() {
634 info!(%peer_id, "inbound dial accepted — promoting to mesh");
635 self.swarm
636 .behaviour_mut()
637 .gossipsub
638 .add_explicit_peer(&peer_id);
639 self.discovered_peers.insert(peer_id);
640 } else {
641 debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
642 }
643 }
644 NetworkCommand::RejectInbound { peer_id } => {
645 self.pending_inbound.remove(&peer_id);
646 self.session_blocklist.insert(peer_id);
647 info!(%peer_id, "inbound dial rejected — disconnecting");
648 let _ = self.swarm.disconnect_peer_id(peer_id);
649 }
650 NetworkCommand::Dial { address } => {
651 let opts: DialOpts = address.clone().into();
652 let conn_id = opts.connection_id();
653 match self.swarm.dial(opts) {
654 Ok(()) => {
655 self.dial_attempts.insert(conn_id, address);
656 }
657 Err(e) => {
658 let tx = self.event_tx.clone();
660 let err = e.to_string();
661 tokio::spawn(async move {
662 let _ = tx
663 .send(NetworkEvent::DialFailed {
664 address,
665 error: err,
666 })
667 .await;
668 });
669 }
670 }
671 }
672 NetworkCommand::Shutdown => unreachable!(),
673 }
674 }
675}