huddle_core/network/mod.rs
1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13 autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
14 SwarmBuilder,
15};
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18
/// Peer-discovery strategy for the network task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// mDNS enabled: we announce ourselves on the LAN and pick up other
    /// peers' announcements.
    Mdns,
    /// mDNS disabled: we are invisible to LAN discovery, so a connection
    /// only happens when someone dials our address or we dial theirs.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of the mode: `"mdns"` or `"direct"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Parse a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Accepts `mdns` / `lan` / `open` for [`NetworkMode::Mdns`] and
    /// `direct` / `dial` / `private` for [`NetworkMode::Direct`]. Any
    /// other input yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        const MDNS_ALIASES: [&str; 3] = ["mdns", "lan", "open"];
        const DIRECT_ALIASES: [&str; 3] = ["direct", "dial", "private"];

        let token = s.trim();
        let is_one_of =
            |aliases: &[&str]| aliases.iter().any(|alias| token.eq_ignore_ascii_case(alias));

        if is_one_of(&MDNS_ALIASES) {
            Some(Self::Mdns)
        } else if is_one_of(&DIRECT_ALIASES) {
            Some(Self::Direct)
        } else {
            None
        }
    }
}
45
46use crate::identity::{compute_fingerprint, Identity};
47use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
48use crate::network::events::NetworkEvent;
49use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
50
/// Commands sent from [`NetworkHandle`] to the network task over its
/// command channel. The task applies each one to the libp2p swarm.
#[derive(Debug)]
pub enum NetworkCommand {
    /// Subscribe to a room's per-room gossipsub topic.
    SubscribeRoom { room_id: String },
    /// Unsubscribe from a room's topic.
    UnsubscribeRoom { room_id: String },
    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
    PublishRoomMessage { room_id: String, payload: Vec<u8> },
    /// Publish a room announcement on the global rooms topic.
    AnnounceRoom(RoomAnnouncement),
    /// User-initiated dial of an explicit address. Used for cross-network
    /// reach when mDNS isn't enough.
    Dial { address: Multiaddr },
    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
    /// letting libp2p race them in parallel. Used by the "add by HD
    /// ID / username" flow, which resolves a fingerprint to every
    /// address we know for that peer (room announcement `host_addrs`
    /// + persisted `known_peers`). libp2p's parallel dialer picks
    /// the cheapest path that completes — LAN beats public IP beats
    /// relay-hopped without us having to probe transports manually.
    DialAddresses { addresses: Vec<Multiaddr> },
    /// Phase A: user accepted an inbound dial — promote the peer to
    /// explicit-peer status so room announcements flow.
    AcceptInbound { peer_id: PeerId },
    /// Phase A: user rejected an inbound dial — disconnect them and
    /// add the peer_id to the in-memory blocklist for this session
    /// (caller is responsible for the persistent blocked_peers row).
    RejectInbound { peer_id: PeerId },
    /// Phase C follow-up: drop a connection that failed an
    /// application-level identity check (e.g. invite-fingerprint
    /// mismatch). Differs from `RejectInbound` in that it doesn't
    /// touch the inbound-pending map (the connection is already
    /// past Identify when we discover the mismatch) and doesn't
    /// persist a block — the caller may want to retry with a
    /// corrected invite.
    DisconnectPeer { peer_id: PeerId },
    /// Stop the network task's event loop.
    Shutdown,
}
89
90#[derive(Clone)]
91pub struct NetworkHandle {
92 cmd_tx: mpsc::Sender<NetworkCommand>,
93}
94
95impl NetworkHandle {
96 pub async fn subscribe_room(&self, room_id: String) {
97 let _ = self
98 .cmd_tx
99 .send(NetworkCommand::SubscribeRoom { room_id })
100 .await;
101 }
102
103 pub async fn unsubscribe_room(&self, room_id: String) {
104 let _ = self
105 .cmd_tx
106 .send(NetworkCommand::UnsubscribeRoom { room_id })
107 .await;
108 }
109
110 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
111 let _ = self
112 .cmd_tx
113 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
114 .await;
115 }
116
117 pub async fn announce_room(&self, ann: RoomAnnouncement) {
118 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
119 }
120
121 pub async fn dial(&self, address: Multiaddr) {
122 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
123 }
124
125 /// huddle 0.5.2: dial a peer with multiple candidate addresses
126 /// at once. libp2p's swarm races them; the first to complete a
127 /// handshake wins, the rest are dropped. Use when you've resolved
128 /// a peer fingerprint to several known transports — typically
129 /// LAN ip4 + public ip4 + relay circuit.
130 pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
131 let _ = self
132 .cmd_tx
133 .send(NetworkCommand::DialAddresses { addresses })
134 .await;
135 }
136
137 pub async fn accept_inbound(&self, peer_id: PeerId) {
138 let _ = self
139 .cmd_tx
140 .send(NetworkCommand::AcceptInbound { peer_id })
141 .await;
142 }
143
144 pub async fn reject_inbound(&self, peer_id: PeerId) {
145 let _ = self
146 .cmd_tx
147 .send(NetworkCommand::RejectInbound { peer_id })
148 .await;
149 }
150
151 pub async fn disconnect_peer(&self, peer_id: PeerId) {
152 let _ = self
153 .cmd_tx
154 .send(NetworkCommand::DisconnectPeer { peer_id })
155 .await;
156 }
157
158 pub async fn shutdown(&self) {
159 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
160 }
161}
162
/// State kept for a connection that is held open but quarantined: the
/// peer is NOT added to gossipsub's explicit-peer set until the user
/// accepts it. Entries are created for inbound connections we did not
/// dial ourselves and are removed on accept, reject, or disconnect.
///
/// Outbound dials never create an entry here; they are added to the
/// explicit-peer set as soon as the connection is established.
#[derive(Debug)]
enum PendingPeer {
    /// Inbound dial from an unknown peer — modal-pending in the app.
    /// `address` is the remote's send-back address recorded when the
    /// connection was established; it is forwarded in the `InboundDial`
    /// event once Identify has provided the peer's fingerprint.
    InboundUnknown { address: Multiaddr },
}
173
/// Owns the libp2p swarm and all per-session bookkeeping. One instance is
/// created by `start_network_with` and consumed by `NetworkTask::run`.
struct NetworkTask {
    /// The libp2p swarm driving all transports and behaviours.
    swarm: Swarm<HuddleBehavior>,
    /// Commands arriving from `NetworkHandle`s.
    cmd_rx: mpsc::Receiver<NetworkCommand>,
    /// Events reported to the application layer.
    event_tx: mpsc::Sender<NetworkEvent>,
    /// Peers currently treated as discovered: mDNS discoveries,
    /// successfully user-dialed peers, and accepted inbound peers.
    discovered_peers: HashSet<PeerId>,
    /// Tracks dials issued by this task (user-initiated dials and the
    /// startup dials to configured relays) so we can correlate the
    /// eventual `ConnectionEstablished` / `OutgoingConnectionError`
    /// back to the specific address that was dialed.
    dial_attempts: HashMap<ConnectionId, Multiaddr>,
    /// Phase A: peers connected but not yet promoted to the gossipsub
    /// mesh (inbound) — waiting either for `Identify` to land so we
    /// know their fingerprint, or for the user's accept/reject decision.
    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
    /// mid-prompt doesn't leak state.
    pending_inbound: HashMap<PeerId, PendingPeer>,
    /// Phase A: peers the user has explicitly rejected this session —
    /// auto-disconnect every inbound reconnect attempt without
    /// re-prompting. Starts empty and is updated on `RejectInbound`.
    /// TODO: the persistent `blocked_peers` list is not loaded into this
    /// set at startup yet; the app layer is expected to do the DB check
    /// for now.
    session_blocklist: HashSet<PeerId>,
    /// Phase D: configured relay multiaddrs. A tracked dial whose address
    /// equals one of these is treated as a relay connection rather than a
    /// chat peer, and the address (with `/p2p-circuit` appended) is what
    /// gets passed to `listen_on` to register a reservation after
    /// Identify.
    configured_relays: Vec<Multiaddr>,
    /// Peer ids of configured relays, recorded when the relay connection
    /// is established (the PeerId may not be known before then).
    /// Consulted when Identify arrives to decide whether a relay-circuit
    /// reservation should be registered for that peer.
    relay_peer_ids: HashSet<PeerId>,
    /// huddle 0.7.11: per-peer DCUtR failure counter, reset on a
    /// successful hole-punch. Drives a warn-level log once the count
    /// reaches `DCUTR_FAIL_BUDGET`. This is purely an observability
    /// signal for the app; this module does not cancel or schedule
    /// hole-punch attempts itself.
    dcutr_failures: HashMap<PeerId, u32>,
}
210
/// huddle 0.7.11: number of failed DCUtR attempts to the same peer at
/// which a warn-level log is emitted. The counter is observational only —
/// nothing in this module stops dcutr from retrying — so the realistic
/// goal is to keep repeated hole-punch failures from flooding the logs.
const DCUTR_FAIL_BUDGET: u32 = 6;
215
216pub fn start_network(
217 identity: &Identity,
218 event_tx: mpsc::Sender<NetworkEvent>,
219) -> crate::error::Result<NetworkHandle> {
220 start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
221}
222
223/// Start the network task with explicit mode, TCP listen port, and any
224/// pre-configured relay multiaddrs. `listen_port = 0` requests a
225/// random port. Relays are dialed on startup; once `Identify` lands
226/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
227/// register a reservation so peers behind NAT can dial us through it.
228pub fn start_network_with(
229 identity: &Identity,
230 event_tx: mpsc::Sender<NetworkEvent>,
231 mode: NetworkMode,
232 listen_port: u16,
233 relays: Vec<Multiaddr>,
234) -> crate::error::Result<NetworkHandle> {
235 let keypair = identity.keypair().clone();
236 let local_peer_id = identity.peer_id();
237
238 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
239 .with_tokio()
240 .with_tcp(
241 tcp::Config::default(),
242 noise::Config::new,
243 yamux::Config::default,
244 )
245 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
246 // Phase D: wrap the transport with relay-client. This both
247 // composes the transport (so we can dial `/p2p-circuit/`
248 // addresses) and surfaces a `relay::client::Behaviour` into
249 // the `with_behaviour` closure as the second argument.
250 .with_relay_client(noise::Config::new, yamux::Config::default)
251 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
252 .with_behaviour(|key, relay_client| {
253 let mdns_opt = match mode {
254 NetworkMode::Mdns => Some(
255 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
256 .expect("mDNS init failed"),
257 ),
258 NetworkMode::Direct => None,
259 };
260 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
261
262 let identify = identify::Behaviour::new(
263 identify::Config::new("/huddle/1.0.0".into(), key.public())
264 .with_agent_version("huddle/0.5".into()),
265 );
266
267 let ping = ping::Behaviour::default();
268
269 let gossipsub_config = gossipsub::ConfigBuilder::default()
270 .heartbeat_interval(Duration::from_secs(1))
271 .validation_mode(gossipsub::ValidationMode::Strict)
272 // Default is 64 KiB. Raise it so base64-encoded file
273 // chunks (see files::CHUNK_SIZE) keep ample headroom.
274 .max_transmit_size(256 * 1024)
275 .build()
276 .expect("valid gossipsub config");
277
278 let mut gossipsub = gossipsub::Behaviour::new(
279 gossipsub::MessageAuthenticity::Signed(key.clone()),
280 gossipsub_config,
281 )
282 .expect("valid gossipsub init");
283
284 // Every node subscribes to the global rooms topic so the lobby
285 // shows discovered rooms even before joining anything.
286 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
287 gossipsub
288 .subscribe(&rooms_topic)
289 .expect("subscribe rooms topic");
290
291 // AutoNAT v2: client probes external addresses by asking
292 // remote servers to dial us back; server answers other
293 // peers' probes. Both halves are needed for symmetric
294 // P2P reachability detection (v1 combined them; v2 split).
295 let autonat_client = autonat::v2::client::Behaviour::new(
296 rand::rngs::OsRng,
297 autonat::v2::client::Config::default(),
298 );
299 let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
300 let dcutr = dcutr::Behaviour::new(local_peer_id);
301
302 HuddleBehavior {
303 mdns,
304 identify,
305 ping,
306 gossipsub,
307 relay_client,
308 autonat_client,
309 autonat_server,
310 dcutr,
311 }
312 })
313 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
314 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
315 .build();
316
317 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
318 .parse()
319 .expect("valid listen addr");
320 swarm
321 .listen_on(listen_addr)
322 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
323 // Also bind IPv6 on all interfaces so users can dial via IPv6.
324 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
325 .parse()
326 .expect("valid ipv6 listen addr");
327 if let Err(e) = swarm.listen_on(listen_addr6) {
328 debug!(%e, "ipv6 listen skipped");
329 }
330
331 let (cmd_tx, cmd_rx) = mpsc::channel(256);
332 let mut task = NetworkTask {
333 swarm,
334 cmd_rx,
335 event_tx,
336 discovered_peers: HashSet::new(),
337 dial_attempts: HashMap::new(),
338 pending_inbound: HashMap::new(),
339 session_blocklist: HashSet::new(),
340 configured_relays: relays.clone(),
341 relay_peer_ids: HashSet::new(),
342 dcutr_failures: HashMap::new(),
343 };
344 // Phase D: dial each configured relay so Identify can complete and
345 // we can register a `/p2p-circuit` reservation. Failures here are
346 // non-fatal — the user can still chat on LAN.
347 for relay_addr in relays {
348 info!(addr = %relay_addr, "dialing configured relay");
349 let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
350 let conn_id = opts.connection_id();
351 match task.swarm.dial(opts) {
352 Ok(()) => {
353 task.dial_attempts.insert(conn_id, relay_addr);
354 }
355 Err(e) => warn!(%e, "dial relay failed"),
356 }
357 }
358 tokio::spawn(task.run());
359
360 Ok(NetworkHandle { cmd_tx })
361}
362
363impl NetworkTask {
364 async fn run(mut self) {
365 loop {
366 tokio::select! {
367 event = self.swarm.select_next_some() => {
368 self.handle_swarm_event(event).await;
369 }
370 Some(cmd) = self.cmd_rx.recv() => {
371 if matches!(cmd, NetworkCommand::Shutdown) {
372 info!("network task shutting down");
373 break;
374 }
375 self.handle_command(cmd);
376 }
377 }
378 }
379 }
380
381 async fn handle_swarm_event(
382 &mut self,
383 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
384 ) {
385 match event {
386 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
387 info!(%address, "listening");
388 // Phase D: a relay-circuit address is a reachability
389 // milestone — surface it as its own event so the lobby
390 // can show "reachable via N relays" status.
391 use libp2p::multiaddr::Protocol;
392 let is_circuit = address
393 .iter()
394 .any(|p| matches!(p, Protocol::P2pCircuit));
395 if is_circuit {
396 let _ = self
397 .event_tx
398 .send(NetworkEvent::RelayReservationEstablished {
399 address: address.clone(),
400 })
401 .await;
402 }
403 let _ = self
404 .event_tx
405 .send(NetworkEvent::ListeningOn { address })
406 .await;
407 }
408 libp2p::swarm::SwarmEvent::ConnectionEstablished {
409 peer_id,
410 connection_id,
411 endpoint,
412 ..
413 } => {
414 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
415 // Phase D: a connection that was for a configured
416 // relay shouldn't pollute the lobby with a normal
417 // DialSucceeded — relays aren't chat peers. We just
418 // remember the peer_id and wait for Identify, then
419 // register a /p2p-circuit reservation.
420 let is_relay = self.configured_relays.iter().any(|r| r == &addr);
421 if is_relay {
422 info!(%peer_id, %addr, "connected to configured relay");
423 self.relay_peer_ids.insert(peer_id);
424 } else {
425 info!(%peer_id, %addr, "user-dialed peer connected");
426 // Treat dialed peers like mDNS-discovered: add
427 // to gossipsub explicit peers so room
428 // announcements flow.
429 self.swarm
430 .behaviour_mut()
431 .gossipsub
432 .add_explicit_peer(&peer_id);
433 self.discovered_peers.insert(peer_id);
434 let _ = self
435 .event_tx
436 .send(NetworkEvent::DialSucceeded {
437 peer_id,
438 address: addr,
439 })
440 .await;
441 }
442 } else if let ConnectedPoint::Dialer { .. } = endpoint {
443 // Outgoing connection we didn't track (e.g. mDNS auto-dial)
444 // — still add to mesh; no user-visible event needed.
445 self.swarm
446 .behaviour_mut()
447 .gossipsub
448 .add_explicit_peer(&peer_id);
449 } else {
450 // Inbound dial from an unknown peer (Phase A). We
451 // hold the connection but do NOT add them to the
452 // explicit-peer set yet — wait for Identify so we
453 // can show the user the peer's fingerprint, then
454 // either AcceptInbound (promote to mesh) or
455 // RejectInbound (disconnect + persist blocklist).
456 //
457 // Known limitation: gossipsub's score-based mesh
458 // formation may still forward topic messages to
459 // this peer via other peers we have in common.
460 // True hard-quarantine would need a custom
461 // ConnectionHandler — out of scope for v1.
462 if self.session_blocklist.contains(&peer_id) {
463 info!(%peer_id, "rejecting inbound from session-blocked peer");
464 let _ = self.swarm.disconnect_peer_id(peer_id);
465 } else {
466 let address = match &endpoint {
467 ConnectedPoint::Listener { send_back_addr, .. } => {
468 send_back_addr.clone()
469 }
470 _ => Multiaddr::empty(),
471 };
472 debug!(%peer_id, %address, "inbound peer pending decision");
473 self.pending_inbound
474 .insert(peer_id, PendingPeer::InboundUnknown { address });
475 }
476 }
477 }
478 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
479 connection_id,
480 error,
481 ..
482 } => {
483 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
484 warn!(%addr, %error, "user-dialed peer failed");
485 let _ = self
486 .event_tx
487 .send(NetworkEvent::DialFailed {
488 address: addr,
489 error: error.to_string(),
490 })
491 .await;
492 }
493 }
494 libp2p::swarm::SwarmEvent::ConnectionClosed {
495 peer_id,
496 num_established,
497 ..
498 } => {
499 // Drop any pending-inbound entry for this peer — they
500 // disconnected before we could prompt the user (or
501 // before the user accepted). Lets a re-connect start
502 // fresh rather than reusing stale state.
503 self.pending_inbound.remove(&peer_id);
504 // huddle 0.7.11: emit PeerDisconnected so the app can
505 // clean its `connected_dial_addrs` map. Only fire when
506 // the LAST connection to that peer closed — multiple
507 // simultaneous connections (e.g. direct + relay) would
508 // otherwise emit spurious disconnects when one of the
509 // two drops.
510 if num_established == 0 {
511 let _ = self
512 .event_tx
513 .send(NetworkEvent::PeerDisconnected { peer_id })
514 .await;
515 // Also clean up gossipsub's explicit-peers set so
516 // it doesn't accumulate dead PeerIds across the
517 // session.
518 self.swarm
519 .behaviour_mut()
520 .gossipsub
521 .remove_explicit_peer(&peer_id);
522 }
523 }
524 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
525 _ => {}
526 }
527 }
528
529 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
530 match event {
531 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
532 for (peer_id, addr) in peers {
533 if self.discovered_peers.insert(peer_id) {
534 info!(%peer_id, %addr, "mDNS discovered");
535 self.swarm.add_peer_address(peer_id, addr);
536 // Explicitly add to gossipsub mesh.
537 self.swarm
538 .behaviour_mut()
539 .gossipsub
540 .add_explicit_peer(&peer_id);
541 let _ = self
542 .event_tx
543 .send(NetworkEvent::PeerDiscovered { peer_id })
544 .await;
545 }
546 }
547 }
548 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
549 for (peer_id, _) in peers {
550 if self.discovered_peers.remove(&peer_id) {
551 info!(%peer_id, "mDNS peer expired");
552 self.swarm
553 .behaviour_mut()
554 .gossipsub
555 .remove_explicit_peer(&peer_id);
556 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
557 }
558 }
559 }
560 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
561 propagation_source,
562 message,
563 ..
564 }) => {
565 self.handle_gossipsub_message(propagation_source, message).await;
566 }
567 HuddleBehaviorEvent::Identify(identify::Event::Received {
568 peer_id, info, ..
569 }) => {
570 debug!(%peer_id, agent = %info.agent_version, "identify received");
571 // Phase D: if this peer is a configured relay, register
572 // a `/p2p-circuit` reservation on first identify. Idem-
573 // potent — only fire if we haven't listened on this
574 // relay already (identify fires periodically).
575 if self.relay_peer_ids.contains(&peer_id) {
576 use libp2p::multiaddr::Protocol;
577 if let Some(relay_addr) = self
578 .configured_relays
579 .iter()
580 .find(|a| {
581 // Match by /p2p/<peer-id> suffix when
582 // present, else by the addr we dialed.
583 a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
584 || self.dial_attempts.values().any(|d| d == *a)
585 })
586 .cloned()
587 {
588 let circuit = relay_addr.with(Protocol::P2pCircuit);
589 match self.swarm.listen_on(circuit.clone()) {
590 Ok(_) => info!(%circuit, "listening on relay circuit"),
591 Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
592 }
593 }
594 }
595 // Decode the remote's Ed25519 pubkey and derive our
596 // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
597 // Rsa, Ecdsa) shouldn't appear in practice — huddle
598 // only generates Ed25519 identities — so we just log
599 // and skip if the cast fails.
600 let fingerprint = match info.public_key.clone().try_into_ed25519() {
601 Ok(ed_pk) => {
602 let bytes = ed_pk.to_bytes();
603 compute_fingerprint(&bytes)
604 }
605 Err(_) => {
606 warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
607 return;
608 }
609 };
610 // Always notify the app layer so it can populate
611 // `known_peers.fingerprint` and detect that an
612 // outbound peer we dialed has fully identified.
613 let _ = self
614 .event_tx
615 .send(NetworkEvent::PeerIdentified {
616 peer_id,
617 fingerprint: fingerprint.clone(),
618 })
619 .await;
620 // If the peer is in pending_inbound, Identify completing
621 // is the cue to surface the user prompt. Keep them in
622 // pending_inbound until Accept or Reject — we don't
623 // know yet which way the user will decide.
624 if let Some(PendingPeer::InboundUnknown { address }) =
625 self.pending_inbound.get(&peer_id)
626 {
627 let address = address.clone();
628 let _ = self
629 .event_tx
630 .send(NetworkEvent::InboundDial {
631 peer_id,
632 fingerprint,
633 address,
634 })
635 .await;
636 }
637 }
638 HuddleBehaviorEvent::AutonatClient(ev) => {
639 // One probe per address candidate. `result.is_ok()`
640 // means a remote AutoNAT server dialed us back
641 // successfully on `tested_addr` ⇒ this address is
642 // reachable from the outside. The app layer aggregates
643 // these into the lobby reachability badge.
644 let reachable = ev.result.is_ok();
645 if reachable {
646 info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
647 } else {
648 debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
649 }
650 let _ = self
651 .event_tx
652 .send(NetworkEvent::NatProbeResult {
653 tested_addr: ev.tested_addr,
654 reachable,
655 })
656 .await;
657 }
658 HuddleBehaviorEvent::AutonatServer(_) => {
659 // We answered another peer's reachability probe.
660 // No app-visible action.
661 }
662 HuddleBehaviorEvent::Dcutr(ev) => {
663 let success = ev.result.is_ok();
664 if success {
665 info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
666 // huddle 0.7.11: a successful hole-punch resets the
667 // per-peer DCUtR retry counter so future failures
668 // don't immediately bail out.
669 self.dcutr_failures.remove(&ev.remote_peer_id);
670 } else {
671 debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
672 let count = self
673 .dcutr_failures
674 .entry(ev.remote_peer_id)
675 .and_modify(|n| *n += 1)
676 .or_insert(1);
677 if *count >= DCUTR_FAIL_BUDGET {
678 warn!(
679 remote = %ev.remote_peer_id,
680 attempts = *count,
681 "DCUtR: giving up after repeated failures (symmetric NAT likely)"
682 );
683 }
684 }
685 let _ = self
686 .event_tx
687 .send(NetworkEvent::DcutrUpgrade {
688 remote_peer: ev.remote_peer_id,
689 success,
690 })
691 .await;
692 }
693 HuddleBehaviorEvent::RelayClient(event) => {
694 // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
695 // only exposes the success-path variants
696 // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
697 // `InboundCircuitEstablished`). There's no
698 // `ReservationReqFailed` arm we can match on, so we
699 // can't reliably surface reservation loss without a
700 // separate health-check timer (future work). For now
701 // we log every event so operators can see the
702 // lifecycle in `huddle.log` — pre-0.7.11 the whole
703 // arm was swallowed by `_ => {}` which hid even that.
704 use libp2p::relay::client::Event as Rc;
705 match event {
706 Rc::ReservationReqAccepted { relay_peer_id, .. } => {
707 info!(%relay_peer_id, "relay: reservation accepted");
708 }
709 Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
710 debug!(%relay_peer_id, "relay: outbound circuit established");
711 }
712 other => {
713 debug!(?other, "relay client event");
714 }
715 }
716 }
717 _ => {}
718 }
719 }
720
721 async fn handle_gossipsub_message(
722 &mut self,
723 from_peer: PeerId,
724 message: gossipsub::Message,
725 ) {
726 let topic = message.topic.to_string();
727 if topic == ROOMS_TOPIC {
728 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
729 Ok(ann) => {
730 let _ = self
731 .event_tx
732 .send(NetworkEvent::RoomAnnouncementReceived(ann))
733 .await;
734 }
735 Err(e) => {
736 warn!(%e, "bad room announcement");
737 }
738 }
739 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
740 let _ = self
741 .event_tx
742 .send(NetworkEvent::RoomMessageReceived {
743 room_id: room_id.to_string(),
744 payload: message.data,
745 from_peer,
746 })
747 .await;
748 }
749 }
750
751 fn handle_command(&mut self, cmd: NetworkCommand) {
752 match cmd {
753 NetworkCommand::SubscribeRoom { room_id } => {
754 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
755 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
756 warn!(%e, %room_id, "subscribe room failed");
757 }
758 }
759 NetworkCommand::UnsubscribeRoom { room_id } => {
760 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
761 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
762 }
763 NetworkCommand::PublishRoomMessage { room_id, payload } => {
764 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
765 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
766 // No subscribed peers is expected before the mesh
767 // forms; anything else (MessageTooLarge, full queues)
768 // is a real bug worth surfacing.
769 match e {
770 gossipsub::PublishError::NoPeersSubscribedToTopic => {
771 debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
772 }
773 e => warn!(%e, %room_id, "publish room message failed"),
774 }
775 }
776 }
777 NetworkCommand::AnnounceRoom(ann) => {
778 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
779 match serde_json::to_vec(&ann) {
780 Ok(payload) => {
781 if let Err(e) =
782 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
783 {
784 debug!(%e, "publish room announcement failed");
785 }
786 }
787 Err(e) => warn!(%e, "encode room announcement"),
788 }
789 }
790 NetworkCommand::AcceptInbound { peer_id } => {
791 if self.pending_inbound.remove(&peer_id).is_some() {
792 info!(%peer_id, "inbound dial accepted — promoting to mesh");
793 self.swarm
794 .behaviour_mut()
795 .gossipsub
796 .add_explicit_peer(&peer_id);
797 self.discovered_peers.insert(peer_id);
798 } else {
799 debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
800 }
801 }
802 NetworkCommand::RejectInbound { peer_id } => {
803 self.pending_inbound.remove(&peer_id);
804 self.session_blocklist.insert(peer_id);
805 info!(%peer_id, "inbound dial rejected — disconnecting");
806 let _ = self.swarm.disconnect_peer_id(peer_id);
807 }
808 NetworkCommand::DisconnectPeer { peer_id } => {
809 info!(%peer_id, "app-level identity check failed — disconnecting");
810 let _ = self.swarm.disconnect_peer_id(peer_id);
811 }
812 NetworkCommand::Dial { address } => {
813 let opts: DialOpts = address.clone().into();
814 let conn_id = opts.connection_id();
815 match self.swarm.dial(opts) {
816 Ok(()) => {
817 self.dial_attempts.insert(conn_id, address);
818 }
819 Err(e) => {
820 // Synchronous dial error (bad multiaddr, transport refused).
821 let tx = self.event_tx.clone();
822 let err = e.to_string();
823 tokio::spawn(async move {
824 let _ = tx
825 .send(NetworkEvent::DialFailed {
826 address,
827 error: err,
828 })
829 .await;
830 });
831 }
832 }
833 }
834 NetworkCommand::DialAddresses { addresses } => {
835 use libp2p::multiaddr::Protocol;
836 if addresses.is_empty() {
837 return;
838 }
839 // Extract the shared peer-id from any address with a
840 // `/p2p/<peer-id>` suffix. All host_addrs originating
841 // from the same `RoomAnnouncement` share the
842 // announcer's peer-id, so the first match is enough.
843 let peer_id = addresses
844 .iter()
845 .flat_map(|a| a.iter())
846 .find_map(|p| match p {
847 Protocol::P2p(pid) => Some(pid),
848 _ => None,
849 });
850 let opts = match peer_id {
851 Some(pid) => DialOpts::peer_id(pid)
852 .addresses(addresses.clone())
853 .build(),
854 // No /p2p/ segment anywhere — fall back to single-
855 // address dial of the first candidate, matching the
856 // legacy `Dial` semantics for unanchored multiaddrs.
857 None => addresses[0].clone().into(),
858 };
859 let conn_id = opts.connection_id();
860 // Use the first address as the representative for
861 // dial_attempts. On synchronous error we report it;
862 // on async success the post-identify handler upserts
863 // with the actually-connected endpoint.
864 let primary = addresses[0].clone();
865 match self.swarm.dial(opts) {
866 Ok(()) => {
867 self.dial_attempts.insert(conn_id, primary);
868 }
869 Err(e) => {
870 let tx = self.event_tx.clone();
871 let err = e.to_string();
872 tokio::spawn(async move {
873 let _ = tx
874 .send(NetworkEvent::DialFailed {
875 address: primary,
876 error: err,
877 })
878 .await;
879 });
880 }
881 }
882 }
883 NetworkCommand::Shutdown => unreachable!(),
884 }
885 }
886}