1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13 autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
14 SwarmBuilder,
15};
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18
/// Peer-discovery mode of the network layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// The mDNS behaviour is enabled when the swarm is built.
    Mdns,
    /// The mDNS behaviour is disabled when the swarm is built.
    Direct,
}

impl NetworkMode {
    /// Returns the canonical lowercase name of the mode (`"mdns"` or `"direct"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Parses a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Accepts `mdns`/`lan`/`open` for [`NetworkMode::Mdns`] and
    /// `direct`/`dial`/`private` for [`NetworkMode::Direct`]; anything else
    /// yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        // Every accepted spelling paired with the mode it selects.
        const ALIASES: [(&str, NetworkMode); 6] = [
            ("mdns", NetworkMode::Mdns),
            ("lan", NetworkMode::Mdns),
            ("open", NetworkMode::Mdns),
            ("direct", NetworkMode::Direct),
            ("dial", NetworkMode::Direct),
            ("private", NetworkMode::Direct),
        ];

        let wanted = s.trim();
        ALIASES
            .iter()
            .find(|(alias, _)| wanted.eq_ignore_ascii_case(alias))
            .map(|&(_, mode)| mode)
    }
}
45
46use crate::identity::{compute_fingerprint, Identity};
47use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
48use crate::network::events::NetworkEvent;
49use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
50
51#[derive(Debug)]
52pub enum NetworkCommand {
53 SubscribeRoom { room_id: String },
55 UnsubscribeRoom { room_id: String },
57 PublishRoomMessage { room_id: String, payload: Vec<u8> },
59 AnnounceRoom(RoomAnnouncement),
61 Dial { address: Multiaddr },
64 AcceptInbound { peer_id: PeerId },
67 RejectInbound { peer_id: PeerId },
71 DisconnectPeer { peer_id: PeerId },
79 Shutdown,
80}
81
82#[derive(Clone)]
83pub struct NetworkHandle {
84 cmd_tx: mpsc::Sender<NetworkCommand>,
85}
86
87impl NetworkHandle {
88 pub async fn subscribe_room(&self, room_id: String) {
89 let _ = self
90 .cmd_tx
91 .send(NetworkCommand::SubscribeRoom { room_id })
92 .await;
93 }
94
95 pub async fn unsubscribe_room(&self, room_id: String) {
96 let _ = self
97 .cmd_tx
98 .send(NetworkCommand::UnsubscribeRoom { room_id })
99 .await;
100 }
101
102 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
103 let _ = self
104 .cmd_tx
105 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
106 .await;
107 }
108
109 pub async fn announce_room(&self, ann: RoomAnnouncement) {
110 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
111 }
112
113 pub async fn dial(&self, address: Multiaddr) {
114 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
115 }
116
117 pub async fn accept_inbound(&self, peer_id: PeerId) {
118 let _ = self
119 .cmd_tx
120 .send(NetworkCommand::AcceptInbound { peer_id })
121 .await;
122 }
123
124 pub async fn reject_inbound(&self, peer_id: PeerId) {
125 let _ = self
126 .cmd_tx
127 .send(NetworkCommand::RejectInbound { peer_id })
128 .await;
129 }
130
131 pub async fn disconnect_peer(&self, peer_id: PeerId) {
132 let _ = self
133 .cmd_tx
134 .send(NetworkCommand::DisconnectPeer { peer_id })
135 .await;
136 }
137
138 pub async fn shutdown(&self) {
139 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
140 }
141}
142
143#[derive(Debug)]
149enum PendingPeer {
150 InboundUnknown { address: Multiaddr },
152}
153
154struct NetworkTask {
155 swarm: Swarm<HuddleBehavior>,
156 cmd_rx: mpsc::Receiver<NetworkCommand>,
157 event_tx: mpsc::Sender<NetworkEvent>,
158 discovered_peers: HashSet<PeerId>,
159 dial_attempts: HashMap<ConnectionId, Multiaddr>,
163 pending_inbound: HashMap<PeerId, PendingPeer>,
169 session_blocklist: HashSet<PeerId>,
175 configured_relays: Vec<Multiaddr>,
182 relay_peer_ids: HashSet<PeerId>,
183}
184
185pub fn start_network(
186 identity: &Identity,
187 event_tx: mpsc::Sender<NetworkEvent>,
188) -> crate::error::Result<NetworkHandle> {
189 start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
190}
191
192pub fn start_network_with(
198 identity: &Identity,
199 event_tx: mpsc::Sender<NetworkEvent>,
200 mode: NetworkMode,
201 listen_port: u16,
202 relays: Vec<Multiaddr>,
203) -> crate::error::Result<NetworkHandle> {
204 let keypair = identity.keypair().clone();
205 let local_peer_id = identity.peer_id();
206
207 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
208 .with_tokio()
209 .with_tcp(
210 tcp::Config::default(),
211 noise::Config::new,
212 yamux::Config::default,
213 )
214 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
215 .with_relay_client(noise::Config::new, yamux::Config::default)
220 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
221 .with_behaviour(|key, relay_client| {
222 let mdns_opt = match mode {
223 NetworkMode::Mdns => Some(
224 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
225 .expect("mDNS init failed"),
226 ),
227 NetworkMode::Direct => None,
228 };
229 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
230
231 let identify = identify::Behaviour::new(
232 identify::Config::new("/huddle/1.0.0".into(), key.public())
233 .with_agent_version("huddle/0.4".into()),
234 );
235
236 let ping = ping::Behaviour::default();
237
238 let gossipsub_config = gossipsub::ConfigBuilder::default()
239 .heartbeat_interval(Duration::from_secs(1))
240 .validation_mode(gossipsub::ValidationMode::Strict)
241 .max_transmit_size(256 * 1024)
244 .build()
245 .expect("valid gossipsub config");
246
247 let mut gossipsub = gossipsub::Behaviour::new(
248 gossipsub::MessageAuthenticity::Signed(key.clone()),
249 gossipsub_config,
250 )
251 .expect("valid gossipsub init");
252
253 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
256 gossipsub
257 .subscribe(&rooms_topic)
258 .expect("subscribe rooms topic");
259
260 let autonat_client = autonat::v2::client::Behaviour::new(
265 rand::rngs::OsRng,
266 autonat::v2::client::Config::default(),
267 );
268 let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
269 let dcutr = dcutr::Behaviour::new(local_peer_id);
270
271 HuddleBehavior {
272 mdns,
273 identify,
274 ping,
275 gossipsub,
276 relay_client,
277 autonat_client,
278 autonat_server,
279 dcutr,
280 }
281 })
282 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
283 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
284 .build();
285
286 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
287 .parse()
288 .expect("valid listen addr");
289 swarm
290 .listen_on(listen_addr)
291 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
292 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
294 .parse()
295 .expect("valid ipv6 listen addr");
296 if let Err(e) = swarm.listen_on(listen_addr6) {
297 debug!(%e, "ipv6 listen skipped");
298 }
299
300 let (cmd_tx, cmd_rx) = mpsc::channel(256);
301 let mut task = NetworkTask {
302 swarm,
303 cmd_rx,
304 event_tx,
305 discovered_peers: HashSet::new(),
306 dial_attempts: HashMap::new(),
307 pending_inbound: HashMap::new(),
308 session_blocklist: HashSet::new(),
309 configured_relays: relays.clone(),
310 relay_peer_ids: HashSet::new(),
311 };
312 for relay_addr in relays {
316 info!(addr = %relay_addr, "dialing configured relay");
317 let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
318 let conn_id = opts.connection_id();
319 match task.swarm.dial(opts) {
320 Ok(()) => {
321 task.dial_attempts.insert(conn_id, relay_addr);
322 }
323 Err(e) => warn!(%e, "dial relay failed"),
324 }
325 }
326 tokio::spawn(task.run());
327
328 Ok(NetworkHandle { cmd_tx })
329}
330
331impl NetworkTask {
332 async fn run(mut self) {
333 loop {
334 tokio::select! {
335 event = self.swarm.select_next_some() => {
336 self.handle_swarm_event(event).await;
337 }
338 Some(cmd) = self.cmd_rx.recv() => {
339 if matches!(cmd, NetworkCommand::Shutdown) {
340 info!("network task shutting down");
341 break;
342 }
343 self.handle_command(cmd);
344 }
345 }
346 }
347 }
348
349 async fn handle_swarm_event(
350 &mut self,
351 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
352 ) {
353 match event {
354 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
355 info!(%address, "listening");
356 use libp2p::multiaddr::Protocol;
360 let is_circuit = address
361 .iter()
362 .any(|p| matches!(p, Protocol::P2pCircuit));
363 if is_circuit {
364 let _ = self
365 .event_tx
366 .send(NetworkEvent::RelayReservationEstablished {
367 address: address.clone(),
368 })
369 .await;
370 }
371 let _ = self
372 .event_tx
373 .send(NetworkEvent::ListeningOn { address })
374 .await;
375 }
376 libp2p::swarm::SwarmEvent::ConnectionEstablished {
377 peer_id,
378 connection_id,
379 endpoint,
380 ..
381 } => {
382 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
383 let is_relay = self.configured_relays.iter().any(|r| r == &addr);
389 if is_relay {
390 info!(%peer_id, %addr, "connected to configured relay");
391 self.relay_peer_ids.insert(peer_id);
392 } else {
393 info!(%peer_id, %addr, "user-dialed peer connected");
394 self.swarm
398 .behaviour_mut()
399 .gossipsub
400 .add_explicit_peer(&peer_id);
401 self.discovered_peers.insert(peer_id);
402 let _ = self
403 .event_tx
404 .send(NetworkEvent::DialSucceeded {
405 peer_id,
406 address: addr,
407 })
408 .await;
409 }
410 } else if let ConnectedPoint::Dialer { .. } = endpoint {
411 self.swarm
414 .behaviour_mut()
415 .gossipsub
416 .add_explicit_peer(&peer_id);
417 } else {
418 if self.session_blocklist.contains(&peer_id) {
431 info!(%peer_id, "rejecting inbound from session-blocked peer");
432 let _ = self.swarm.disconnect_peer_id(peer_id);
433 } else {
434 let address = match &endpoint {
435 ConnectedPoint::Listener { send_back_addr, .. } => {
436 send_back_addr.clone()
437 }
438 _ => Multiaddr::empty(),
439 };
440 debug!(%peer_id, %address, "inbound peer pending decision");
441 self.pending_inbound
442 .insert(peer_id, PendingPeer::InboundUnknown { address });
443 }
444 }
445 }
446 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
447 connection_id,
448 error,
449 ..
450 } => {
451 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
452 warn!(%addr, %error, "user-dialed peer failed");
453 let _ = self
454 .event_tx
455 .send(NetworkEvent::DialFailed {
456 address: addr,
457 error: error.to_string(),
458 })
459 .await;
460 }
461 }
462 libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, .. } => {
463 self.pending_inbound.remove(&peer_id);
468 }
469 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
470 _ => {}
471 }
472 }
473
474 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
475 match event {
476 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
477 for (peer_id, addr) in peers {
478 if self.discovered_peers.insert(peer_id) {
479 info!(%peer_id, %addr, "mDNS discovered");
480 self.swarm.add_peer_address(peer_id, addr);
481 self.swarm
483 .behaviour_mut()
484 .gossipsub
485 .add_explicit_peer(&peer_id);
486 let _ = self
487 .event_tx
488 .send(NetworkEvent::PeerDiscovered { peer_id })
489 .await;
490 }
491 }
492 }
493 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
494 for (peer_id, _) in peers {
495 if self.discovered_peers.remove(&peer_id) {
496 info!(%peer_id, "mDNS peer expired");
497 self.swarm
498 .behaviour_mut()
499 .gossipsub
500 .remove_explicit_peer(&peer_id);
501 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
502 }
503 }
504 }
505 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
506 propagation_source,
507 message,
508 ..
509 }) => {
510 self.handle_gossipsub_message(propagation_source, message).await;
511 }
512 HuddleBehaviorEvent::Identify(identify::Event::Received {
513 peer_id, info, ..
514 }) => {
515 debug!(%peer_id, agent = %info.agent_version, "identify received");
516 if self.relay_peer_ids.contains(&peer_id) {
521 use libp2p::multiaddr::Protocol;
522 if let Some(relay_addr) = self
523 .configured_relays
524 .iter()
525 .find(|a| {
526 a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
529 || self.dial_attempts.values().any(|d| d == *a)
530 })
531 .cloned()
532 {
533 let circuit = relay_addr.with(Protocol::P2pCircuit);
534 match self.swarm.listen_on(circuit.clone()) {
535 Ok(_) => info!(%circuit, "listening on relay circuit"),
536 Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
537 }
538 }
539 }
540 let fingerprint = match info.public_key.clone().try_into_ed25519() {
546 Ok(ed_pk) => {
547 let bytes = ed_pk.to_bytes();
548 compute_fingerprint(&bytes)
549 }
550 Err(_) => {
551 warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
552 return;
553 }
554 };
555 let _ = self
559 .event_tx
560 .send(NetworkEvent::PeerIdentified {
561 peer_id,
562 fingerprint: fingerprint.clone(),
563 })
564 .await;
565 if let Some(PendingPeer::InboundUnknown { address }) =
570 self.pending_inbound.get(&peer_id)
571 {
572 let address = address.clone();
573 let _ = self
574 .event_tx
575 .send(NetworkEvent::InboundDial {
576 peer_id,
577 fingerprint,
578 address,
579 })
580 .await;
581 }
582 }
583 HuddleBehaviorEvent::AutonatClient(ev) => {
584 let reachable = ev.result.is_ok();
590 if reachable {
591 info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
592 } else {
593 debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
594 }
595 let _ = self
596 .event_tx
597 .send(NetworkEvent::NatProbeResult {
598 tested_addr: ev.tested_addr,
599 reachable,
600 })
601 .await;
602 }
603 HuddleBehaviorEvent::AutonatServer(_) => {
604 }
607 HuddleBehaviorEvent::Dcutr(ev) => {
608 let success = ev.result.is_ok();
609 if success {
610 info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
611 } else {
612 debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
613 }
614 let _ = self
615 .event_tx
616 .send(NetworkEvent::DcutrUpgrade {
617 remote_peer: ev.remote_peer_id,
618 success,
619 })
620 .await;
621 }
622 _ => {}
623 }
624 }
625
626 async fn handle_gossipsub_message(
627 &mut self,
628 from_peer: PeerId,
629 message: gossipsub::Message,
630 ) {
631 let topic = message.topic.to_string();
632 if topic == ROOMS_TOPIC {
633 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
634 Ok(ann) => {
635 let _ = self
636 .event_tx
637 .send(NetworkEvent::RoomAnnouncementReceived(ann))
638 .await;
639 }
640 Err(e) => {
641 warn!(%e, "bad room announcement");
642 }
643 }
644 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
645 let _ = self
646 .event_tx
647 .send(NetworkEvent::RoomMessageReceived {
648 room_id: room_id.to_string(),
649 payload: message.data,
650 from_peer,
651 })
652 .await;
653 }
654 }
655
656 fn handle_command(&mut self, cmd: NetworkCommand) {
657 match cmd {
658 NetworkCommand::SubscribeRoom { room_id } => {
659 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
660 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
661 warn!(%e, %room_id, "subscribe room failed");
662 }
663 }
664 NetworkCommand::UnsubscribeRoom { room_id } => {
665 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
666 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
667 }
668 NetworkCommand::PublishRoomMessage { room_id, payload } => {
669 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
670 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
671 match e {
675 gossipsub::PublishError::NoPeersSubscribedToTopic => {
676 debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
677 }
678 e => warn!(%e, %room_id, "publish room message failed"),
679 }
680 }
681 }
682 NetworkCommand::AnnounceRoom(ann) => {
683 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
684 match serde_json::to_vec(&ann) {
685 Ok(payload) => {
686 if let Err(e) =
687 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
688 {
689 debug!(%e, "publish room announcement failed");
690 }
691 }
692 Err(e) => warn!(%e, "encode room announcement"),
693 }
694 }
695 NetworkCommand::AcceptInbound { peer_id } => {
696 if self.pending_inbound.remove(&peer_id).is_some() {
697 info!(%peer_id, "inbound dial accepted — promoting to mesh");
698 self.swarm
699 .behaviour_mut()
700 .gossipsub
701 .add_explicit_peer(&peer_id);
702 self.discovered_peers.insert(peer_id);
703 } else {
704 debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
705 }
706 }
707 NetworkCommand::RejectInbound { peer_id } => {
708 self.pending_inbound.remove(&peer_id);
709 self.session_blocklist.insert(peer_id);
710 info!(%peer_id, "inbound dial rejected — disconnecting");
711 let _ = self.swarm.disconnect_peer_id(peer_id);
712 }
713 NetworkCommand::DisconnectPeer { peer_id } => {
714 info!(%peer_id, "app-level identity check failed — disconnecting");
715 let _ = self.swarm.disconnect_peer_id(peer_id);
716 }
717 NetworkCommand::Dial { address } => {
718 let opts: DialOpts = address.clone().into();
719 let conn_id = opts.connection_id();
720 match self.swarm.dial(opts) {
721 Ok(()) => {
722 self.dial_attempts.insert(conn_id, address);
723 }
724 Err(e) => {
725 let tx = self.event_tx.clone();
727 let err = e.to_string();
728 tokio::spawn(async move {
729 let _ = tx
730 .send(NetworkEvent::DialFailed {
731 address,
732 error: err,
733 })
734 .await;
735 });
736 }
737 }
738 }
739 NetworkCommand::Shutdown => unreachable!(),
740 }
741 }
742}