Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4pub mod server;
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::StreamExt;
11use sha2::{Digest, Sha256};
12use libp2p::core::ConnectedPoint;
13use libp2p::swarm::dial_opts::DialOpts;
14use libp2p::swarm::ConnectionId;
15use libp2p::{
16    autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
17    SwarmBuilder,
18};
19use tokio::sync::mpsc;
20use tracing::{debug, info, warn};
21
/// Peer-discovery strategy for the network layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// The huddle 0.8 default: libp2p is not started at all and the
    /// centralized Tor-onion relay (see `network::server`) is the only
    /// transport. No LAN discovery, no direct dial, no NAT machinery.
    /// The binary uses this mode unless the user opts back into libp2p
    /// with `--mode mdns|direct`.
    Server,
    /// libp2p with mDNS enabled (`--mode mdns`, opt-in): we announce
    /// ourselves on the LAN and listen for other announcements. Runs
    /// *alongside* the onion relay, so LAN peers connect directly while
    /// internet peers still route through the server.
    Mdns,
    /// libp2p with mDNS disabled (`--mode direct`, opt-in): we are
    /// invisible to LAN discovery and the only libp2p way to connect is
    /// an explicit dial. Also runs alongside the onion relay.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of the mode (`"server"`, `"mdns"` or
    /// `"direct"`). Each of these is accepted back by [`Self::from_str`].
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Server => "server",
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Whether this mode starts a libp2p swarm (LAN / direct dial).
    /// Only the onion-relay-only `Server` mode does not.
    pub fn uses_libp2p(&self) -> bool {
        match self {
            Self::Server => false,
            Self::Mdns | Self::Direct => true,
        }
    }

    /// Parse a mode name or one of its aliases.
    ///
    /// Surrounding whitespace is ignored and matching is ASCII
    /// case-insensitive. Returns `None` for anything unrecognised.
    pub fn from_str(s: &str) -> Option<Self> {
        // Every accepted spelling, paired with the mode it selects.
        const ALIASES: [(&str, NetworkMode); 9] = [
            ("server", NetworkMode::Server),
            ("tor", NetworkMode::Server),
            ("onion", NetworkMode::Server),
            ("mdns", NetworkMode::Mdns),
            ("lan", NetworkMode::Mdns),
            ("open", NetworkMode::Mdns),
            ("direct", NetworkMode::Direct),
            ("dial", NetworkMode::Direct),
            ("private", NetworkMode::Direct),
        ];

        let wanted = s.trim();
        ALIASES
            .iter()
            .find(|(alias, _)| alias.eq_ignore_ascii_case(wanted))
            .map(|&(_, mode)| mode)
    }
}
66
67use crate::identity::{compute_fingerprint, Identity};
68use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
69use crate::network::events::NetworkEvent;
70use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
71
72#[derive(Debug)]
73pub enum NetworkCommand {
74    /// Subscribe to a room's per-room gossipsub topic.
75    SubscribeRoom { room_id: String },
76    /// Unsubscribe from a room's topic.
77    UnsubscribeRoom { room_id: String },
78    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
79    PublishRoomMessage { room_id: String, payload: Vec<u8> },
80    /// Publish a room announcement on the global rooms topic.
81    AnnounceRoom(RoomAnnouncement),
82    /// User-initiated dial of an explicit address. Used for cross-network
83    /// reach when mDNS isn't enough.
84    Dial { address: Multiaddr },
85    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
86    /// letting libp2p race them in parallel. Used by the "add by HD
87    /// ID / username" flow, which resolves a fingerprint to every
88    /// address we know for that peer (room announcement `host_addrs`
89    /// + persisted `known_peers`). libp2p's parallel dialer picks
90    /// the cheapest path that completes — LAN beats public IP beats
91    /// relay-hopped without us having to probe transports manually.
92    DialAddresses { addresses: Vec<Multiaddr> },
93    /// Phase A: user accepted an inbound dial — promote the peer to
94    /// explicit-peer status so room announcements flow.
95    AcceptInbound { peer_id: PeerId },
96    /// Phase A: user rejected an inbound dial — disconnect them and
97    /// add the peer_id to the in-memory blocklist for this session
98    /// (caller is responsible for the persistent blocked_peers row).
99    RejectInbound { peer_id: PeerId },
100    /// Phase C follow-up: drop a connection that failed an
101    /// application-level identity check (e.g. invite-fingerprint
102    /// mismatch). Differs from `RejectInbound` in that it doesn't
103    /// touch the inbound-pending map (the connection is already
104    /// past Identify when we discover the mismatch) and doesn't
105    /// persist a block — the caller may want to retry with a
106    /// corrected invite.
107    DisconnectPeer { peer_id: PeerId },
108    Shutdown,
109}
110
111#[derive(Clone)]
112pub struct NetworkHandle {
113    cmd_tx: mpsc::Sender<NetworkCommand>,
114    /// huddle 0.8: optional connection to the centralized server (a Tor
115    /// onion relay). When attached, every room subscribe/unsubscribe and
116    /// every published wire message is mirrored to it, so peers we can't
117    /// reach over libp2p still receive our traffic (and we theirs, via the
118    /// server's offline mailbox). `None` until `attach_server` runs.
119    server: Arc<std::sync::Mutex<Option<server::ServerClient>>>,
120}
121
122/// Stable per-message id for the server mailbox/receipts: a short hash of
123/// the wire bytes. Opaque to the server; lets it dedup/queue by id.
124fn server_msg_id(payload: &[u8]) -> String {
125    let mut h = Sha256::new();
126    h.update(payload);
127    hex::encode(&h.finalize()[..8])
128}
129
130impl NetworkHandle {
131    /// Attach a live server connection so subsequent room traffic mirrors
132    /// to it. Replaces any previous one. Called by the app's server
133    /// connection task after each (re)connect.
134    pub fn attach_server(&self, client: server::ServerClient) {
135        *self.server.lock().unwrap() = Some(client);
136    }
137
138    /// Drop the server connection (e.g. after a disconnect) so we stop
139    /// mirroring until it reconnects.
140    pub fn detach_server(&self) {
141        *self.server.lock().unwrap() = None;
142    }
143
144    /// Snapshot the attached server client, if any. Cloned out so we never
145    /// hold the lock across the (sync, non-blocking) mirror call.
146    fn server_client(&self) -> Option<server::ServerClient> {
147        self.server.lock().unwrap().clone()
148    }
149
150    pub async fn subscribe_room(&self, room_id: String) {
151        if let Some(s) = self.server_client() {
152            let _ = s.subscribe(&room_id);
153        }
154        let _ = self
155            .cmd_tx
156            .send(NetworkCommand::SubscribeRoom { room_id })
157            .await;
158    }
159
160    pub async fn unsubscribe_room(&self, room_id: String) {
161        if let Some(s) = self.server_client() {
162            let _ = s.unsubscribe(&room_id);
163        }
164        let _ = self
165            .cmd_tx
166            .send(NetworkCommand::UnsubscribeRoom { room_id })
167            .await;
168    }
169
170    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
171        if let Some(s) = self.server_client() {
172            let _ = s.publish(&room_id, &server_msg_id(&payload), &payload);
173        }
174        let _ = self
175            .cmd_tx
176            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
177            .await;
178    }
179
180    pub async fn announce_room(&self, ann: RoomAnnouncement) {
181        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
182    }
183
184    /// True when a server connection is currently attached.
185    pub fn has_server(&self) -> bool {
186        self.server.lock().unwrap().is_some()
187    }
188
189    pub async fn dial(&self, address: Multiaddr) {
190        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
191    }
192
193    /// huddle 0.5.2: dial a peer with multiple candidate addresses
194    /// at once. libp2p's swarm races them; the first to complete a
195    /// handshake wins, the rest are dropped. Use when you've resolved
196    /// a peer fingerprint to several known transports — typically
197    /// LAN ip4 + public ip4 + relay circuit.
198    pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
199        let _ = self
200            .cmd_tx
201            .send(NetworkCommand::DialAddresses { addresses })
202            .await;
203    }
204
205    pub async fn accept_inbound(&self, peer_id: PeerId) {
206        let _ = self
207            .cmd_tx
208            .send(NetworkCommand::AcceptInbound { peer_id })
209            .await;
210    }
211
212    pub async fn reject_inbound(&self, peer_id: PeerId) {
213        let _ = self
214            .cmd_tx
215            .send(NetworkCommand::RejectInbound { peer_id })
216            .await;
217    }
218
219    pub async fn disconnect_peer(&self, peer_id: PeerId) {
220        let _ = self
221            .cmd_tx
222            .send(NetworkCommand::DisconnectPeer { peer_id })
223            .await;
224    }
225
226    pub async fn shutdown(&self) {
227        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
228    }
229}
230
231/// What kind of connection we're holding open until Identify gives us
232/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
233/// added to the gossipsub mesh until the user accepts; outbound user-
234/// dials add to the mesh on connection but still wait on Identify so
235/// the eventual `DialSucceeded` can carry the fingerprint.
236#[derive(Debug)]
237enum PendingPeer {
238    /// Inbound dial from an unknown peer — modal-pending in the app.
239    InboundUnknown { address: Multiaddr },
240}
241
242struct NetworkTask {
243    swarm: Swarm<HuddleBehavior>,
244    cmd_rx: mpsc::Receiver<NetworkCommand>,
245    event_tx: mpsc::Sender<NetworkEvent>,
246    discovered_peers: HashSet<PeerId>,
247    /// Tracks user-initiated dials so we can correlate the eventual
248    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
249    /// specific address the user asked us to dial.
250    dial_attempts: HashMap<ConnectionId, Multiaddr>,
251    /// Phase A: peers connected but not yet promoted to the gossipsub
252    /// mesh (inbound) — waiting either for `Identify` to land so we
253    /// know their fingerprint, or for the user's accept/reject decision.
254    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
255    /// mid-prompt doesn't leak state.
256    pending_inbound: HashMap<PeerId, PendingPeer>,
257    /// Phase A: peers the user has explicitly rejected this session —
258    /// auto-disconnect every reconnect attempt without re-prompting.
259    /// Persistent across runs via `blocked_peers`; this in-memory copy
260    /// is loaded at startup (TODO: not yet wired; falls back to the DB
261    /// check in the app layer for now) and updated on `RejectInbound`.
262    session_blocklist: HashSet<PeerId>,
263    /// Phase D: configured relay multiaddrs. When Identify lands for a
264    /// peer whose multiaddr matches one of these, we call
265    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
266    /// Tracked as multiaddr strings (no PeerId yet — we don't know it
267    /// until Identify) plus a set of peer_ids of confirmed relays so
268    /// we only register once per relay.
269    configured_relays: Vec<Multiaddr>,
270    relay_peer_ids: HashSet<PeerId>,
271    /// huddle 0.7.11: per-peer DCUtR failure counter. Logged at warn
272    /// once the count crosses `DCUTR_FAIL_BUDGET` so symmetric-NAT
273    /// pairs don't generate runaway hole-punch attempts in the logs.
274    /// libp2p's dcutr behavior schedules its own retries internally;
275    /// this is purely an observability signal for the app.
276    dcutr_failures: HashMap<PeerId, u32>,
277}
278
/// huddle 0.7.11: number of failed DCUtR attempts to one peer after
/// which we warn. dcutr keeps retrying internally regardless; the audit
/// flagged the resulting log spam as a real issue, and capping the log
/// noise is the realistic fix.
const DCUTR_FAIL_BUDGET: u32 = 6;
283
284pub fn start_network(
285    identity: &Identity,
286    event_tx: mpsc::Sender<NetworkEvent>,
287) -> crate::error::Result<NetworkHandle> {
288    start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
289}
290
291/// huddle 0.8: build a `NetworkHandle` with **no libp2p swarm** — the
292/// onion-relay-only (`NetworkMode::Server`) path. We still hand back a
293/// fully-functional handle so the rest of the app is oblivious: room
294/// subscribe/unsubscribe/publish mirror to the attached `ServerClient`
295/// exactly as before, while the libp2p-specific commands (dial, announce,
296/// accept/reject, …) are simply drained and dropped. A tiny task consumes
297/// `cmd_rx` so the bounded channel never fills and `send().await` never
298/// blocks; it exits on `Shutdown`.
299pub fn start_network_disabled() -> NetworkHandle {
300    let (cmd_tx, mut cmd_rx) = mpsc::channel::<NetworkCommand>(256);
301    tokio::spawn(async move {
302        while let Some(cmd) = cmd_rx.recv().await {
303            if matches!(cmd, NetworkCommand::Shutdown) {
304                info!("network task (libp2p-disabled) shutting down");
305                break;
306            }
307            // Everything else is a libp2p op with no swarm to run it; the
308            // server mirror (in NetworkHandle) already handled the room
309            // traffic before the command reached us. Drop it.
310        }
311    });
312    NetworkHandle {
313        cmd_tx,
314        server: Arc::new(std::sync::Mutex::new(None)),
315    }
316}
317
318/// Start the network task with explicit mode, TCP listen port, and any
319/// pre-configured relay multiaddrs. `listen_port = 0` requests a
320/// random port. Relays are dialed on startup; once `Identify` lands
321/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
322/// register a reservation so peers behind NAT can dial us through it.
323pub fn start_network_with(
324    identity: &Identity,
325    event_tx: mpsc::Sender<NetworkEvent>,
326    mode: NetworkMode,
327    listen_port: u16,
328    relays: Vec<Multiaddr>,
329) -> crate::error::Result<NetworkHandle> {
330    let keypair = identity.keypair().clone();
331    let local_peer_id = identity.peer_id();
332
333    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
334        .with_tokio()
335        .with_tcp(
336            tcp::Config::default(),
337            noise::Config::new,
338            yamux::Config::default,
339        )
340        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
341        // Phase D: wrap the transport with relay-client. This both
342        // composes the transport (so we can dial `/p2p-circuit/`
343        // addresses) and surfaces a `relay::client::Behaviour` into
344        // the `with_behaviour` closure as the second argument.
345        .with_relay_client(noise::Config::new, yamux::Config::default)
346        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
347        .with_behaviour(|key, relay_client| {
348            let mdns_opt = match mode {
349                NetworkMode::Mdns => Some(
350                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
351                        .expect("mDNS init failed"),
352                ),
353                // `Direct` runs libp2p without mDNS. `Server` never reaches
354                // here (it takes the `start_network_disabled` path and runs
355                // no swarm), but the match must be exhaustive.
356                NetworkMode::Direct | NetworkMode::Server => None,
357            };
358            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
359
360            let identify = identify::Behaviour::new(
361                identify::Config::new("/huddle/1.0.0".into(), key.public())
362                    .with_agent_version("huddle/0.5".into()),
363            );
364
365            let ping = ping::Behaviour::default();
366
367            let gossipsub_config = gossipsub::ConfigBuilder::default()
368                .heartbeat_interval(Duration::from_secs(1))
369                .validation_mode(gossipsub::ValidationMode::Strict)
370                // Default is 64 KiB. Raise it so base64-encoded file
371                // chunks (see files::CHUNK_SIZE) keep ample headroom.
372                .max_transmit_size(256 * 1024)
373                .build()
374                .expect("valid gossipsub config");
375
376            let mut gossipsub = gossipsub::Behaviour::new(
377                gossipsub::MessageAuthenticity::Signed(key.clone()),
378                gossipsub_config,
379            )
380            .expect("valid gossipsub init");
381
382            // Every node subscribes to the global rooms topic so the lobby
383            // shows discovered rooms even before joining anything.
384            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
385            gossipsub
386                .subscribe(&rooms_topic)
387                .expect("subscribe rooms topic");
388
389            // AutoNAT v2: client probes external addresses by asking
390            // remote servers to dial us back; server answers other
391            // peers' probes. Both halves are needed for symmetric
392            // P2P reachability detection (v1 combined them; v2 split).
393            let autonat_client = autonat::v2::client::Behaviour::new(
394                rand::rngs::OsRng,
395                autonat::v2::client::Config::default(),
396            );
397            let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
398            let dcutr = dcutr::Behaviour::new(local_peer_id);
399
400            HuddleBehavior {
401                mdns,
402                identify,
403                ping,
404                gossipsub,
405                relay_client,
406                autonat_client,
407                autonat_server,
408                dcutr,
409            }
410        })
411        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
412        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
413        .build();
414
415    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
416        .parse()
417        .expect("valid listen addr");
418    swarm
419        .listen_on(listen_addr)
420        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
421    // Also bind IPv6 on all interfaces so users can dial via IPv6.
422    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
423        .parse()
424        .expect("valid ipv6 listen addr");
425    if let Err(e) = swarm.listen_on(listen_addr6) {
426        debug!(%e, "ipv6 listen skipped");
427    }
428
429    let (cmd_tx, cmd_rx) = mpsc::channel(256);
430    let mut task = NetworkTask {
431        swarm,
432        cmd_rx,
433        event_tx,
434        discovered_peers: HashSet::new(),
435        dial_attempts: HashMap::new(),
436        pending_inbound: HashMap::new(),
437        session_blocklist: HashSet::new(),
438        configured_relays: relays.clone(),
439        relay_peer_ids: HashSet::new(),
440        dcutr_failures: HashMap::new(),
441    };
442    // Phase D: dial each configured relay so Identify can complete and
443    // we can register a `/p2p-circuit` reservation. Failures here are
444    // non-fatal — the user can still chat on LAN.
445    for relay_addr in relays {
446        info!(addr = %relay_addr, "dialing configured relay");
447        let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
448        let conn_id = opts.connection_id();
449        match task.swarm.dial(opts) {
450            Ok(()) => {
451                task.dial_attempts.insert(conn_id, relay_addr);
452            }
453            Err(e) => warn!(%e, "dial relay failed"),
454        }
455    }
456    tokio::spawn(task.run());
457
458    Ok(NetworkHandle {
459        cmd_tx,
460        server: Arc::new(std::sync::Mutex::new(None)),
461    })
462}
463
464impl NetworkTask {
465    async fn run(mut self) {
466        loop {
467            tokio::select! {
468                event = self.swarm.select_next_some() => {
469                    self.handle_swarm_event(event).await;
470                }
471                Some(cmd) = self.cmd_rx.recv() => {
472                    if matches!(cmd, NetworkCommand::Shutdown) {
473                        info!("network task shutting down");
474                        break;
475                    }
476                    self.handle_command(cmd);
477                }
478            }
479        }
480    }
481
482    async fn handle_swarm_event(
483        &mut self,
484        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
485    ) {
486        match event {
487            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
488                info!(%address, "listening");
489                // Phase D: a relay-circuit address is a reachability
490                // milestone — surface it as its own event so the lobby
491                // can show "reachable via N relays" status.
492                use libp2p::multiaddr::Protocol;
493                let is_circuit = address
494                    .iter()
495                    .any(|p| matches!(p, Protocol::P2pCircuit));
496                if is_circuit {
497                    let _ = self
498                        .event_tx
499                        .send(NetworkEvent::RelayReservationEstablished {
500                            address: address.clone(),
501                        })
502                        .await;
503                }
504                let _ = self
505                    .event_tx
506                    .send(NetworkEvent::ListeningOn { address })
507                    .await;
508            }
509            libp2p::swarm::SwarmEvent::ConnectionEstablished {
510                peer_id,
511                connection_id,
512                endpoint,
513                ..
514            } => {
515                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
516                    // Phase D: a connection that was for a configured
517                    // relay shouldn't pollute the lobby with a normal
518                    // DialSucceeded — relays aren't chat peers. We just
519                    // remember the peer_id and wait for Identify, then
520                    // register a /p2p-circuit reservation.
521                    let is_relay = self.configured_relays.iter().any(|r| r == &addr);
522                    if is_relay {
523                        info!(%peer_id, %addr, "connected to configured relay");
524                        self.relay_peer_ids.insert(peer_id);
525                    } else {
526                        info!(%peer_id, %addr, "user-dialed peer connected");
527                        // Treat dialed peers like mDNS-discovered: add
528                        // to gossipsub explicit peers so room
529                        // announcements flow.
530                        self.swarm
531                            .behaviour_mut()
532                            .gossipsub
533                            .add_explicit_peer(&peer_id);
534                        self.discovered_peers.insert(peer_id);
535                        let _ = self
536                            .event_tx
537                            .send(NetworkEvent::DialSucceeded {
538                                peer_id,
539                                address: addr,
540                            })
541                            .await;
542                    }
543                } else if let ConnectedPoint::Dialer { .. } = endpoint {
544                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
545                    // — still add to mesh; no user-visible event needed.
546                    self.swarm
547                        .behaviour_mut()
548                        .gossipsub
549                        .add_explicit_peer(&peer_id);
550                } else {
551                    // Inbound dial from an unknown peer (Phase A). We
552                    // hold the connection but do NOT add them to the
553                    // explicit-peer set yet — wait for Identify so we
554                    // can show the user the peer's fingerprint, then
555                    // either AcceptInbound (promote to mesh) or
556                    // RejectInbound (disconnect + persist blocklist).
557                    //
558                    // Known limitation: gossipsub's score-based mesh
559                    // formation may still forward topic messages to
560                    // this peer via other peers we have in common.
561                    // True hard-quarantine would need a custom
562                    // ConnectionHandler — out of scope for v1.
563                    if self.session_blocklist.contains(&peer_id) {
564                        info!(%peer_id, "rejecting inbound from session-blocked peer");
565                        let _ = self.swarm.disconnect_peer_id(peer_id);
566                    } else {
567                        let address = match &endpoint {
568                            ConnectedPoint::Listener { send_back_addr, .. } => {
569                                send_back_addr.clone()
570                            }
571                            _ => Multiaddr::empty(),
572                        };
573                        debug!(%peer_id, %address, "inbound peer pending decision");
574                        self.pending_inbound
575                            .insert(peer_id, PendingPeer::InboundUnknown { address });
576                    }
577                }
578            }
579            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
580                connection_id,
581                error,
582                ..
583            } => {
584                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
585                    warn!(%addr, %error, "user-dialed peer failed");
586                    let _ = self
587                        .event_tx
588                        .send(NetworkEvent::DialFailed {
589                            address: addr,
590                            error: error.to_string(),
591                        })
592                        .await;
593                }
594            }
595            libp2p::swarm::SwarmEvent::ConnectionClosed {
596                peer_id,
597                num_established,
598                ..
599            } => {
600                // Drop any pending-inbound entry for this peer — they
601                // disconnected before we could prompt the user (or
602                // before the user accepted). Lets a re-connect start
603                // fresh rather than reusing stale state.
604                self.pending_inbound.remove(&peer_id);
605                // huddle 0.7.11: emit PeerDisconnected so the app can
606                // clean its `connected_dial_addrs` map. Only fire when
607                // the LAST connection to that peer closed — multiple
608                // simultaneous connections (e.g. direct + relay) would
609                // otherwise emit spurious disconnects when one of the
610                // two drops.
611                if num_established == 0 {
612                    let _ = self
613                        .event_tx
614                        .send(NetworkEvent::PeerDisconnected { peer_id })
615                        .await;
616                    // Also clean up gossipsub's explicit-peers set so
617                    // it doesn't accumulate dead PeerIds across the
618                    // session.
619                    self.swarm
620                        .behaviour_mut()
621                        .gossipsub
622                        .remove_explicit_peer(&peer_id);
623                }
624            }
625            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
626            _ => {}
627        }
628    }
629
630    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
631        match event {
632            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
633                for (peer_id, addr) in peers {
634                    if self.discovered_peers.insert(peer_id) {
635                        info!(%peer_id, %addr, "mDNS discovered");
636                        self.swarm.add_peer_address(peer_id, addr);
637                        // Explicitly add to gossipsub mesh.
638                        self.swarm
639                            .behaviour_mut()
640                            .gossipsub
641                            .add_explicit_peer(&peer_id);
642                        let _ = self
643                            .event_tx
644                            .send(NetworkEvent::PeerDiscovered { peer_id })
645                            .await;
646                    }
647                }
648            }
649            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
650                for (peer_id, _) in peers {
651                    if self.discovered_peers.remove(&peer_id) {
652                        info!(%peer_id, "mDNS peer expired");
653                        self.swarm
654                            .behaviour_mut()
655                            .gossipsub
656                            .remove_explicit_peer(&peer_id);
657                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
658                    }
659                }
660            }
661            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
662                propagation_source,
663                message,
664                ..
665            }) => {
666                self.handle_gossipsub_message(propagation_source, message).await;
667            }
668            HuddleBehaviorEvent::Identify(identify::Event::Received {
669                peer_id, info, ..
670            }) => {
671                debug!(%peer_id, agent = %info.agent_version, "identify received");
672                // Phase D: if this peer is a configured relay, register
673                // a `/p2p-circuit` reservation on first identify. Idem-
674                // potent — only fire if we haven't listened on this
675                // relay already (identify fires periodically).
676                if self.relay_peer_ids.contains(&peer_id) {
677                    use libp2p::multiaddr::Protocol;
678                    if let Some(relay_addr) = self
679                        .configured_relays
680                        .iter()
681                        .find(|a| {
682                            // Match by /p2p/<peer-id> suffix when
683                            // present, else by the addr we dialed.
684                            a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
685                                || self.dial_attempts.values().any(|d| d == *a)
686                        })
687                        .cloned()
688                    {
689                        let circuit = relay_addr.with(Protocol::P2pCircuit);
690                        match self.swarm.listen_on(circuit.clone()) {
691                            Ok(_) => info!(%circuit, "listening on relay circuit"),
692                            Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
693                        }
694                    }
695                }
696                // Decode the remote's Ed25519 pubkey and derive our
697                // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
698                // Rsa, Ecdsa) shouldn't appear in practice — huddle
699                // only generates Ed25519 identities — so we just log
700                // and skip if the cast fails.
701                let fingerprint = match info.public_key.clone().try_into_ed25519() {
702                    Ok(ed_pk) => {
703                        let bytes = ed_pk.to_bytes();
704                        compute_fingerprint(&bytes)
705                    }
706                    Err(_) => {
707                        warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
708                        return;
709                    }
710                };
711                // Always notify the app layer so it can populate
712                // `known_peers.fingerprint` and detect that an
713                // outbound peer we dialed has fully identified.
714                let _ = self
715                    .event_tx
716                    .send(NetworkEvent::PeerIdentified {
717                        peer_id,
718                        fingerprint: fingerprint.clone(),
719                    })
720                    .await;
721                // If the peer is in pending_inbound, Identify completing
722                // is the cue to surface the user prompt. Keep them in
723                // pending_inbound until Accept or Reject — we don't
724                // know yet which way the user will decide.
725                if let Some(PendingPeer::InboundUnknown { address }) =
726                    self.pending_inbound.get(&peer_id)
727                {
728                    let address = address.clone();
729                    let _ = self
730                        .event_tx
731                        .send(NetworkEvent::InboundDial {
732                            peer_id,
733                            fingerprint,
734                            address,
735                        })
736                        .await;
737                }
738            }
739            HuddleBehaviorEvent::AutonatClient(ev) => {
740                // One probe per address candidate. `result.is_ok()`
741                // means a remote AutoNAT server dialed us back
742                // successfully on `tested_addr` ⇒ this address is
743                // reachable from the outside. The app layer aggregates
744                // these into the lobby reachability badge.
745                let reachable = ev.result.is_ok();
746                if reachable {
747                    info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
748                } else {
749                    debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
750                }
751                let _ = self
752                    .event_tx
753                    .send(NetworkEvent::NatProbeResult {
754                        tested_addr: ev.tested_addr,
755                        reachable,
756                    })
757                    .await;
758            }
759            HuddleBehaviorEvent::AutonatServer(_) => {
760                // We answered another peer's reachability probe.
761                // No app-visible action.
762            }
763            HuddleBehaviorEvent::Dcutr(ev) => {
764                let success = ev.result.is_ok();
765                if success {
766                    info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
767                    // huddle 0.7.11: a successful hole-punch resets the
768                    // per-peer DCUtR retry counter so future failures
769                    // don't immediately bail out.
770                    self.dcutr_failures.remove(&ev.remote_peer_id);
771                } else {
772                    debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
773                    let count = self
774                        .dcutr_failures
775                        .entry(ev.remote_peer_id)
776                        .and_modify(|n| *n += 1)
777                        .or_insert(1);
778                    if *count >= DCUTR_FAIL_BUDGET {
779                        warn!(
780                            remote = %ev.remote_peer_id,
781                            attempts = *count,
782                            "DCUtR: giving up after repeated failures (symmetric NAT likely)"
783                        );
784                    }
785                }
786                let _ = self
787                    .event_tx
788                    .send(NetworkEvent::DcutrUpgrade {
789                        remote_peer: ev.remote_peer_id,
790                        success,
791                    })
792                    .await;
793            }
794            HuddleBehaviorEvent::RelayClient(event) => {
795                // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
796                // only exposes the success-path variants
797                // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
798                // `InboundCircuitEstablished`). There's no
799                // `ReservationReqFailed` arm we can match on, so we
800                // can't reliably surface reservation loss without a
801                // separate health-check timer (future work). For now
802                // we log every event so operators can see the
803                // lifecycle in `huddle.log` — pre-0.7.11 the whole
804                // arm was swallowed by `_ => {}` which hid even that.
805                use libp2p::relay::client::Event as Rc;
806                match event {
807                    Rc::ReservationReqAccepted { relay_peer_id, .. } => {
808                        info!(%relay_peer_id, "relay: reservation accepted");
809                    }
810                    Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
811                        debug!(%relay_peer_id, "relay: outbound circuit established");
812                    }
813                    other => {
814                        debug!(?other, "relay client event");
815                    }
816                }
817            }
818            _ => {}
819        }
820    }
821
822    async fn handle_gossipsub_message(
823        &mut self,
824        from_peer: PeerId,
825        message: gossipsub::Message,
826    ) {
827        let topic = message.topic.to_string();
828        if topic == ROOMS_TOPIC {
829            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
830                Ok(ann) => {
831                    let _ = self
832                        .event_tx
833                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
834                        .await;
835                }
836                Err(e) => {
837                    warn!(%e, "bad room announcement");
838                }
839            }
840        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
841            let _ = self
842                .event_tx
843                .send(NetworkEvent::RoomMessageReceived {
844                    room_id: room_id.to_string(),
845                    payload: message.data,
846                    from_peer,
847                })
848                .await;
849        }
850    }
851
852    fn handle_command(&mut self, cmd: NetworkCommand) {
853        match cmd {
854            NetworkCommand::SubscribeRoom { room_id } => {
855                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
856                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
857                    warn!(%e, %room_id, "subscribe room failed");
858                }
859            }
860            NetworkCommand::UnsubscribeRoom { room_id } => {
861                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
862                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
863            }
864            NetworkCommand::PublishRoomMessage { room_id, payload } => {
865                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
866                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
867                    // No subscribed peers is expected before the mesh
868                    // forms; anything else (MessageTooLarge, full queues)
869                    // is a real bug worth surfacing.
870                    match e {
871                        gossipsub::PublishError::NoPeersSubscribedToTopic => {
872                            debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
873                        }
874                        e => warn!(%e, %room_id, "publish room message failed"),
875                    }
876                }
877            }
878            NetworkCommand::AnnounceRoom(ann) => {
879                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
880                match serde_json::to_vec(&ann) {
881                    Ok(payload) => {
882                        if let Err(e) =
883                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
884                        {
885                            debug!(%e, "publish room announcement failed");
886                        }
887                    }
888                    Err(e) => warn!(%e, "encode room announcement"),
889                }
890            }
891            NetworkCommand::AcceptInbound { peer_id } => {
892                if self.pending_inbound.remove(&peer_id).is_some() {
893                    info!(%peer_id, "inbound dial accepted — promoting to mesh");
894                    self.swarm
895                        .behaviour_mut()
896                        .gossipsub
897                        .add_explicit_peer(&peer_id);
898                    self.discovered_peers.insert(peer_id);
899                } else {
900                    debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
901                }
902            }
903            NetworkCommand::RejectInbound { peer_id } => {
904                self.pending_inbound.remove(&peer_id);
905                self.session_blocklist.insert(peer_id);
906                info!(%peer_id, "inbound dial rejected — disconnecting");
907                let _ = self.swarm.disconnect_peer_id(peer_id);
908            }
909            NetworkCommand::DisconnectPeer { peer_id } => {
910                info!(%peer_id, "app-level identity check failed — disconnecting");
911                let _ = self.swarm.disconnect_peer_id(peer_id);
912            }
913            NetworkCommand::Dial { address } => {
914                let opts: DialOpts = address.clone().into();
915                let conn_id = opts.connection_id();
916                match self.swarm.dial(opts) {
917                    Ok(()) => {
918                        self.dial_attempts.insert(conn_id, address);
919                    }
920                    Err(e) => {
921                        // Synchronous dial error (bad multiaddr, transport refused).
922                        let tx = self.event_tx.clone();
923                        let err = e.to_string();
924                        tokio::spawn(async move {
925                            let _ = tx
926                                .send(NetworkEvent::DialFailed {
927                                    address,
928                                    error: err,
929                                })
930                                .await;
931                        });
932                    }
933                }
934            }
935            NetworkCommand::DialAddresses { addresses } => {
936                use libp2p::multiaddr::Protocol;
937                if addresses.is_empty() {
938                    return;
939                }
940                // Extract the shared peer-id from any address with a
941                // `/p2p/<peer-id>` suffix. All host_addrs originating
942                // from the same `RoomAnnouncement` share the
943                // announcer's peer-id, so the first match is enough.
944                let peer_id = addresses
945                    .iter()
946                    .flat_map(|a| a.iter())
947                    .find_map(|p| match p {
948                        Protocol::P2p(pid) => Some(pid),
949                        _ => None,
950                    });
951                let opts = match peer_id {
952                    Some(pid) => DialOpts::peer_id(pid)
953                        .addresses(addresses.clone())
954                        .build(),
955                    // No /p2p/ segment anywhere — fall back to single-
956                    // address dial of the first candidate, matching the
957                    // legacy `Dial` semantics for unanchored multiaddrs.
958                    None => addresses[0].clone().into(),
959                };
960                let conn_id = opts.connection_id();
961                // Use the first address as the representative for
962                // dial_attempts. On synchronous error we report it;
963                // on async success the post-identify handler upserts
964                // with the actually-connected endpoint.
965                let primary = addresses[0].clone();
966                match self.swarm.dial(opts) {
967                    Ok(()) => {
968                        self.dial_attempts.insert(conn_id, primary);
969                    }
970                    Err(e) => {
971                        let tx = self.event_tx.clone();
972                        let err = e.to_string();
973                        tokio::spawn(async move {
974                            let _ = tx
975                                .send(NetworkEvent::DialFailed {
976                                    address: primary,
977                                    error: err,
978                                })
979                                .await;
980                        });
981                    }
982                }
983            }
984            NetworkCommand::Shutdown => unreachable!(),
985        }
986    }
987}