Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4pub mod server;
5pub mod transport;
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::Duration;
10
11use futures::StreamExt;
12use sha2::{Digest, Sha256};
13use libp2p::core::ConnectedPoint;
14use libp2p::swarm::dial_opts::DialOpts;
15use libp2p::swarm::ConnectionId;
16use libp2p::{
17    autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
18    SwarmBuilder,
19};
20use tokio::sync::mpsc;
21use tracing::{debug, info, warn};
22
/// Peer-discovery / transport strategy for the network layer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NetworkMode {
    /// Onion-relay only — the huddle 0.8 default. No libp2p swarm runs:
    /// no LAN discovery, no direct dial, no NAT machinery; every message
    /// travels through the centralized Tor-onion relay (see
    /// `network::server`). The binary stays in this mode unless the user
    /// opts back into libp2p with `--mode mdns|direct`.
    Server,
    /// libp2p with mDNS enabled (`--mode mdns`): advertise ourselves on the
    /// LAN and pick up other announcements. Runs *alongside* the onion
    /// relay, so LAN peers connect directly while internet peers are routed
    /// through the server.
    Mdns,
    /// libp2p with mDNS disabled (`--mode direct`): invisible to LAN
    /// discovery, so the only libp2p connections are explicit dials. Also
    /// runs alongside the onion relay.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of this mode.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Server => "server",
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Whether this mode starts a libp2p swarm (LAN / direct dial).
    /// Only the onion-relay-only `Server` mode does not.
    pub fn uses_libp2p(&self) -> bool {
        *self != Self::Server
    }

    /// Parse a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Accepted spellings:
    /// * `server` / `tor` / `onion` → `Server`
    /// * `mdns` / `lan` / `open` → `Mdns`
    /// * `direct` / `dial` / `private` → `Direct`
    ///
    /// Anything else yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        let normalized = s.trim().to_ascii_lowercase();
        let mode = match normalized.as_str() {
            "server" | "tor" | "onion" => Self::Server,
            "mdns" | "lan" | "open" => Self::Mdns,
            "direct" | "dial" | "private" => Self::Direct,
            _ => return None,
        };
        Some(mode)
    }
}
67
68use crate::identity::{compute_fingerprint, Identity};
69use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
70use crate::network::events::NetworkEvent;
71use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
72
73#[derive(Debug)]
74pub enum NetworkCommand {
75    /// Subscribe to a room's per-room gossipsub topic.
76    SubscribeRoom { room_id: String },
77    /// Unsubscribe from a room's topic.
78    UnsubscribeRoom { room_id: String },
79    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
80    PublishRoomMessage { room_id: String, payload: Vec<u8> },
81    /// Publish a room announcement on the global rooms topic.
82    AnnounceRoom(RoomAnnouncement),
83    /// User-initiated dial of an explicit address. Used for cross-network
84    /// reach when mDNS isn't enough.
85    Dial { address: Multiaddr },
86    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
87    /// letting libp2p race them in parallel. Used by the "add by HD
88    /// ID / username" flow, which resolves a fingerprint to every
89    /// address we know for that peer (room announcement `host_addrs`
90    /// + persisted `known_peers`). libp2p's parallel dialer picks
91    /// the cheapest path that completes — LAN beats public IP beats
92    /// relay-hopped without us having to probe transports manually.
93    DialAddresses { addresses: Vec<Multiaddr> },
94    /// Phase A: user accepted an inbound dial — promote the peer to
95    /// explicit-peer status so room announcements flow.
96    AcceptInbound { peer_id: PeerId },
97    /// Phase A: user rejected an inbound dial — disconnect them and
98    /// add the peer_id to the in-memory blocklist for this session
99    /// (caller is responsible for the persistent blocked_peers row).
100    RejectInbound { peer_id: PeerId },
101    /// Phase C follow-up: drop a connection that failed an
102    /// application-level identity check (e.g. invite-fingerprint
103    /// mismatch). Differs from `RejectInbound` in that it doesn't
104    /// touch the inbound-pending map (the connection is already
105    /// past Identify when we discover the mismatch) and doesn't
106    /// persist a block — the caller may want to retry with a
107    /// corrected invite.
108    DisconnectPeer { peer_id: PeerId },
109    Shutdown,
110}
111
112#[derive(Clone)]
113pub struct NetworkHandle {
114    cmd_tx: mpsc::Sender<NetworkCommand>,
115    /// huddle 0.8: optional connection to the centralized server (a Tor
116    /// onion relay). When attached, every room subscribe/unsubscribe and
117    /// every published wire message is mirrored to it, so peers we can't
118    /// reach over libp2p still receive our traffic (and we theirs, via the
119    /// server's offline mailbox). `None` until `attach_server` runs.
120    server: Arc<std::sync::Mutex<Option<server::ServerClient>>>,
121}
122
123/// Stable per-message id for the server mailbox/receipts: a short hash of
124/// the wire bytes. Opaque to the server; lets it dedup/queue by id.
125fn server_msg_id(payload: &[u8]) -> String {
126    let mut h = Sha256::new();
127    h.update(payload);
128    hex::encode(&h.finalize()[..8])
129}
130
131impl NetworkHandle {
132    /// Attach a live server connection so subsequent room traffic mirrors
133    /// to it. Replaces any previous one. Called by the app's server
134    /// connection task after each (re)connect.
135    pub fn attach_server(&self, client: server::ServerClient) {
136        *self.server.lock().unwrap() = Some(client);
137    }
138
139    /// Drop the server connection (e.g. after a disconnect) so we stop
140    /// mirroring until it reconnects.
141    pub fn detach_server(&self) {
142        *self.server.lock().unwrap() = None;
143    }
144
145    /// Snapshot the attached server client, if any. Cloned out so we never
146    /// hold the lock across the (sync, non-blocking) mirror call.
147    fn server_client(&self) -> Option<server::ServerClient> {
148        self.server.lock().unwrap().clone()
149    }
150
151    pub async fn subscribe_room(&self, room_id: String) {
152        if let Some(s) = self.server_client() {
153            let _ = s.subscribe(&room_id);
154        }
155        let _ = self
156            .cmd_tx
157            .send(NetworkCommand::SubscribeRoom { room_id })
158            .await;
159    }
160
161    pub async fn unsubscribe_room(&self, room_id: String) {
162        if let Some(s) = self.server_client() {
163            let _ = s.unsubscribe(&room_id);
164        }
165        let _ = self
166            .cmd_tx
167            .send(NetworkCommand::UnsubscribeRoom { room_id })
168            .await;
169    }
170
171    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
172        if let Some(s) = self.server_client() {
173            let _ = s.publish(&room_id, &server_msg_id(&payload), &payload);
174        }
175        let _ = self
176            .cmd_tx
177            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
178            .await;
179    }
180
181    pub async fn announce_room(&self, ann: RoomAnnouncement) {
182        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
183    }
184
185    /// True when a server connection is currently attached.
186    pub fn has_server(&self) -> bool {
187        self.server.lock().unwrap().is_some()
188    }
189
190    pub async fn dial(&self, address: Multiaddr) {
191        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
192    }
193
194    /// huddle 0.5.2: dial a peer with multiple candidate addresses
195    /// at once. libp2p's swarm races them; the first to complete a
196    /// handshake wins, the rest are dropped. Use when you've resolved
197    /// a peer fingerprint to several known transports — typically
198    /// LAN ip4 + public ip4 + relay circuit.
199    pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
200        let _ = self
201            .cmd_tx
202            .send(NetworkCommand::DialAddresses { addresses })
203            .await;
204    }
205
206    pub async fn accept_inbound(&self, peer_id: PeerId) {
207        let _ = self
208            .cmd_tx
209            .send(NetworkCommand::AcceptInbound { peer_id })
210            .await;
211    }
212
213    pub async fn reject_inbound(&self, peer_id: PeerId) {
214        let _ = self
215            .cmd_tx
216            .send(NetworkCommand::RejectInbound { peer_id })
217            .await;
218    }
219
220    pub async fn disconnect_peer(&self, peer_id: PeerId) {
221        let _ = self
222            .cmd_tx
223            .send(NetworkCommand::DisconnectPeer { peer_id })
224            .await;
225    }
226
227    pub async fn shutdown(&self) {
228        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
229    }
230}
231
232/// What kind of connection we're holding open until Identify gives us
233/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
234/// added to the gossipsub mesh until the user accepts; outbound user-
235/// dials add to the mesh on connection but still wait on Identify so
236/// the eventual `DialSucceeded` can carry the fingerprint.
237#[derive(Debug)]
238enum PendingPeer {
239    /// Inbound dial from an unknown peer — modal-pending in the app.
240    InboundUnknown { address: Multiaddr },
241}
242
243struct NetworkTask {
244    swarm: Swarm<HuddleBehavior>,
245    cmd_rx: mpsc::Receiver<NetworkCommand>,
246    event_tx: mpsc::Sender<NetworkEvent>,
247    discovered_peers: HashSet<PeerId>,
248    /// Tracks user-initiated dials so we can correlate the eventual
249    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
250    /// specific address the user asked us to dial.
251    dial_attempts: HashMap<ConnectionId, Multiaddr>,
252    /// Phase A: peers connected but not yet promoted to the gossipsub
253    /// mesh (inbound) — waiting either for `Identify` to land so we
254    /// know their fingerprint, or for the user's accept/reject decision.
255    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
256    /// mid-prompt doesn't leak state.
257    pending_inbound: HashMap<PeerId, PendingPeer>,
258    /// Phase A: peers the user has explicitly rejected this session —
259    /// auto-disconnect every reconnect attempt without re-prompting.
260    /// Persistent across runs via `blocked_peers`; this in-memory copy
261    /// is loaded at startup (TODO: not yet wired; falls back to the DB
262    /// check in the app layer for now) and updated on `RejectInbound`.
263    session_blocklist: HashSet<PeerId>,
264    /// Phase D: configured relay multiaddrs. When Identify lands for a
265    /// peer whose multiaddr matches one of these, we call
266    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
267    /// Tracked as multiaddr strings (no PeerId yet — we don't know it
268    /// until Identify) plus a set of peer_ids of confirmed relays so
269    /// we only register once per relay.
270    configured_relays: Vec<Multiaddr>,
271    relay_peer_ids: HashSet<PeerId>,
272    /// huddle 0.7.11: per-peer DCUtR failure counter. Logged at warn
273    /// once the count crosses `DCUTR_FAIL_BUDGET` so symmetric-NAT
274    /// pairs don't generate runaway hole-punch attempts in the logs.
275    /// libp2p's dcutr behavior schedules its own retries internally;
276    /// this is purely an observability signal for the app.
277    dcutr_failures: HashMap<PeerId, u32>,
278}
279
/// huddle 0.7.11: failed DCUtR attempts to a single peer after which a
/// warning is logged. dcutr keeps retrying internally regardless. The audit
/// flagged the resulting log spam, and capping the noise is the practical
/// fix.
const DCUTR_FAIL_BUDGET: u32 = 6;
284
285pub fn start_network(
286    identity: &Identity,
287    event_tx: mpsc::Sender<NetworkEvent>,
288) -> crate::error::Result<NetworkHandle> {
289    start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
290}
291
292/// huddle 0.8: build a `NetworkHandle` with **no libp2p swarm** — the
293/// onion-relay-only (`NetworkMode::Server`) path. We still hand back a
294/// fully-functional handle so the rest of the app is oblivious: room
295/// subscribe/unsubscribe/publish mirror to the attached `ServerClient`
296/// exactly as before, while the libp2p-specific commands (dial, announce,
297/// accept/reject, …) are simply drained and dropped. A tiny task consumes
298/// `cmd_rx` so the bounded channel never fills and `send().await` never
299/// blocks; it exits on `Shutdown`.
300pub fn start_network_disabled() -> NetworkHandle {
301    let (cmd_tx, mut cmd_rx) = mpsc::channel::<NetworkCommand>(256);
302    tokio::spawn(async move {
303        while let Some(cmd) = cmd_rx.recv().await {
304            if matches!(cmd, NetworkCommand::Shutdown) {
305                info!("network task (libp2p-disabled) shutting down");
306                break;
307            }
308            // Everything else is a libp2p op with no swarm to run it; the
309            // server mirror (in NetworkHandle) already handled the room
310            // traffic before the command reached us. Drop it.
311        }
312    });
313    NetworkHandle {
314        cmd_tx,
315        server: Arc::new(std::sync::Mutex::new(None)),
316    }
317}
318
319/// Start the network task with explicit mode, TCP listen port, and any
320/// pre-configured relay multiaddrs. `listen_port = 0` requests a
321/// random port. Relays are dialed on startup; once `Identify` lands
322/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
323/// register a reservation so peers behind NAT can dial us through it.
324pub fn start_network_with(
325    identity: &Identity,
326    event_tx: mpsc::Sender<NetworkEvent>,
327    mode: NetworkMode,
328    listen_port: u16,
329    relays: Vec<Multiaddr>,
330) -> crate::error::Result<NetworkHandle> {
331    let keypair = identity.keypair().clone();
332    let local_peer_id = identity.peer_id();
333
334    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
335        .with_tokio()
336        .with_tcp(
337            tcp::Config::default(),
338            noise::Config::new,
339            yamux::Config::default,
340        )
341        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
342        // Phase D: wrap the transport with relay-client. This both
343        // composes the transport (so we can dial `/p2p-circuit/`
344        // addresses) and surfaces a `relay::client::Behaviour` into
345        // the `with_behaviour` closure as the second argument.
346        .with_relay_client(noise::Config::new, yamux::Config::default)
347        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
348        .with_behaviour(|key, relay_client| {
349            let mdns_opt = match mode {
350                NetworkMode::Mdns => Some(
351                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
352                        .expect("mDNS init failed"),
353                ),
354                // `Direct` runs libp2p without mDNS. `Server` never reaches
355                // here (it takes the `start_network_disabled` path and runs
356                // no swarm), but the match must be exhaustive.
357                NetworkMode::Direct | NetworkMode::Server => None,
358            };
359            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
360
361            let identify = identify::Behaviour::new(
362                identify::Config::new("/huddle/1.0.0".into(), key.public())
363                    .with_agent_version("huddle/0.5".into()),
364            );
365
366            let ping = ping::Behaviour::default();
367
368            let gossipsub_config = gossipsub::ConfigBuilder::default()
369                .heartbeat_interval(Duration::from_secs(1))
370                .validation_mode(gossipsub::ValidationMode::Strict)
371                // Default is 64 KiB. Raise it so base64-encoded file
372                // chunks (see files::CHUNK_SIZE) keep ample headroom.
373                .max_transmit_size(256 * 1024)
374                .build()
375                .expect("valid gossipsub config");
376
377            let mut gossipsub = gossipsub::Behaviour::new(
378                gossipsub::MessageAuthenticity::Signed(key.clone()),
379                gossipsub_config,
380            )
381            .expect("valid gossipsub init");
382
383            // Every node subscribes to the global rooms topic so the lobby
384            // shows discovered rooms even before joining anything.
385            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
386            gossipsub
387                .subscribe(&rooms_topic)
388                .expect("subscribe rooms topic");
389
390            // AutoNAT v2: client probes external addresses by asking
391            // remote servers to dial us back; server answers other
392            // peers' probes. Both halves are needed for symmetric
393            // P2P reachability detection (v1 combined them; v2 split).
394            let autonat_client = autonat::v2::client::Behaviour::new(
395                rand::rngs::OsRng,
396                autonat::v2::client::Config::default(),
397            );
398            let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
399            let dcutr = dcutr::Behaviour::new(local_peer_id);
400
401            HuddleBehavior {
402                mdns,
403                identify,
404                ping,
405                gossipsub,
406                relay_client,
407                autonat_client,
408                autonat_server,
409                dcutr,
410            }
411        })
412        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
413        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
414        .build();
415
416    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
417        .parse()
418        .expect("valid listen addr");
419    swarm
420        .listen_on(listen_addr)
421        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
422    // Also bind IPv6 on all interfaces so users can dial via IPv6.
423    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
424        .parse()
425        .expect("valid ipv6 listen addr");
426    if let Err(e) = swarm.listen_on(listen_addr6) {
427        debug!(%e, "ipv6 listen skipped");
428    }
429
430    let (cmd_tx, cmd_rx) = mpsc::channel(256);
431    let mut task = NetworkTask {
432        swarm,
433        cmd_rx,
434        event_tx,
435        discovered_peers: HashSet::new(),
436        dial_attempts: HashMap::new(),
437        pending_inbound: HashMap::new(),
438        session_blocklist: HashSet::new(),
439        configured_relays: relays.clone(),
440        relay_peer_ids: HashSet::new(),
441        dcutr_failures: HashMap::new(),
442    };
443    // Phase D: dial each configured relay so Identify can complete and
444    // we can register a `/p2p-circuit` reservation. Failures here are
445    // non-fatal — the user can still chat on LAN.
446    for relay_addr in relays {
447        info!(addr = %relay_addr, "dialing configured relay");
448        let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
449        let conn_id = opts.connection_id();
450        match task.swarm.dial(opts) {
451            Ok(()) => {
452                task.dial_attempts.insert(conn_id, relay_addr);
453            }
454            Err(e) => warn!(%e, "dial relay failed"),
455        }
456    }
457    tokio::spawn(task.run());
458
459    Ok(NetworkHandle {
460        cmd_tx,
461        server: Arc::new(std::sync::Mutex::new(None)),
462    })
463}
464
465impl NetworkTask {
466    async fn run(mut self) {
467        loop {
468            tokio::select! {
469                event = self.swarm.select_next_some() => {
470                    self.handle_swarm_event(event).await;
471                }
472                Some(cmd) = self.cmd_rx.recv() => {
473                    if matches!(cmd, NetworkCommand::Shutdown) {
474                        info!("network task shutting down");
475                        break;
476                    }
477                    self.handle_command(cmd);
478                }
479            }
480        }
481    }
482
483    async fn handle_swarm_event(
484        &mut self,
485        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
486    ) {
487        match event {
488            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
489                info!(%address, "listening");
490                // Phase D: a relay-circuit address is a reachability
491                // milestone — surface it as its own event so the lobby
492                // can show "reachable via N relays" status.
493                use libp2p::multiaddr::Protocol;
494                let is_circuit = address
495                    .iter()
496                    .any(|p| matches!(p, Protocol::P2pCircuit));
497                if is_circuit {
498                    let _ = self
499                        .event_tx
500                        .send(NetworkEvent::RelayReservationEstablished {
501                            address: address.clone(),
502                        })
503                        .await;
504                }
505                let _ = self
506                    .event_tx
507                    .send(NetworkEvent::ListeningOn { address })
508                    .await;
509            }
510            libp2p::swarm::SwarmEvent::ConnectionEstablished {
511                peer_id,
512                connection_id,
513                endpoint,
514                ..
515            } => {
516                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
517                    // Phase D: a connection that was for a configured
518                    // relay shouldn't pollute the lobby with a normal
519                    // DialSucceeded — relays aren't chat peers. We just
520                    // remember the peer_id and wait for Identify, then
521                    // register a /p2p-circuit reservation.
522                    let is_relay = self.configured_relays.iter().any(|r| r == &addr);
523                    if is_relay {
524                        info!(%peer_id, %addr, "connected to configured relay");
525                        self.relay_peer_ids.insert(peer_id);
526                    } else {
527                        info!(%peer_id, %addr, "user-dialed peer connected");
528                        // Treat dialed peers like mDNS-discovered: add
529                        // to gossipsub explicit peers so room
530                        // announcements flow.
531                        self.swarm
532                            .behaviour_mut()
533                            .gossipsub
534                            .add_explicit_peer(&peer_id);
535                        self.discovered_peers.insert(peer_id);
536                        let _ = self
537                            .event_tx
538                            .send(NetworkEvent::DialSucceeded {
539                                peer_id,
540                                address: addr,
541                            })
542                            .await;
543                    }
544                } else if let ConnectedPoint::Dialer { .. } = endpoint {
545                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
546                    // — still add to mesh; no user-visible event needed.
547                    self.swarm
548                        .behaviour_mut()
549                        .gossipsub
550                        .add_explicit_peer(&peer_id);
551                } else {
552                    // Inbound dial from an unknown peer (Phase A). We
553                    // hold the connection but do NOT add them to the
554                    // explicit-peer set yet — wait for Identify so we
555                    // can show the user the peer's fingerprint, then
556                    // either AcceptInbound (promote to mesh) or
557                    // RejectInbound (disconnect + persist blocklist).
558                    //
559                    // Known limitation: gossipsub's score-based mesh
560                    // formation may still forward topic messages to
561                    // this peer via other peers we have in common.
562                    // True hard-quarantine would need a custom
563                    // ConnectionHandler — out of scope for v1.
564                    if self.session_blocklist.contains(&peer_id) {
565                        info!(%peer_id, "rejecting inbound from session-blocked peer");
566                        let _ = self.swarm.disconnect_peer_id(peer_id);
567                    } else {
568                        let address = match &endpoint {
569                            ConnectedPoint::Listener { send_back_addr, .. } => {
570                                send_back_addr.clone()
571                            }
572                            _ => Multiaddr::empty(),
573                        };
574                        debug!(%peer_id, %address, "inbound peer pending decision");
575                        self.pending_inbound
576                            .insert(peer_id, PendingPeer::InboundUnknown { address });
577                    }
578                }
579            }
580            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
581                connection_id,
582                error,
583                ..
584            } => {
585                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
586                    warn!(%addr, %error, "user-dialed peer failed");
587                    let _ = self
588                        .event_tx
589                        .send(NetworkEvent::DialFailed {
590                            address: addr,
591                            error: error.to_string(),
592                        })
593                        .await;
594                }
595            }
596            libp2p::swarm::SwarmEvent::ConnectionClosed {
597                peer_id,
598                num_established,
599                ..
600            } => {
601                // Drop any pending-inbound entry for this peer — they
602                // disconnected before we could prompt the user (or
603                // before the user accepted). Lets a re-connect start
604                // fresh rather than reusing stale state.
605                self.pending_inbound.remove(&peer_id);
606                // huddle 0.7.11: emit PeerDisconnected so the app can
607                // clean its `connected_dial_addrs` map. Only fire when
608                // the LAST connection to that peer closed — multiple
609                // simultaneous connections (e.g. direct + relay) would
610                // otherwise emit spurious disconnects when one of the
611                // two drops.
612                if num_established == 0 {
613                    let _ = self
614                        .event_tx
615                        .send(NetworkEvent::PeerDisconnected { peer_id })
616                        .await;
617                    // Also clean up gossipsub's explicit-peers set so
618                    // it doesn't accumulate dead PeerIds across the
619                    // session.
620                    self.swarm
621                        .behaviour_mut()
622                        .gossipsub
623                        .remove_explicit_peer(&peer_id);
624                }
625            }
626            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
627            _ => {}
628        }
629    }
630
631    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
632        match event {
633            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
634                for (peer_id, addr) in peers {
635                    if self.discovered_peers.insert(peer_id) {
636                        info!(%peer_id, %addr, "mDNS discovered");
637                        self.swarm.add_peer_address(peer_id, addr);
638                        // Explicitly add to gossipsub mesh.
639                        self.swarm
640                            .behaviour_mut()
641                            .gossipsub
642                            .add_explicit_peer(&peer_id);
643                        let _ = self
644                            .event_tx
645                            .send(NetworkEvent::PeerDiscovered { peer_id })
646                            .await;
647                    }
648                }
649            }
650            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
651                for (peer_id, _) in peers {
652                    if self.discovered_peers.remove(&peer_id) {
653                        info!(%peer_id, "mDNS peer expired");
654                        self.swarm
655                            .behaviour_mut()
656                            .gossipsub
657                            .remove_explicit_peer(&peer_id);
658                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
659                    }
660                }
661            }
662            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
663                propagation_source,
664                message,
665                ..
666            }) => {
667                self.handle_gossipsub_message(propagation_source, message).await;
668            }
669            HuddleBehaviorEvent::Identify(identify::Event::Received {
670                peer_id, info, ..
671            }) => {
672                debug!(%peer_id, agent = %info.agent_version, "identify received");
673                // Phase D: if this peer is a configured relay, register
674                // a `/p2p-circuit` reservation on first identify. Idem-
675                // potent — only fire if we haven't listened on this
676                // relay already (identify fires periodically).
677                if self.relay_peer_ids.contains(&peer_id) {
678                    use libp2p::multiaddr::Protocol;
679                    if let Some(relay_addr) = self
680                        .configured_relays
681                        .iter()
682                        .find(|a| {
683                            // Match by /p2p/<peer-id> suffix when
684                            // present, else by the addr we dialed.
685                            a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
686                                || self.dial_attempts.values().any(|d| d == *a)
687                        })
688                        .cloned()
689                    {
690                        let circuit = relay_addr.with(Protocol::P2pCircuit);
691                        match self.swarm.listen_on(circuit.clone()) {
692                            Ok(_) => info!(%circuit, "listening on relay circuit"),
693                            Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
694                        }
695                    }
696                }
697                // Decode the remote's Ed25519 pubkey and derive our
698                // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
699                // Rsa, Ecdsa) shouldn't appear in practice — huddle
700                // only generates Ed25519 identities — so we just log
701                // and skip if the cast fails.
702                let fingerprint = match info.public_key.clone().try_into_ed25519() {
703                    Ok(ed_pk) => {
704                        let bytes = ed_pk.to_bytes();
705                        compute_fingerprint(&bytes)
706                    }
707                    Err(_) => {
708                        warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
709                        return;
710                    }
711                };
712                // Always notify the app layer so it can populate
713                // `known_peers.fingerprint` and detect that an
714                // outbound peer we dialed has fully identified.
715                let _ = self
716                    .event_tx
717                    .send(NetworkEvent::PeerIdentified {
718                        peer_id,
719                        fingerprint: fingerprint.clone(),
720                    })
721                    .await;
722                // If the peer is in pending_inbound, Identify completing
723                // is the cue to surface the user prompt. Keep them in
724                // pending_inbound until Accept or Reject — we don't
725                // know yet which way the user will decide.
726                if let Some(PendingPeer::InboundUnknown { address }) =
727                    self.pending_inbound.get(&peer_id)
728                {
729                    let address = address.clone();
730                    let _ = self
731                        .event_tx
732                        .send(NetworkEvent::InboundDial {
733                            peer_id,
734                            fingerprint,
735                            address,
736                        })
737                        .await;
738                }
739            }
740            HuddleBehaviorEvent::AutonatClient(ev) => {
741                // One probe per address candidate. `result.is_ok()`
742                // means a remote AutoNAT server dialed us back
743                // successfully on `tested_addr` ⇒ this address is
744                // reachable from the outside. The app layer aggregates
745                // these into the lobby reachability badge.
746                let reachable = ev.result.is_ok();
747                if reachable {
748                    info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
749                } else {
750                    debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
751                }
752                let _ = self
753                    .event_tx
754                    .send(NetworkEvent::NatProbeResult {
755                        tested_addr: ev.tested_addr,
756                        reachable,
757                    })
758                    .await;
759            }
760            HuddleBehaviorEvent::AutonatServer(_) => {
761                // We answered another peer's reachability probe.
762                // No app-visible action.
763            }
764            HuddleBehaviorEvent::Dcutr(ev) => {
765                let success = ev.result.is_ok();
766                if success {
767                    info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
768                    // huddle 0.7.11: a successful hole-punch resets the
769                    // per-peer DCUtR retry counter so future failures
770                    // don't immediately bail out.
771                    self.dcutr_failures.remove(&ev.remote_peer_id);
772                } else {
773                    debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
774                    let count = self
775                        .dcutr_failures
776                        .entry(ev.remote_peer_id)
777                        .and_modify(|n| *n += 1)
778                        .or_insert(1);
779                    if *count >= DCUTR_FAIL_BUDGET {
780                        warn!(
781                            remote = %ev.remote_peer_id,
782                            attempts = *count,
783                            "DCUtR: giving up after repeated failures (symmetric NAT likely)"
784                        );
785                    }
786                }
787                let _ = self
788                    .event_tx
789                    .send(NetworkEvent::DcutrUpgrade {
790                        remote_peer: ev.remote_peer_id,
791                        success,
792                    })
793                    .await;
794            }
795            HuddleBehaviorEvent::RelayClient(event) => {
796                // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
797                // only exposes the success-path variants
798                // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
799                // `InboundCircuitEstablished`). There's no
800                // `ReservationReqFailed` arm we can match on, so we
801                // can't reliably surface reservation loss without a
802                // separate health-check timer (future work). For now
803                // we log every event so operators can see the
804                // lifecycle in `huddle.log` — pre-0.7.11 the whole
805                // arm was swallowed by `_ => {}` which hid even that.
806                use libp2p::relay::client::Event as Rc;
807                match event {
808                    Rc::ReservationReqAccepted { relay_peer_id, .. } => {
809                        info!(%relay_peer_id, "relay: reservation accepted");
810                    }
811                    Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
812                        debug!(%relay_peer_id, "relay: outbound circuit established");
813                    }
814                    other => {
815                        debug!(?other, "relay client event");
816                    }
817                }
818            }
819            _ => {}
820        }
821    }
822
823    async fn handle_gossipsub_message(
824        &mut self,
825        from_peer: PeerId,
826        message: gossipsub::Message,
827    ) {
828        let topic = message.topic.to_string();
829        if topic == ROOMS_TOPIC {
830            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
831                Ok(ann) => {
832                    let _ = self
833                        .event_tx
834                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
835                        .await;
836                }
837                Err(e) => {
838                    warn!(%e, "bad room announcement");
839                }
840            }
841        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
842            let _ = self
843                .event_tx
844                .send(NetworkEvent::RoomMessageReceived {
845                    room_id: room_id.to_string(),
846                    payload: message.data,
847                    from_peer,
848                })
849                .await;
850        }
851    }
852
853    fn handle_command(&mut self, cmd: NetworkCommand) {
854        match cmd {
855            NetworkCommand::SubscribeRoom { room_id } => {
856                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
857                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
858                    warn!(%e, %room_id, "subscribe room failed");
859                }
860            }
861            NetworkCommand::UnsubscribeRoom { room_id } => {
862                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
863                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
864            }
865            NetworkCommand::PublishRoomMessage { room_id, payload } => {
866                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
867                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
868                    // No subscribed peers is expected before the mesh
869                    // forms; anything else (MessageTooLarge, full queues)
870                    // is a real bug worth surfacing.
871                    match e {
872                        gossipsub::PublishError::NoPeersSubscribedToTopic => {
873                            debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
874                        }
875                        e => warn!(%e, %room_id, "publish room message failed"),
876                    }
877                }
878            }
879            NetworkCommand::AnnounceRoom(ann) => {
880                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
881                match serde_json::to_vec(&ann) {
882                    Ok(payload) => {
883                        if let Err(e) =
884                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
885                        {
886                            debug!(%e, "publish room announcement failed");
887                        }
888                    }
889                    Err(e) => warn!(%e, "encode room announcement"),
890                }
891            }
892            NetworkCommand::AcceptInbound { peer_id } => {
893                if self.pending_inbound.remove(&peer_id).is_some() {
894                    info!(%peer_id, "inbound dial accepted — promoting to mesh");
895                    self.swarm
896                        .behaviour_mut()
897                        .gossipsub
898                        .add_explicit_peer(&peer_id);
899                    self.discovered_peers.insert(peer_id);
900                } else {
901                    debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
902                }
903            }
904            NetworkCommand::RejectInbound { peer_id } => {
905                self.pending_inbound.remove(&peer_id);
906                self.session_blocklist.insert(peer_id);
907                info!(%peer_id, "inbound dial rejected — disconnecting");
908                let _ = self.swarm.disconnect_peer_id(peer_id);
909            }
910            NetworkCommand::DisconnectPeer { peer_id } => {
911                info!(%peer_id, "app-level identity check failed — disconnecting");
912                let _ = self.swarm.disconnect_peer_id(peer_id);
913            }
914            NetworkCommand::Dial { address } => {
915                let opts: DialOpts = address.clone().into();
916                let conn_id = opts.connection_id();
917                match self.swarm.dial(opts) {
918                    Ok(()) => {
919                        self.dial_attempts.insert(conn_id, address);
920                    }
921                    Err(e) => {
922                        // Synchronous dial error (bad multiaddr, transport refused).
923                        let tx = self.event_tx.clone();
924                        let err = e.to_string();
925                        tokio::spawn(async move {
926                            let _ = tx
927                                .send(NetworkEvent::DialFailed {
928                                    address,
929                                    error: err,
930                                })
931                                .await;
932                        });
933                    }
934                }
935            }
936            NetworkCommand::DialAddresses { addresses } => {
937                use libp2p::multiaddr::Protocol;
938                if addresses.is_empty() {
939                    return;
940                }
941                // Extract the shared peer-id from any address with a
942                // `/p2p/<peer-id>` suffix. All host_addrs originating
943                // from the same `RoomAnnouncement` share the
944                // announcer's peer-id, so the first match is enough.
945                let peer_id = addresses
946                    .iter()
947                    .flat_map(|a| a.iter())
948                    .find_map(|p| match p {
949                        Protocol::P2p(pid) => Some(pid),
950                        _ => None,
951                    });
952                let opts = match peer_id {
953                    Some(pid) => DialOpts::peer_id(pid)
954                        .addresses(addresses.clone())
955                        .build(),
956                    // No /p2p/ segment anywhere — fall back to single-
957                    // address dial of the first candidate, matching the
958                    // legacy `Dial` semantics for unanchored multiaddrs.
959                    None => addresses[0].clone().into(),
960                };
961                let conn_id = opts.connection_id();
962                // Use the first address as the representative for
963                // dial_attempts. On synchronous error we report it;
964                // on async success the post-identify handler upserts
965                // with the actually-connected endpoint.
966                let primary = addresses[0].clone();
967                match self.swarm.dial(opts) {
968                    Ok(()) => {
969                        self.dial_attempts.insert(conn_id, primary);
970                    }
971                    Err(e) => {
972                        let tx = self.event_tx.clone();
973                        let err = e.to_string();
974                        tokio::spawn(async move {
975                            let _ = tx
976                                .send(NetworkEvent::DialFailed {
977                                    address: primary,
978                                    error: err,
979                                })
980                                .await;
981                        });
982                    }
983                }
984            }
985            NetworkCommand::Shutdown => unreachable!(),
986        }
987    }
988}