Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13    autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
14    SwarmBuilder,
15};
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18
19/// How the network discovers peers.
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum NetworkMode {
22    /// mDNS on: announce ourselves on the LAN and pick up announcements.
23    Mdns,
24    /// mDNS off: invisible to LAN discovery; the only way to connect is
25    /// for someone to dial our address (or for us to dial theirs).
26    Direct,
27}
28
29impl NetworkMode {
30    pub fn as_str(&self) -> &'static str {
31        match self {
32            NetworkMode::Mdns => "mdns",
33            NetworkMode::Direct => "direct",
34        }
35    }
36
37    pub fn from_str(s: &str) -> Option<Self> {
38        match s.trim().to_ascii_lowercase().as_str() {
39            "mdns" | "lan" | "open" => Some(NetworkMode::Mdns),
40            "direct" | "dial" | "private" => Some(NetworkMode::Direct),
41            _ => None,
42        }
43    }
44}
45
46use crate::identity::{compute_fingerprint, Identity};
47use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
48use crate::network::events::NetworkEvent;
49use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
50
51#[derive(Debug)]
52pub enum NetworkCommand {
53    /// Subscribe to a room's per-room gossipsub topic.
54    SubscribeRoom { room_id: String },
55    /// Unsubscribe from a room's topic.
56    UnsubscribeRoom { room_id: String },
57    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
58    PublishRoomMessage { room_id: String, payload: Vec<u8> },
59    /// Publish a room announcement on the global rooms topic.
60    AnnounceRoom(RoomAnnouncement),
61    /// User-initiated dial of an explicit address. Used for cross-network
62    /// reach when mDNS isn't enough.
63    Dial { address: Multiaddr },
64    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
65    /// letting libp2p race them in parallel. Used by the "add by HD
66    /// ID / username" flow, which resolves a fingerprint to every
67    /// address we know for that peer (room announcement `host_addrs`
68    /// + persisted `known_peers`). libp2p's parallel dialer picks
69    /// the cheapest path that completes — LAN beats public IP beats
70    /// relay-hopped without us having to probe transports manually.
71    DialAddresses { addresses: Vec<Multiaddr> },
72    /// Phase A: user accepted an inbound dial — promote the peer to
73    /// explicit-peer status so room announcements flow.
74    AcceptInbound { peer_id: PeerId },
75    /// Phase A: user rejected an inbound dial — disconnect them and
76    /// add the peer_id to the in-memory blocklist for this session
77    /// (caller is responsible for the persistent blocked_peers row).
78    RejectInbound { peer_id: PeerId },
79    /// Phase C follow-up: drop a connection that failed an
80    /// application-level identity check (e.g. invite-fingerprint
81    /// mismatch). Differs from `RejectInbound` in that it doesn't
82    /// touch the inbound-pending map (the connection is already
83    /// past Identify when we discover the mismatch) and doesn't
84    /// persist a block — the caller may want to retry with a
85    /// corrected invite.
86    DisconnectPeer { peer_id: PeerId },
87    Shutdown,
88}
89
90#[derive(Clone)]
91pub struct NetworkHandle {
92    cmd_tx: mpsc::Sender<NetworkCommand>,
93}
94
95impl NetworkHandle {
96    pub async fn subscribe_room(&self, room_id: String) {
97        let _ = self
98            .cmd_tx
99            .send(NetworkCommand::SubscribeRoom { room_id })
100            .await;
101    }
102
103    pub async fn unsubscribe_room(&self, room_id: String) {
104        let _ = self
105            .cmd_tx
106            .send(NetworkCommand::UnsubscribeRoom { room_id })
107            .await;
108    }
109
110    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
111        let _ = self
112            .cmd_tx
113            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
114            .await;
115    }
116
117    pub async fn announce_room(&self, ann: RoomAnnouncement) {
118        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
119    }
120
121    pub async fn dial(&self, address: Multiaddr) {
122        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
123    }
124
125    /// huddle 0.5.2: dial a peer with multiple candidate addresses
126    /// at once. libp2p's swarm races them; the first to complete a
127    /// handshake wins, the rest are dropped. Use when you've resolved
128    /// a peer fingerprint to several known transports — typically
129    /// LAN ip4 + public ip4 + relay circuit.
130    pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
131        let _ = self
132            .cmd_tx
133            .send(NetworkCommand::DialAddresses { addresses })
134            .await;
135    }
136
137    pub async fn accept_inbound(&self, peer_id: PeerId) {
138        let _ = self
139            .cmd_tx
140            .send(NetworkCommand::AcceptInbound { peer_id })
141            .await;
142    }
143
144    pub async fn reject_inbound(&self, peer_id: PeerId) {
145        let _ = self
146            .cmd_tx
147            .send(NetworkCommand::RejectInbound { peer_id })
148            .await;
149    }
150
151    pub async fn disconnect_peer(&self, peer_id: PeerId) {
152        let _ = self
153            .cmd_tx
154            .send(NetworkCommand::DisconnectPeer { peer_id })
155            .await;
156    }
157
158    pub async fn shutdown(&self) {
159        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
160    }
161}
162
163/// What kind of connection we're holding open until Identify gives us
164/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
165/// added to the gossipsub mesh until the user accepts; outbound user-
166/// dials add to the mesh on connection but still wait on Identify so
167/// the eventual `DialSucceeded` can carry the fingerprint.
168#[derive(Debug)]
169enum PendingPeer {
170    /// Inbound dial from an unknown peer — modal-pending in the app.
171    InboundUnknown { address: Multiaddr },
172}
173
174struct NetworkTask {
175    swarm: Swarm<HuddleBehavior>,
176    cmd_rx: mpsc::Receiver<NetworkCommand>,
177    event_tx: mpsc::Sender<NetworkEvent>,
178    discovered_peers: HashSet<PeerId>,
179    /// Tracks user-initiated dials so we can correlate the eventual
180    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
181    /// specific address the user asked us to dial.
182    dial_attempts: HashMap<ConnectionId, Multiaddr>,
183    /// Phase A: peers connected but not yet promoted to the gossipsub
184    /// mesh (inbound) — waiting either for `Identify` to land so we
185    /// know their fingerprint, or for the user's accept/reject decision.
186    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
187    /// mid-prompt doesn't leak state.
188    pending_inbound: HashMap<PeerId, PendingPeer>,
189    /// Phase A: peers the user has explicitly rejected this session —
190    /// auto-disconnect every reconnect attempt without re-prompting.
191    /// Persistent across runs via `blocked_peers`; this in-memory copy
192    /// is loaded at startup (TODO: not yet wired; falls back to the DB
193    /// check in the app layer for now) and updated on `RejectInbound`.
194    session_blocklist: HashSet<PeerId>,
195    /// Phase D: configured relay multiaddrs. When Identify lands for a
196    /// peer whose multiaddr matches one of these, we call
197    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
198    /// Tracked as multiaddr strings (no PeerId yet — we don't know it
199    /// until Identify) plus a set of peer_ids of confirmed relays so
200    /// we only register once per relay.
201    configured_relays: Vec<Multiaddr>,
202    relay_peer_ids: HashSet<PeerId>,
203}
204
205pub fn start_network(
206    identity: &Identity,
207    event_tx: mpsc::Sender<NetworkEvent>,
208) -> crate::error::Result<NetworkHandle> {
209    start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
210}
211
212/// Start the network task with explicit mode, TCP listen port, and any
213/// pre-configured relay multiaddrs. `listen_port = 0` requests a
214/// random port. Relays are dialed on startup; once `Identify` lands
215/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
216/// register a reservation so peers behind NAT can dial us through it.
217pub fn start_network_with(
218    identity: &Identity,
219    event_tx: mpsc::Sender<NetworkEvent>,
220    mode: NetworkMode,
221    listen_port: u16,
222    relays: Vec<Multiaddr>,
223) -> crate::error::Result<NetworkHandle> {
224    let keypair = identity.keypair().clone();
225    let local_peer_id = identity.peer_id();
226
227    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
228        .with_tokio()
229        .with_tcp(
230            tcp::Config::default(),
231            noise::Config::new,
232            yamux::Config::default,
233        )
234        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
235        // Phase D: wrap the transport with relay-client. This both
236        // composes the transport (so we can dial `/p2p-circuit/`
237        // addresses) and surfaces a `relay::client::Behaviour` into
238        // the `with_behaviour` closure as the second argument.
239        .with_relay_client(noise::Config::new, yamux::Config::default)
240        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
241        .with_behaviour(|key, relay_client| {
242            let mdns_opt = match mode {
243                NetworkMode::Mdns => Some(
244                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
245                        .expect("mDNS init failed"),
246                ),
247                NetworkMode::Direct => None,
248            };
249            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
250
251            let identify = identify::Behaviour::new(
252                identify::Config::new("/huddle/1.0.0".into(), key.public())
253                    .with_agent_version("huddle/0.5".into()),
254            );
255
256            let ping = ping::Behaviour::default();
257
258            let gossipsub_config = gossipsub::ConfigBuilder::default()
259                .heartbeat_interval(Duration::from_secs(1))
260                .validation_mode(gossipsub::ValidationMode::Strict)
261                // Default is 64 KiB. Raise it so base64-encoded file
262                // chunks (see files::CHUNK_SIZE) keep ample headroom.
263                .max_transmit_size(256 * 1024)
264                .build()
265                .expect("valid gossipsub config");
266
267            let mut gossipsub = gossipsub::Behaviour::new(
268                gossipsub::MessageAuthenticity::Signed(key.clone()),
269                gossipsub_config,
270            )
271            .expect("valid gossipsub init");
272
273            // Every node subscribes to the global rooms topic so the lobby
274            // shows discovered rooms even before joining anything.
275            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
276            gossipsub
277                .subscribe(&rooms_topic)
278                .expect("subscribe rooms topic");
279
280            // AutoNAT v2: client probes external addresses by asking
281            // remote servers to dial us back; server answers other
282            // peers' probes. Both halves are needed for symmetric
283            // P2P reachability detection (v1 combined them; v2 split).
284            let autonat_client = autonat::v2::client::Behaviour::new(
285                rand::rngs::OsRng,
286                autonat::v2::client::Config::default(),
287            );
288            let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
289            let dcutr = dcutr::Behaviour::new(local_peer_id);
290
291            HuddleBehavior {
292                mdns,
293                identify,
294                ping,
295                gossipsub,
296                relay_client,
297                autonat_client,
298                autonat_server,
299                dcutr,
300            }
301        })
302        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
303        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
304        .build();
305
306    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
307        .parse()
308        .expect("valid listen addr");
309    swarm
310        .listen_on(listen_addr)
311        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
312    // Also bind IPv6 on all interfaces so users can dial via IPv6.
313    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
314        .parse()
315        .expect("valid ipv6 listen addr");
316    if let Err(e) = swarm.listen_on(listen_addr6) {
317        debug!(%e, "ipv6 listen skipped");
318    }
319
320    let (cmd_tx, cmd_rx) = mpsc::channel(256);
321    let mut task = NetworkTask {
322        swarm,
323        cmd_rx,
324        event_tx,
325        discovered_peers: HashSet::new(),
326        dial_attempts: HashMap::new(),
327        pending_inbound: HashMap::new(),
328        session_blocklist: HashSet::new(),
329        configured_relays: relays.clone(),
330        relay_peer_ids: HashSet::new(),
331    };
332    // Phase D: dial each configured relay so Identify can complete and
333    // we can register a `/p2p-circuit` reservation. Failures here are
334    // non-fatal — the user can still chat on LAN.
335    for relay_addr in relays {
336        info!(addr = %relay_addr, "dialing configured relay");
337        let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
338        let conn_id = opts.connection_id();
339        match task.swarm.dial(opts) {
340            Ok(()) => {
341                task.dial_attempts.insert(conn_id, relay_addr);
342            }
343            Err(e) => warn!(%e, "dial relay failed"),
344        }
345    }
346    tokio::spawn(task.run());
347
348    Ok(NetworkHandle { cmd_tx })
349}
350
351impl NetworkTask {
352    async fn run(mut self) {
353        loop {
354            tokio::select! {
355                event = self.swarm.select_next_some() => {
356                    self.handle_swarm_event(event).await;
357                }
358                Some(cmd) = self.cmd_rx.recv() => {
359                    if matches!(cmd, NetworkCommand::Shutdown) {
360                        info!("network task shutting down");
361                        break;
362                    }
363                    self.handle_command(cmd);
364                }
365            }
366        }
367    }
368
369    async fn handle_swarm_event(
370        &mut self,
371        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
372    ) {
373        match event {
374            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
375                info!(%address, "listening");
376                // Phase D: a relay-circuit address is a reachability
377                // milestone — surface it as its own event so the lobby
378                // can show "reachable via N relays" status.
379                use libp2p::multiaddr::Protocol;
380                let is_circuit = address
381                    .iter()
382                    .any(|p| matches!(p, Protocol::P2pCircuit));
383                if is_circuit {
384                    let _ = self
385                        .event_tx
386                        .send(NetworkEvent::RelayReservationEstablished {
387                            address: address.clone(),
388                        })
389                        .await;
390                }
391                let _ = self
392                    .event_tx
393                    .send(NetworkEvent::ListeningOn { address })
394                    .await;
395            }
396            libp2p::swarm::SwarmEvent::ConnectionEstablished {
397                peer_id,
398                connection_id,
399                endpoint,
400                ..
401            } => {
402                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
403                    // Phase D: a connection that was for a configured
404                    // relay shouldn't pollute the lobby with a normal
405                    // DialSucceeded — relays aren't chat peers. We just
406                    // remember the peer_id and wait for Identify, then
407                    // register a /p2p-circuit reservation.
408                    let is_relay = self.configured_relays.iter().any(|r| r == &addr);
409                    if is_relay {
410                        info!(%peer_id, %addr, "connected to configured relay");
411                        self.relay_peer_ids.insert(peer_id);
412                    } else {
413                        info!(%peer_id, %addr, "user-dialed peer connected");
414                        // Treat dialed peers like mDNS-discovered: add
415                        // to gossipsub explicit peers so room
416                        // announcements flow.
417                        self.swarm
418                            .behaviour_mut()
419                            .gossipsub
420                            .add_explicit_peer(&peer_id);
421                        self.discovered_peers.insert(peer_id);
422                        let _ = self
423                            .event_tx
424                            .send(NetworkEvent::DialSucceeded {
425                                peer_id,
426                                address: addr,
427                            })
428                            .await;
429                    }
430                } else if let ConnectedPoint::Dialer { .. } = endpoint {
431                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
432                    // — still add to mesh; no user-visible event needed.
433                    self.swarm
434                        .behaviour_mut()
435                        .gossipsub
436                        .add_explicit_peer(&peer_id);
437                } else {
438                    // Inbound dial from an unknown peer (Phase A). We
439                    // hold the connection but do NOT add them to the
440                    // explicit-peer set yet — wait for Identify so we
441                    // can show the user the peer's fingerprint, then
442                    // either AcceptInbound (promote to mesh) or
443                    // RejectInbound (disconnect + persist blocklist).
444                    //
445                    // Known limitation: gossipsub's score-based mesh
446                    // formation may still forward topic messages to
447                    // this peer via other peers we have in common.
448                    // True hard-quarantine would need a custom
449                    // ConnectionHandler — out of scope for v1.
450                    if self.session_blocklist.contains(&peer_id) {
451                        info!(%peer_id, "rejecting inbound from session-blocked peer");
452                        let _ = self.swarm.disconnect_peer_id(peer_id);
453                    } else {
454                        let address = match &endpoint {
455                            ConnectedPoint::Listener { send_back_addr, .. } => {
456                                send_back_addr.clone()
457                            }
458                            _ => Multiaddr::empty(),
459                        };
460                        debug!(%peer_id, %address, "inbound peer pending decision");
461                        self.pending_inbound
462                            .insert(peer_id, PendingPeer::InboundUnknown { address });
463                    }
464                }
465            }
466            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
467                connection_id,
468                error,
469                ..
470            } => {
471                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
472                    warn!(%addr, %error, "user-dialed peer failed");
473                    let _ = self
474                        .event_tx
475                        .send(NetworkEvent::DialFailed {
476                            address: addr,
477                            error: error.to_string(),
478                        })
479                        .await;
480                }
481            }
482            libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, .. } => {
483                // Drop any pending-inbound entry for this peer — they
484                // disconnected before we could prompt the user (or
485                // before the user accepted). Lets a re-connect start
486                // fresh rather than reusing stale state.
487                self.pending_inbound.remove(&peer_id);
488            }
489            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
490            _ => {}
491        }
492    }
493
494    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
495        match event {
496            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
497                for (peer_id, addr) in peers {
498                    if self.discovered_peers.insert(peer_id) {
499                        info!(%peer_id, %addr, "mDNS discovered");
500                        self.swarm.add_peer_address(peer_id, addr);
501                        // Explicitly add to gossipsub mesh.
502                        self.swarm
503                            .behaviour_mut()
504                            .gossipsub
505                            .add_explicit_peer(&peer_id);
506                        let _ = self
507                            .event_tx
508                            .send(NetworkEvent::PeerDiscovered { peer_id })
509                            .await;
510                    }
511                }
512            }
513            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
514                for (peer_id, _) in peers {
515                    if self.discovered_peers.remove(&peer_id) {
516                        info!(%peer_id, "mDNS peer expired");
517                        self.swarm
518                            .behaviour_mut()
519                            .gossipsub
520                            .remove_explicit_peer(&peer_id);
521                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
522                    }
523                }
524            }
525            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
526                propagation_source,
527                message,
528                ..
529            }) => {
530                self.handle_gossipsub_message(propagation_source, message).await;
531            }
532            HuddleBehaviorEvent::Identify(identify::Event::Received {
533                peer_id, info, ..
534            }) => {
535                debug!(%peer_id, agent = %info.agent_version, "identify received");
536                // Phase D: if this peer is a configured relay, register
537                // a `/p2p-circuit` reservation on first identify. Idem-
538                // potent — only fire if we haven't listened on this
539                // relay already (identify fires periodically).
540                if self.relay_peer_ids.contains(&peer_id) {
541                    use libp2p::multiaddr::Protocol;
542                    if let Some(relay_addr) = self
543                        .configured_relays
544                        .iter()
545                        .find(|a| {
546                            // Match by /p2p/<peer-id> suffix when
547                            // present, else by the addr we dialed.
548                            a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
549                                || self.dial_attempts.values().any(|d| d == *a)
550                        })
551                        .cloned()
552                    {
553                        let circuit = relay_addr.with(Protocol::P2pCircuit);
554                        match self.swarm.listen_on(circuit.clone()) {
555                            Ok(_) => info!(%circuit, "listening on relay circuit"),
556                            Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
557                        }
558                    }
559                }
560                // Decode the remote's Ed25519 pubkey and derive our
561                // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
562                // Rsa, Ecdsa) shouldn't appear in practice — huddle
563                // only generates Ed25519 identities — so we just log
564                // and skip if the cast fails.
565                let fingerprint = match info.public_key.clone().try_into_ed25519() {
566                    Ok(ed_pk) => {
567                        let bytes = ed_pk.to_bytes();
568                        compute_fingerprint(&bytes)
569                    }
570                    Err(_) => {
571                        warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
572                        return;
573                    }
574                };
575                // Always notify the app layer so it can populate
576                // `known_peers.fingerprint` and detect that an
577                // outbound peer we dialed has fully identified.
578                let _ = self
579                    .event_tx
580                    .send(NetworkEvent::PeerIdentified {
581                        peer_id,
582                        fingerprint: fingerprint.clone(),
583                    })
584                    .await;
585                // If the peer is in pending_inbound, Identify completing
586                // is the cue to surface the user prompt. Keep them in
587                // pending_inbound until Accept or Reject — we don't
588                // know yet which way the user will decide.
589                if let Some(PendingPeer::InboundUnknown { address }) =
590                    self.pending_inbound.get(&peer_id)
591                {
592                    let address = address.clone();
593                    let _ = self
594                        .event_tx
595                        .send(NetworkEvent::InboundDial {
596                            peer_id,
597                            fingerprint,
598                            address,
599                        })
600                        .await;
601                }
602            }
603            HuddleBehaviorEvent::AutonatClient(ev) => {
604                // One probe per address candidate. `result.is_ok()`
605                // means a remote AutoNAT server dialed us back
606                // successfully on `tested_addr` ⇒ this address is
607                // reachable from the outside. The app layer aggregates
608                // these into the lobby reachability badge.
609                let reachable = ev.result.is_ok();
610                if reachable {
611                    info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
612                } else {
613                    debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
614                }
615                let _ = self
616                    .event_tx
617                    .send(NetworkEvent::NatProbeResult {
618                        tested_addr: ev.tested_addr,
619                        reachable,
620                    })
621                    .await;
622            }
623            HuddleBehaviorEvent::AutonatServer(_) => {
624                // We answered another peer's reachability probe.
625                // No app-visible action.
626            }
627            HuddleBehaviorEvent::Dcutr(ev) => {
628                let success = ev.result.is_ok();
629                if success {
630                    info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
631                } else {
632                    debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
633                }
634                let _ = self
635                    .event_tx
636                    .send(NetworkEvent::DcutrUpgrade {
637                        remote_peer: ev.remote_peer_id,
638                        success,
639                    })
640                    .await;
641            }
642            _ => {}
643        }
644    }
645
646    async fn handle_gossipsub_message(
647        &mut self,
648        from_peer: PeerId,
649        message: gossipsub::Message,
650    ) {
651        let topic = message.topic.to_string();
652        if topic == ROOMS_TOPIC {
653            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
654                Ok(ann) => {
655                    let _ = self
656                        .event_tx
657                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
658                        .await;
659                }
660                Err(e) => {
661                    warn!(%e, "bad room announcement");
662                }
663            }
664        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
665            let _ = self
666                .event_tx
667                .send(NetworkEvent::RoomMessageReceived {
668                    room_id: room_id.to_string(),
669                    payload: message.data,
670                    from_peer,
671                })
672                .await;
673        }
674    }
675
676    fn handle_command(&mut self, cmd: NetworkCommand) {
677        match cmd {
678            NetworkCommand::SubscribeRoom { room_id } => {
679                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
680                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
681                    warn!(%e, %room_id, "subscribe room failed");
682                }
683            }
684            NetworkCommand::UnsubscribeRoom { room_id } => {
685                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
686                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
687            }
688            NetworkCommand::PublishRoomMessage { room_id, payload } => {
689                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
690                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
691                    // No subscribed peers is expected before the mesh
692                    // forms; anything else (MessageTooLarge, full queues)
693                    // is a real bug worth surfacing.
694                    match e {
695                        gossipsub::PublishError::NoPeersSubscribedToTopic => {
696                            debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
697                        }
698                        e => warn!(%e, %room_id, "publish room message failed"),
699                    }
700                }
701            }
702            NetworkCommand::AnnounceRoom(ann) => {
703                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
704                match serde_json::to_vec(&ann) {
705                    Ok(payload) => {
706                        if let Err(e) =
707                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
708                        {
709                            debug!(%e, "publish room announcement failed");
710                        }
711                    }
712                    Err(e) => warn!(%e, "encode room announcement"),
713                }
714            }
715            NetworkCommand::AcceptInbound { peer_id } => {
716                if self.pending_inbound.remove(&peer_id).is_some() {
717                    info!(%peer_id, "inbound dial accepted — promoting to mesh");
718                    self.swarm
719                        .behaviour_mut()
720                        .gossipsub
721                        .add_explicit_peer(&peer_id);
722                    self.discovered_peers.insert(peer_id);
723                } else {
724                    debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
725                }
726            }
727            NetworkCommand::RejectInbound { peer_id } => {
728                self.pending_inbound.remove(&peer_id);
729                self.session_blocklist.insert(peer_id);
730                info!(%peer_id, "inbound dial rejected — disconnecting");
731                let _ = self.swarm.disconnect_peer_id(peer_id);
732            }
733            NetworkCommand::DisconnectPeer { peer_id } => {
734                info!(%peer_id, "app-level identity check failed — disconnecting");
735                let _ = self.swarm.disconnect_peer_id(peer_id);
736            }
737            NetworkCommand::Dial { address } => {
738                let opts: DialOpts = address.clone().into();
739                let conn_id = opts.connection_id();
740                match self.swarm.dial(opts) {
741                    Ok(()) => {
742                        self.dial_attempts.insert(conn_id, address);
743                    }
744                    Err(e) => {
745                        // Synchronous dial error (bad multiaddr, transport refused).
746                        let tx = self.event_tx.clone();
747                        let err = e.to_string();
748                        tokio::spawn(async move {
749                            let _ = tx
750                                .send(NetworkEvent::DialFailed {
751                                    address,
752                                    error: err,
753                                })
754                                .await;
755                        });
756                    }
757                }
758            }
759            NetworkCommand::DialAddresses { addresses } => {
760                use libp2p::multiaddr::Protocol;
761                if addresses.is_empty() {
762                    return;
763                }
764                // Extract the shared peer-id from any address with a
765                // `/p2p/<peer-id>` suffix. All host_addrs originating
766                // from the same `RoomAnnouncement` share the
767                // announcer's peer-id, so the first match is enough.
768                let peer_id = addresses
769                    .iter()
770                    .flat_map(|a| a.iter())
771                    .find_map(|p| match p {
772                        Protocol::P2p(pid) => Some(pid),
773                        _ => None,
774                    });
775                let opts = match peer_id {
776                    Some(pid) => DialOpts::peer_id(pid)
777                        .addresses(addresses.clone())
778                        .build(),
779                    // No /p2p/ segment anywhere — fall back to single-
780                    // address dial of the first candidate, matching the
781                    // legacy `Dial` semantics for unanchored multiaddrs.
782                    None => addresses[0].clone().into(),
783                };
784                let conn_id = opts.connection_id();
785                // Use the first address as the representative for
786                // dial_attempts. On synchronous error we report it;
787                // on async success the post-identify handler upserts
788                // with the actually-connected endpoint.
789                let primary = addresses[0].clone();
790                match self.swarm.dial(opts) {
791                    Ok(()) => {
792                        self.dial_attempts.insert(conn_id, primary);
793                    }
794                    Err(e) => {
795                        let tx = self.event_tx.clone();
796                        let err = e.to_string();
797                        tokio::spawn(async move {
798                            let _ = tx
799                                .send(NetworkEvent::DialFailed {
800                                    address: primary,
801                                    error: err,
802                                })
803                                .await;
804                        });
805                    }
806                }
807            }
808            NetworkCommand::Shutdown => unreachable!(),
809        }
810    }
811}