Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13    autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
14    SwarmBuilder,
15};
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18
/// Peer-discovery strategy used by the network task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// mDNS enabled: we announce ourselves on the LAN and pick up other
    /// peers' announcements.
    Mdns,
    /// mDNS disabled: we are invisible to LAN discovery, so a connection
    /// only happens when someone dials our address or we dial theirs.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of the mode (`"mdns"` or `"direct"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Parse a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Accepts `mdns`/`lan`/`open` for [`NetworkMode::Mdns`] and
    /// `direct`/`dial`/`private` for [`NetworkMode::Direct`]. Any other
    /// input yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        const MDNS_ALIASES: [&str; 3] = ["mdns", "lan", "open"];
        const DIRECT_ALIASES: [&str; 3] = ["direct", "dial", "private"];

        let candidate = s.trim();
        let is_one_of = |aliases: &[&str]| {
            aliases
                .iter()
                .any(|alias| alias.eq_ignore_ascii_case(candidate))
        };

        if is_one_of(&MDNS_ALIASES) {
            Some(Self::Mdns)
        } else if is_one_of(&DIRECT_ALIASES) {
            Some(Self::Direct)
        } else {
            None
        }
    }
}
45
46use crate::identity::{compute_fingerprint, Identity};
47use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
48use crate::network::events::NetworkEvent;
49use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
50
/// Requests sent from a [`NetworkHandle`] to the background network task
/// over its command channel. Each `NetworkHandle` method wraps exactly
/// one variant.
#[derive(Debug)]
pub enum NetworkCommand {
    /// Subscribe to a room's per-room gossipsub topic.
    SubscribeRoom { room_id: String },
    /// Unsubscribe from a room's topic.
    UnsubscribeRoom { room_id: String },
    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
    PublishRoomMessage { room_id: String, payload: Vec<u8> },
    /// Publish a room announcement on the global rooms topic.
    AnnounceRoom(RoomAnnouncement),
    /// User-initiated dial of an explicit address. Used for cross-network
    /// reach when mDNS isn't enough.
    Dial { address: Multiaddr },
    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
    /// letting libp2p race them in parallel. Used by the "add by HD
    /// ID / username" flow, which resolves a fingerprint to every
    /// address we know for that peer (room announcement `host_addrs`
    /// + persisted `known_peers`). libp2p's parallel dialer picks
    /// the cheapest path that completes — LAN beats public IP beats
    /// relay-hopped without us having to probe transports manually.
    DialAddresses { addresses: Vec<Multiaddr> },
    /// Phase A: user accepted an inbound dial — promote the peer to
    /// explicit-peer status so room announcements flow.
    AcceptInbound { peer_id: PeerId },
    /// Phase A: user rejected an inbound dial — disconnect them and
    /// add the peer_id to the in-memory blocklist for this session
    /// (caller is responsible for the persistent blocked_peers row).
    RejectInbound { peer_id: PeerId },
    /// Phase C follow-up: drop a connection that failed an
    /// application-level identity check (e.g. invite-fingerprint
    /// mismatch). Differs from `RejectInbound` in that it doesn't
    /// touch the inbound-pending map (the connection is already
    /// past Identify when we discover the mismatch) and doesn't
    /// persist a block — the caller may want to retry with a
    /// corrected invite.
    DisconnectPeer { peer_id: PeerId },
    /// Stop the network task. The task's run loop intercepts this
    /// variant itself: it logs "network task shutting down" and exits
    /// instead of forwarding the command to the regular handler.
    Shutdown,
}
89
/// Cloneable front-end to the background network task.
///
/// Holds only the sending half of the command channel; every method on
/// the handle turns into one [`NetworkCommand`] pushed onto it.
#[derive(Clone)]
pub struct NetworkHandle {
    /// Sending half of the command channel; the network task owns the
    /// matching receiver.
    cmd_tx: mpsc::Sender<NetworkCommand>,
}
94
95impl NetworkHandle {
96    pub async fn subscribe_room(&self, room_id: String) {
97        let _ = self
98            .cmd_tx
99            .send(NetworkCommand::SubscribeRoom { room_id })
100            .await;
101    }
102
103    pub async fn unsubscribe_room(&self, room_id: String) {
104        let _ = self
105            .cmd_tx
106            .send(NetworkCommand::UnsubscribeRoom { room_id })
107            .await;
108    }
109
110    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
111        let _ = self
112            .cmd_tx
113            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
114            .await;
115    }
116
117    pub async fn announce_room(&self, ann: RoomAnnouncement) {
118        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
119    }
120
121    pub async fn dial(&self, address: Multiaddr) {
122        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
123    }
124
125    /// huddle 0.5.2: dial a peer with multiple candidate addresses
126    /// at once. libp2p's swarm races them; the first to complete a
127    /// handshake wins, the rest are dropped. Use when you've resolved
128    /// a peer fingerprint to several known transports — typically
129    /// LAN ip4 + public ip4 + relay circuit.
130    pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
131        let _ = self
132            .cmd_tx
133            .send(NetworkCommand::DialAddresses { addresses })
134            .await;
135    }
136
137    pub async fn accept_inbound(&self, peer_id: PeerId) {
138        let _ = self
139            .cmd_tx
140            .send(NetworkCommand::AcceptInbound { peer_id })
141            .await;
142    }
143
144    pub async fn reject_inbound(&self, peer_id: PeerId) {
145        let _ = self
146            .cmd_tx
147            .send(NetworkCommand::RejectInbound { peer_id })
148            .await;
149    }
150
151    pub async fn disconnect_peer(&self, peer_id: PeerId) {
152        let _ = self
153            .cmd_tx
154            .send(NetworkCommand::DisconnectPeer { peer_id })
155            .await;
156    }
157
158    pub async fn shutdown(&self) {
159        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
160    }
161}
162
/// What kind of connection we're holding open until the user decides
/// what to do with it. Quarantined inbound dials are NOT added to the
/// gossipsub explicit-peer set until the user accepts.
///
/// Only the inbound case is tracked here. Outbound user-dials are added
/// to the mesh and reported via `DialSucceeded` as soon as the
/// connection is established; their fingerprint arrives later through
/// the separate `PeerIdentified` event.
#[derive(Debug)]
enum PendingPeer {
    /// Inbound dial from an unknown peer — modal-pending in the app.
    /// `address` is the send-back address observed when the connection
    /// was established.
    InboundUnknown { address: Multiaddr },
}
173
/// State owned by the background network task: the libp2p swarm, the
/// two channel endpoints that connect it to the app, and the bookkeeping
/// maps the event handlers consult.
struct NetworkTask {
    /// The libp2p swarm driving all transports and behaviours.
    swarm: Swarm<HuddleBehavior>,
    /// Commands arriving from `NetworkHandle` clones.
    cmd_rx: mpsc::Receiver<NetworkCommand>,
    /// Events reported back to the app layer.
    event_tx: mpsc::Sender<NetworkEvent>,
    /// Peers already announced to the app: inserted on mDNS discovery
    /// and on a successful user dial, removed on mDNS expiry. Used to
    /// de-duplicate `PeerDiscovered` / `PeerExpired` events.
    discovered_peers: HashSet<PeerId>,
    /// Tracks user-initiated dials so we can correlate the eventual
    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
    /// specific address the user asked us to dial. Startup dials to
    /// configured relays are recorded here as well.
    dial_attempts: HashMap<ConnectionId, Multiaddr>,
    /// Phase A: peers connected but not yet promoted to the gossipsub
    /// mesh (inbound) — waiting either for `Identify` to land so we
    /// know their fingerprint, or for the user's accept/reject decision.
    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
    /// mid-prompt doesn't leak state.
    pending_inbound: HashMap<PeerId, PendingPeer>,
    /// Phase A: peers the user has explicitly rejected this session —
    /// auto-disconnect every reconnect attempt without re-prompting.
    /// Persistent across runs via `blocked_peers`; this in-memory copy
    /// is loaded at startup (TODO: not yet wired; falls back to the DB
    /// check in the app layer for now) and updated on `RejectInbound`.
    session_blocklist: HashSet<PeerId>,
    /// Phase D: configured relay multiaddrs. When Identify lands for a
    /// peer whose multiaddr matches one of these, we call
    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
    /// Tracked as multiaddr strings (no PeerId yet — we don't know it
    /// until Identify) plus a set of peer_ids of confirmed relays so
    /// we only register once per relay.
    configured_relays: Vec<Multiaddr>,
    /// Peer ids learned when a dial to one of `configured_relays`
    /// connected; consulted when Identify arrives to decide whether the
    /// peer is a relay.
    relay_peer_ids: HashSet<PeerId>,
    /// huddle 0.7.11: per-peer DCUtR failure counter. Logged at warn
    /// once the count crosses `DCUTR_FAIL_BUDGET` so symmetric-NAT
    /// pairs don't generate runaway hole-punch attempts in the logs.
    /// libp2p's dcutr behavior schedules its own retries internally;
    /// this is purely an observability signal for the app.
    dcutr_failures: HashMap<PeerId, u32>,
}
210
/// huddle 0.7.11: number of failed DCUtR attempts to the same peer at
/// which the DCUtR handler escalates from debug to warn logging. dcutr
/// keeps trying internally, but the audit flagged the spam as a real
/// issue — capping the log noise is the realistic fix. This value does
/// not stop or throttle the hole-punch attempts themselves.
const DCUTR_FAIL_BUDGET: u32 = 6;
215
216pub fn start_network(
217    identity: &Identity,
218    event_tx: mpsc::Sender<NetworkEvent>,
219) -> crate::error::Result<NetworkHandle> {
220    start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
221}
222
223/// Start the network task with explicit mode, TCP listen port, and any
224/// pre-configured relay multiaddrs. `listen_port = 0` requests a
225/// random port. Relays are dialed on startup; once `Identify` lands
226/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
227/// register a reservation so peers behind NAT can dial us through it.
228pub fn start_network_with(
229    identity: &Identity,
230    event_tx: mpsc::Sender<NetworkEvent>,
231    mode: NetworkMode,
232    listen_port: u16,
233    relays: Vec<Multiaddr>,
234) -> crate::error::Result<NetworkHandle> {
235    let keypair = identity.keypair().clone();
236    let local_peer_id = identity.peer_id();
237
238    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
239        .with_tokio()
240        .with_tcp(
241            tcp::Config::default(),
242            noise::Config::new,
243            yamux::Config::default,
244        )
245        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
246        // Phase D: wrap the transport with relay-client. This both
247        // composes the transport (so we can dial `/p2p-circuit/`
248        // addresses) and surfaces a `relay::client::Behaviour` into
249        // the `with_behaviour` closure as the second argument.
250        .with_relay_client(noise::Config::new, yamux::Config::default)
251        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
252        .with_behaviour(|key, relay_client| {
253            let mdns_opt = match mode {
254                NetworkMode::Mdns => Some(
255                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
256                        .expect("mDNS init failed"),
257                ),
258                NetworkMode::Direct => None,
259            };
260            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
261
262            let identify = identify::Behaviour::new(
263                identify::Config::new("/huddle/1.0.0".into(), key.public())
264                    .with_agent_version("huddle/0.5".into()),
265            );
266
267            let ping = ping::Behaviour::default();
268
269            let gossipsub_config = gossipsub::ConfigBuilder::default()
270                .heartbeat_interval(Duration::from_secs(1))
271                .validation_mode(gossipsub::ValidationMode::Strict)
272                // Default is 64 KiB. Raise it so base64-encoded file
273                // chunks (see files::CHUNK_SIZE) keep ample headroom.
274                .max_transmit_size(256 * 1024)
275                .build()
276                .expect("valid gossipsub config");
277
278            let mut gossipsub = gossipsub::Behaviour::new(
279                gossipsub::MessageAuthenticity::Signed(key.clone()),
280                gossipsub_config,
281            )
282            .expect("valid gossipsub init");
283
284            // Every node subscribes to the global rooms topic so the lobby
285            // shows discovered rooms even before joining anything.
286            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
287            gossipsub
288                .subscribe(&rooms_topic)
289                .expect("subscribe rooms topic");
290
291            // AutoNAT v2: client probes external addresses by asking
292            // remote servers to dial us back; server answers other
293            // peers' probes. Both halves are needed for symmetric
294            // P2P reachability detection (v1 combined them; v2 split).
295            let autonat_client = autonat::v2::client::Behaviour::new(
296                rand::rngs::OsRng,
297                autonat::v2::client::Config::default(),
298            );
299            let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
300            let dcutr = dcutr::Behaviour::new(local_peer_id);
301
302            HuddleBehavior {
303                mdns,
304                identify,
305                ping,
306                gossipsub,
307                relay_client,
308                autonat_client,
309                autonat_server,
310                dcutr,
311            }
312        })
313        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
314        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
315        .build();
316
317    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
318        .parse()
319        .expect("valid listen addr");
320    swarm
321        .listen_on(listen_addr)
322        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
323    // Also bind IPv6 on all interfaces so users can dial via IPv6.
324    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
325        .parse()
326        .expect("valid ipv6 listen addr");
327    if let Err(e) = swarm.listen_on(listen_addr6) {
328        debug!(%e, "ipv6 listen skipped");
329    }
330
331    let (cmd_tx, cmd_rx) = mpsc::channel(256);
332    let mut task = NetworkTask {
333        swarm,
334        cmd_rx,
335        event_tx,
336        discovered_peers: HashSet::new(),
337        dial_attempts: HashMap::new(),
338        pending_inbound: HashMap::new(),
339        session_blocklist: HashSet::new(),
340        configured_relays: relays.clone(),
341        relay_peer_ids: HashSet::new(),
342        dcutr_failures: HashMap::new(),
343    };
344    // Phase D: dial each configured relay so Identify can complete and
345    // we can register a `/p2p-circuit` reservation. Failures here are
346    // non-fatal — the user can still chat on LAN.
347    for relay_addr in relays {
348        info!(addr = %relay_addr, "dialing configured relay");
349        let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
350        let conn_id = opts.connection_id();
351        match task.swarm.dial(opts) {
352            Ok(()) => {
353                task.dial_attempts.insert(conn_id, relay_addr);
354            }
355            Err(e) => warn!(%e, "dial relay failed"),
356        }
357    }
358    tokio::spawn(task.run());
359
360    Ok(NetworkHandle { cmd_tx })
361}
362
363impl NetworkTask {
364    async fn run(mut self) {
365        loop {
366            tokio::select! {
367                event = self.swarm.select_next_some() => {
368                    self.handle_swarm_event(event).await;
369                }
370                Some(cmd) = self.cmd_rx.recv() => {
371                    if matches!(cmd, NetworkCommand::Shutdown) {
372                        info!("network task shutting down");
373                        break;
374                    }
375                    self.handle_command(cmd);
376                }
377            }
378        }
379    }
380
381    async fn handle_swarm_event(
382        &mut self,
383        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
384    ) {
385        match event {
386            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
387                info!(%address, "listening");
388                // Phase D: a relay-circuit address is a reachability
389                // milestone — surface it as its own event so the lobby
390                // can show "reachable via N relays" status.
391                use libp2p::multiaddr::Protocol;
392                let is_circuit = address
393                    .iter()
394                    .any(|p| matches!(p, Protocol::P2pCircuit));
395                if is_circuit {
396                    let _ = self
397                        .event_tx
398                        .send(NetworkEvent::RelayReservationEstablished {
399                            address: address.clone(),
400                        })
401                        .await;
402                }
403                let _ = self
404                    .event_tx
405                    .send(NetworkEvent::ListeningOn { address })
406                    .await;
407            }
408            libp2p::swarm::SwarmEvent::ConnectionEstablished {
409                peer_id,
410                connection_id,
411                endpoint,
412                ..
413            } => {
414                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
415                    // Phase D: a connection that was for a configured
416                    // relay shouldn't pollute the lobby with a normal
417                    // DialSucceeded — relays aren't chat peers. We just
418                    // remember the peer_id and wait for Identify, then
419                    // register a /p2p-circuit reservation.
420                    let is_relay = self.configured_relays.iter().any(|r| r == &addr);
421                    if is_relay {
422                        info!(%peer_id, %addr, "connected to configured relay");
423                        self.relay_peer_ids.insert(peer_id);
424                    } else {
425                        info!(%peer_id, %addr, "user-dialed peer connected");
426                        // Treat dialed peers like mDNS-discovered: add
427                        // to gossipsub explicit peers so room
428                        // announcements flow.
429                        self.swarm
430                            .behaviour_mut()
431                            .gossipsub
432                            .add_explicit_peer(&peer_id);
433                        self.discovered_peers.insert(peer_id);
434                        let _ = self
435                            .event_tx
436                            .send(NetworkEvent::DialSucceeded {
437                                peer_id,
438                                address: addr,
439                            })
440                            .await;
441                    }
442                } else if let ConnectedPoint::Dialer { .. } = endpoint {
443                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
444                    // — still add to mesh; no user-visible event needed.
445                    self.swarm
446                        .behaviour_mut()
447                        .gossipsub
448                        .add_explicit_peer(&peer_id);
449                } else {
450                    // Inbound dial from an unknown peer (Phase A). We
451                    // hold the connection but do NOT add them to the
452                    // explicit-peer set yet — wait for Identify so we
453                    // can show the user the peer's fingerprint, then
454                    // either AcceptInbound (promote to mesh) or
455                    // RejectInbound (disconnect + persist blocklist).
456                    //
457                    // Known limitation: gossipsub's score-based mesh
458                    // formation may still forward topic messages to
459                    // this peer via other peers we have in common.
460                    // True hard-quarantine would need a custom
461                    // ConnectionHandler — out of scope for v1.
462                    if self.session_blocklist.contains(&peer_id) {
463                        info!(%peer_id, "rejecting inbound from session-blocked peer");
464                        let _ = self.swarm.disconnect_peer_id(peer_id);
465                    } else {
466                        let address = match &endpoint {
467                            ConnectedPoint::Listener { send_back_addr, .. } => {
468                                send_back_addr.clone()
469                            }
470                            _ => Multiaddr::empty(),
471                        };
472                        debug!(%peer_id, %address, "inbound peer pending decision");
473                        self.pending_inbound
474                            .insert(peer_id, PendingPeer::InboundUnknown { address });
475                    }
476                }
477            }
478            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
479                connection_id,
480                error,
481                ..
482            } => {
483                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
484                    warn!(%addr, %error, "user-dialed peer failed");
485                    let _ = self
486                        .event_tx
487                        .send(NetworkEvent::DialFailed {
488                            address: addr,
489                            error: error.to_string(),
490                        })
491                        .await;
492                }
493            }
494            libp2p::swarm::SwarmEvent::ConnectionClosed {
495                peer_id,
496                num_established,
497                ..
498            } => {
499                // Drop any pending-inbound entry for this peer — they
500                // disconnected before we could prompt the user (or
501                // before the user accepted). Lets a re-connect start
502                // fresh rather than reusing stale state.
503                self.pending_inbound.remove(&peer_id);
504                // huddle 0.7.11: emit PeerDisconnected so the app can
505                // clean its `connected_dial_addrs` map. Only fire when
506                // the LAST connection to that peer closed — multiple
507                // simultaneous connections (e.g. direct + relay) would
508                // otherwise emit spurious disconnects when one of the
509                // two drops.
510                if num_established == 0 {
511                    let _ = self
512                        .event_tx
513                        .send(NetworkEvent::PeerDisconnected { peer_id })
514                        .await;
515                    // Also clean up gossipsub's explicit-peers set so
516                    // it doesn't accumulate dead PeerIds across the
517                    // session.
518                    self.swarm
519                        .behaviour_mut()
520                        .gossipsub
521                        .remove_explicit_peer(&peer_id);
522                }
523            }
524            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
525            _ => {}
526        }
527    }
528
529    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
530        match event {
531            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
532                for (peer_id, addr) in peers {
533                    if self.discovered_peers.insert(peer_id) {
534                        info!(%peer_id, %addr, "mDNS discovered");
535                        self.swarm.add_peer_address(peer_id, addr);
536                        // Explicitly add to gossipsub mesh.
537                        self.swarm
538                            .behaviour_mut()
539                            .gossipsub
540                            .add_explicit_peer(&peer_id);
541                        let _ = self
542                            .event_tx
543                            .send(NetworkEvent::PeerDiscovered { peer_id })
544                            .await;
545                    }
546                }
547            }
548            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
549                for (peer_id, _) in peers {
550                    if self.discovered_peers.remove(&peer_id) {
551                        info!(%peer_id, "mDNS peer expired");
552                        self.swarm
553                            .behaviour_mut()
554                            .gossipsub
555                            .remove_explicit_peer(&peer_id);
556                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
557                    }
558                }
559            }
560            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
561                propagation_source,
562                message,
563                ..
564            }) => {
565                self.handle_gossipsub_message(propagation_source, message).await;
566            }
567            HuddleBehaviorEvent::Identify(identify::Event::Received {
568                peer_id, info, ..
569            }) => {
570                debug!(%peer_id, agent = %info.agent_version, "identify received");
571                // Phase D: if this peer is a configured relay, register
572                // a `/p2p-circuit` reservation on first identify. Idem-
573                // potent — only fire if we haven't listened on this
574                // relay already (identify fires periodically).
575                if self.relay_peer_ids.contains(&peer_id) {
576                    use libp2p::multiaddr::Protocol;
577                    if let Some(relay_addr) = self
578                        .configured_relays
579                        .iter()
580                        .find(|a| {
581                            // Match by /p2p/<peer-id> suffix when
582                            // present, else by the addr we dialed.
583                            a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
584                                || self.dial_attempts.values().any(|d| d == *a)
585                        })
586                        .cloned()
587                    {
588                        let circuit = relay_addr.with(Protocol::P2pCircuit);
589                        match self.swarm.listen_on(circuit.clone()) {
590                            Ok(_) => info!(%circuit, "listening on relay circuit"),
591                            Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
592                        }
593                    }
594                }
595                // Decode the remote's Ed25519 pubkey and derive our
596                // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
597                // Rsa, Ecdsa) shouldn't appear in practice — huddle
598                // only generates Ed25519 identities — so we just log
599                // and skip if the cast fails.
600                let fingerprint = match info.public_key.clone().try_into_ed25519() {
601                    Ok(ed_pk) => {
602                        let bytes = ed_pk.to_bytes();
603                        compute_fingerprint(&bytes)
604                    }
605                    Err(_) => {
606                        warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
607                        return;
608                    }
609                };
610                // Always notify the app layer so it can populate
611                // `known_peers.fingerprint` and detect that an
612                // outbound peer we dialed has fully identified.
613                let _ = self
614                    .event_tx
615                    .send(NetworkEvent::PeerIdentified {
616                        peer_id,
617                        fingerprint: fingerprint.clone(),
618                    })
619                    .await;
620                // If the peer is in pending_inbound, Identify completing
621                // is the cue to surface the user prompt. Keep them in
622                // pending_inbound until Accept or Reject — we don't
623                // know yet which way the user will decide.
624                if let Some(PendingPeer::InboundUnknown { address }) =
625                    self.pending_inbound.get(&peer_id)
626                {
627                    let address = address.clone();
628                    let _ = self
629                        .event_tx
630                        .send(NetworkEvent::InboundDial {
631                            peer_id,
632                            fingerprint,
633                            address,
634                        })
635                        .await;
636                }
637            }
638            HuddleBehaviorEvent::AutonatClient(ev) => {
639                // One probe per address candidate. `result.is_ok()`
640                // means a remote AutoNAT server dialed us back
641                // successfully on `tested_addr` ⇒ this address is
642                // reachable from the outside. The app layer aggregates
643                // these into the lobby reachability badge.
644                let reachable = ev.result.is_ok();
645                if reachable {
646                    info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
647                } else {
648                    debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
649                }
650                let _ = self
651                    .event_tx
652                    .send(NetworkEvent::NatProbeResult {
653                        tested_addr: ev.tested_addr,
654                        reachable,
655                    })
656                    .await;
657            }
658            HuddleBehaviorEvent::AutonatServer(_) => {
659                // We answered another peer's reachability probe.
660                // No app-visible action.
661            }
662            HuddleBehaviorEvent::Dcutr(ev) => {
663                let success = ev.result.is_ok();
664                if success {
665                    info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
666                    // huddle 0.7.11: a successful hole-punch resets the
667                    // per-peer DCUtR retry counter so future failures
668                    // don't immediately bail out.
669                    self.dcutr_failures.remove(&ev.remote_peer_id);
670                } else {
671                    debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
672                    let count = self
673                        .dcutr_failures
674                        .entry(ev.remote_peer_id)
675                        .and_modify(|n| *n += 1)
676                        .or_insert(1);
677                    if *count >= DCUTR_FAIL_BUDGET {
678                        warn!(
679                            remote = %ev.remote_peer_id,
680                            attempts = *count,
681                            "DCUtR: giving up after repeated failures (symmetric NAT likely)"
682                        );
683                    }
684                }
685                let _ = self
686                    .event_tx
687                    .send(NetworkEvent::DcutrUpgrade {
688                        remote_peer: ev.remote_peer_id,
689                        success,
690                    })
691                    .await;
692            }
693            HuddleBehaviorEvent::RelayClient(event) => {
694                // huddle 0.7.11: relay-client lifecycle. Pre-0.7.11 these
695                // events were swallowed by `_ => {}` so a relay
696                // reservation expiry never reached the app — peers
697                // behind NAT became silently unreachable. We can't
698                // re-listen automatically (the `listen_on(/p2p-circuit)`
699                // call requires the relay's peer-id from Identify, and
700                // we may have lost it), but surfacing the loss lets the
701                // app shift the NAT badge.
702                use libp2p::relay::client::Event as Rc;
703                match event {
704                    Rc::ReservationReqAccepted { relay_peer_id, .. } => {
705                        info!(%relay_peer_id, "relay: reservation accepted");
706                    }
707                    Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
708                        debug!(%relay_peer_id, "relay: outbound circuit established");
709                    }
710                    _ => {
711                        debug!(?event, "relay client event");
712                    }
713                }
714            }
715            _ => {}
716        }
717    }
718
719    async fn handle_gossipsub_message(
720        &mut self,
721        from_peer: PeerId,
722        message: gossipsub::Message,
723    ) {
724        let topic = message.topic.to_string();
725        if topic == ROOMS_TOPIC {
726            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
727                Ok(ann) => {
728                    let _ = self
729                        .event_tx
730                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
731                        .await;
732                }
733                Err(e) => {
734                    warn!(%e, "bad room announcement");
735                }
736            }
737        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
738            let _ = self
739                .event_tx
740                .send(NetworkEvent::RoomMessageReceived {
741                    room_id: room_id.to_string(),
742                    payload: message.data,
743                    from_peer,
744                })
745                .await;
746        }
747    }
748
749    fn handle_command(&mut self, cmd: NetworkCommand) {
750        match cmd {
751            NetworkCommand::SubscribeRoom { room_id } => {
752                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
753                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
754                    warn!(%e, %room_id, "subscribe room failed");
755                }
756            }
757            NetworkCommand::UnsubscribeRoom { room_id } => {
758                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
759                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
760            }
761            NetworkCommand::PublishRoomMessage { room_id, payload } => {
762                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
763                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
764                    // No subscribed peers is expected before the mesh
765                    // forms; anything else (MessageTooLarge, full queues)
766                    // is a real bug worth surfacing.
767                    match e {
768                        gossipsub::PublishError::NoPeersSubscribedToTopic => {
769                            debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
770                        }
771                        e => warn!(%e, %room_id, "publish room message failed"),
772                    }
773                }
774            }
775            NetworkCommand::AnnounceRoom(ann) => {
776                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
777                match serde_json::to_vec(&ann) {
778                    Ok(payload) => {
779                        if let Err(e) =
780                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
781                        {
782                            debug!(%e, "publish room announcement failed");
783                        }
784                    }
785                    Err(e) => warn!(%e, "encode room announcement"),
786                }
787            }
788            NetworkCommand::AcceptInbound { peer_id } => {
789                if self.pending_inbound.remove(&peer_id).is_some() {
790                    info!(%peer_id, "inbound dial accepted — promoting to mesh");
791                    self.swarm
792                        .behaviour_mut()
793                        .gossipsub
794                        .add_explicit_peer(&peer_id);
795                    self.discovered_peers.insert(peer_id);
796                } else {
797                    debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
798                }
799            }
800            NetworkCommand::RejectInbound { peer_id } => {
801                self.pending_inbound.remove(&peer_id);
802                self.session_blocklist.insert(peer_id);
803                info!(%peer_id, "inbound dial rejected — disconnecting");
804                let _ = self.swarm.disconnect_peer_id(peer_id);
805            }
806            NetworkCommand::DisconnectPeer { peer_id } => {
807                info!(%peer_id, "app-level identity check failed — disconnecting");
808                let _ = self.swarm.disconnect_peer_id(peer_id);
809            }
810            NetworkCommand::Dial { address } => {
811                let opts: DialOpts = address.clone().into();
812                let conn_id = opts.connection_id();
813                match self.swarm.dial(opts) {
814                    Ok(()) => {
815                        self.dial_attempts.insert(conn_id, address);
816                    }
817                    Err(e) => {
818                        // Synchronous dial error (bad multiaddr, transport refused).
819                        let tx = self.event_tx.clone();
820                        let err = e.to_string();
821                        tokio::spawn(async move {
822                            let _ = tx
823                                .send(NetworkEvent::DialFailed {
824                                    address,
825                                    error: err,
826                                })
827                                .await;
828                        });
829                    }
830                }
831            }
832            NetworkCommand::DialAddresses { addresses } => {
833                use libp2p::multiaddr::Protocol;
834                if addresses.is_empty() {
835                    return;
836                }
837                // Extract the shared peer-id from any address with a
838                // `/p2p/<peer-id>` suffix. All host_addrs originating
839                // from the same `RoomAnnouncement` share the
840                // announcer's peer-id, so the first match is enough.
841                let peer_id = addresses
842                    .iter()
843                    .flat_map(|a| a.iter())
844                    .find_map(|p| match p {
845                        Protocol::P2p(pid) => Some(pid),
846                        _ => None,
847                    });
848                let opts = match peer_id {
849                    Some(pid) => DialOpts::peer_id(pid)
850                        .addresses(addresses.clone())
851                        .build(),
852                    // No /p2p/ segment anywhere — fall back to single-
853                    // address dial of the first candidate, matching the
854                    // legacy `Dial` semantics for unanchored multiaddrs.
855                    None => addresses[0].clone().into(),
856                };
857                let conn_id = opts.connection_id();
858                // Use the first address as the representative for
859                // dial_attempts. On synchronous error we report it;
860                // on async success the post-identify handler upserts
861                // with the actually-connected endpoint.
862                let primary = addresses[0].clone();
863                match self.swarm.dial(opts) {
864                    Ok(()) => {
865                        self.dial_attempts.insert(conn_id, primary);
866                    }
867                    Err(e) => {
868                        let tx = self.event_tx.clone();
869                        let err = e.to_string();
870                        tokio::spawn(async move {
871                            let _ = tx
872                                .send(NetworkEvent::DialFailed {
873                                    address: primary,
874                                    error: err,
875                                })
876                                .await;
877                        });
878                    }
879                }
880            }
881            NetworkCommand::Shutdown => unreachable!(),
882        }
883    }
884}