Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4pub mod server;
5pub mod transport;
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::Duration;
10
11use futures::StreamExt;
12use sha2::{Digest, Sha256};
13use libp2p::core::ConnectedPoint;
14use libp2p::swarm::dial_opts::DialOpts;
15use libp2p::swarm::ConnectionId;
16use libp2p::{
17    autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
18    SwarmBuilder,
19};
20use tokio::sync::mpsc;
21use tracing::{debug, info, warn};
22
/// How the network discovers and reaches peers.
///
/// Only `Mdns` and `Direct` start a libp2p swarm (see
/// [`NetworkMode::uses_libp2p`]); `Server` is relay-only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// huddle 0.8 default: no libp2p at all. The only transport is the
    /// centralized Tor-onion relay (see `network::server`). No LAN
    /// discovery, no direct dial, no NAT machinery — every message rides
    /// the onion. This is what the binary starts in unless the user
    /// explicitly opts back into libp2p with `--mode mdns|direct`.
    Server,
    /// libp2p with mDNS on: announce ourselves on the LAN and pick up
    /// other peers' announcements. Opt-in (`--mode mdns`); runs
    /// *alongside* the onion relay so LAN peers connect directly while
    /// internet peers route through the server.
    Mdns,
    /// libp2p with mDNS off: invisible to LAN discovery; the only libp2p
    /// way to connect is an explicit dial. Opt-in (`--mode direct`); also
    /// runs alongside the onion relay.
    Direct,
}
42
43impl NetworkMode {
44    pub fn as_str(&self) -> &'static str {
45        match self {
46            NetworkMode::Server => "server",
47            NetworkMode::Mdns => "mdns",
48            NetworkMode::Direct => "direct",
49        }
50    }
51
52    /// True when this mode starts a libp2p swarm (LAN / direct dial). The
53    /// `Server` default does not — it's onion-relay-only.
54    pub fn uses_libp2p(&self) -> bool {
55        !matches!(self, NetworkMode::Server)
56    }
57
58    pub fn from_str(s: &str) -> Option<Self> {
59        match s.trim().to_ascii_lowercase().as_str() {
60            "server" | "tor" | "onion" => Some(NetworkMode::Server),
61            "mdns" | "lan" | "open" => Some(NetworkMode::Mdns),
62            "direct" | "dial" | "private" => Some(NetworkMode::Direct),
63            _ => None,
64        }
65    }
66}
67
68use crate::identity::{compute_fingerprint, Identity};
69use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
70use crate::network::events::NetworkEvent;
71use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
72
/// Commands sent from a `NetworkHandle` to the network task over its
/// bounded command channel.
#[derive(Debug)]
pub enum NetworkCommand {
    /// Subscribe to a room's per-room gossipsub topic.
    SubscribeRoom { room_id: String },
    /// Unsubscribe from a room's topic.
    UnsubscribeRoom { room_id: String },
    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
    PublishRoomMessage { room_id: String, payload: Vec<u8> },
    /// Publish a room announcement on the global rooms topic.
    AnnounceRoom(RoomAnnouncement),
    /// User-initiated dial of an explicit address. Used for cross-network
    /// reach when mDNS isn't enough.
    Dial { address: Multiaddr },
    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
    /// letting libp2p race them in parallel. Used by the "add by HD
    /// ID / username" flow, which resolves a fingerprint to every
    /// address we know for that peer (room announcement `host_addrs`
    /// + persisted `known_peers`). libp2p's parallel dialer picks
    /// the cheapest path that completes — LAN beats public IP beats
    /// relay-hopped without us having to probe transports manually.
    DialAddresses { addresses: Vec<Multiaddr> },
    /// Phase A: user accepted an inbound dial — promote the peer to
    /// explicit-peer status so room announcements flow.
    AcceptInbound { peer_id: PeerId },
    /// Phase A: user rejected an inbound dial — disconnect them and
    /// add the peer_id to the in-memory blocklist for this session
    /// (caller is responsible for the persistent blocked_peers row).
    RejectInbound { peer_id: PeerId },
    /// Phase C follow-up: drop a connection that failed an
    /// application-level identity check (e.g. invite-fingerprint
    /// mismatch). Differs from `RejectInbound` in that it doesn't
    /// touch the inbound-pending map (the connection is already
    /// past Identify when we discover the mismatch) and doesn't
    /// persist a block — the caller may want to retry with a
    /// corrected invite.
    DisconnectPeer { peer_id: PeerId },
    /// Stop the network task: its command loop logs and exits when it
    /// receives this.
    Shutdown,
}
111
/// Handle the rest of the app uses to drive the network layer.
///
/// Cloning is cheap: every clone shares the same command channel, the same
/// attached-server slot, and the same DM-partner table (both behind `Arc`).
#[derive(Clone)]
pub struct NetworkHandle {
    /// Sending side of the bounded command channel. The receiver is owned
    /// by the network task (or by the drain task when libp2p is disabled).
    cmd_tx: mpsc::Sender<NetworkCommand>,
    /// huddle 0.8: optional connection to the centralized server (a Tor
    /// onion relay). When attached, every room subscribe/unsubscribe and
    /// every published wire message is mirrored to it, so peers we can't
    /// reach over libp2p still receive our traffic (and we theirs, via the
    /// server's offline mailbox). `None` until `attach_server` runs.
    server: Arc<std::sync::Mutex<Option<server::ServerClient>>>,
    /// huddle 1.2: map of `room_id -> partner fingerprint` for 1:1 DM rooms.
    /// When a room is registered here, relay traffic for it is delivered by
    /// recipient fingerprint (`ServerClient::send_direct`) instead of by room
    /// membership fan-out — so DMs reach the partner reliably (live or via
    /// their offline mailbox) without both sides having subscribed the same
    /// room on the relay. Group rooms are absent from this map and keep using
    /// membership fan-out (`publish`). Populated by the app from
    /// `start_direct` / `bootstrap_direct_room` / `add_contact`.
    dm_partners: Arc<std::sync::Mutex<HashMap<String, String>>>,
}
131
132/// Stable per-message id for the server mailbox/receipts: a short hash of
133/// the wire bytes. Opaque to the server; lets it dedup/queue by id.
134fn server_msg_id(payload: &[u8]) -> String {
135    let mut h = Sha256::new();
136    h.update(payload);
137    hex::encode(&h.finalize()[..8])
138}
139
140impl NetworkHandle {
141    /// Attach a live server connection so subsequent room traffic mirrors
142    /// to it. Replaces any previous one. Called by the app's server
143    /// connection task after each (re)connect.
144    pub fn attach_server(&self, client: server::ServerClient) {
145        *self.server.lock().unwrap() = Some(client);
146    }
147
148    /// Drop the server connection (e.g. after a disconnect) so we stop
149    /// mirroring until it reconnects.
150    pub fn detach_server(&self) {
151        *self.server.lock().unwrap() = None;
152    }
153
154    /// Snapshot the attached server client, if any. Cloned out so we never
155    /// hold the lock across the (sync, non-blocking) mirror call.
156    fn server_client(&self) -> Option<server::ServerClient> {
157        self.server.lock().unwrap().clone()
158    }
159
160    pub async fn subscribe_room(&self, room_id: String) {
161        if let Some(s) = self.server_client() {
162            let _ = s.subscribe(&room_id);
163        }
164        let _ = self
165            .cmd_tx
166            .send(NetworkCommand::SubscribeRoom { room_id })
167            .await;
168    }
169
170    pub async fn unsubscribe_room(&self, room_id: String) {
171        if let Some(s) = self.server_client() {
172            let _ = s.unsubscribe(&room_id);
173        }
174        let _ = self
175            .cmd_tx
176            .send(NetworkCommand::UnsubscribeRoom { room_id })
177            .await;
178    }
179
180    /// huddle 1.2: register a 1:1 DM room so its relay traffic is delivered by
181    /// the partner's fingerprint instead of by room-membership fan-out. Safe
182    /// to call repeatedly (idempotent). Never registers an empty partner.
183    pub fn register_dm(&self, room_id: String, partner_fingerprint: String) {
184        if partner_fingerprint.is_empty() {
185            return;
186        }
187        self.dm_partners
188            .lock()
189            .unwrap()
190            .insert(room_id, partner_fingerprint);
191    }
192
193    /// huddle 1.2: forget a DM room's direct-delivery mapping (e.g. on leave).
194    pub fn forget_dm(&self, room_id: &str) {
195        self.dm_partners.lock().unwrap().remove(room_id);
196    }
197
198    fn dm_partner(&self, room_id: &str) -> Option<String> {
199        self.dm_partners.lock().unwrap().get(room_id).cloned()
200    }
201
202    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
203        if let Some(s) = self.server_client() {
204            // huddle 1.2: a registered DM room delivers straight to the
205            // partner's fingerprint (reliable even if they never subscribed
206            // this room on the relay); a group room fans out by membership.
207            match self.dm_partner(&room_id) {
208                Some(to) => {
209                    let _ = s.send_direct(&to, &room_id, &server_msg_id(&payload), &payload);
210                }
211                None => {
212                    let _ = s.publish(&room_id, &server_msg_id(&payload), &payload);
213                }
214            }
215        }
216        let _ = self
217            .cmd_tx
218            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
219            .await;
220    }
221
222    /// huddle 1.2: deliver `payload` straight to `to`'s fingerprint over the
223    /// relay (independent of room membership), and also ride libp2p gossipsub
224    /// on the `room_id` topic for LAN reachability. Used for friend requests
225    /// (delivered to the target's inbox tag) and any other point-to-point
226    /// control message where the recipient is known.
227    pub async fn publish_direct(&self, to: String, room_id: String, payload: Vec<u8>) {
228        if let Some(s) = self.server_client() {
229            let _ = s.send_direct(&to, &room_id, &server_msg_id(&payload), &payload);
230        }
231        let _ = self
232            .cmd_tx
233            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
234            .await;
235    }
236
237    pub async fn announce_room(&self, ann: RoomAnnouncement) {
238        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
239    }
240
241    /// huddle 1.2.1: ask the attached relay to mint a short-lived connect code.
242    /// Returns `false` (no request sent) when no relay is currently attached.
243    /// The reply surfaces as `ServerEvent::ConnectToken` → `AppEvent`.
244    pub fn create_connect_token(&self) -> bool {
245        match self.server_client() {
246            Some(s) => s.create_connect_token().is_ok(),
247            None => false,
248        }
249    }
250
251    /// huddle 1.2.1: ask the attached relay to resolve a connect code.
252    /// Returns `false` when no relay is attached.
253    pub fn redeem_connect_token(&self, token: &str) -> bool {
254        match self.server_client() {
255            Some(s) => s.redeem_connect_token(token).is_ok(),
256            None => false,
257        }
258    }
259
260    /// True when a server connection is currently attached.
261    pub fn has_server(&self) -> bool {
262        self.server.lock().unwrap().is_some()
263    }
264
265    pub async fn dial(&self, address: Multiaddr) {
266        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
267    }
268
269    /// huddle 0.5.2: dial a peer with multiple candidate addresses
270    /// at once. libp2p's swarm races them; the first to complete a
271    /// handshake wins, the rest are dropped. Use when you've resolved
272    /// a peer fingerprint to several known transports — typically
273    /// LAN ip4 + public ip4 + relay circuit.
274    pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
275        let _ = self
276            .cmd_tx
277            .send(NetworkCommand::DialAddresses { addresses })
278            .await;
279    }
280
281    pub async fn accept_inbound(&self, peer_id: PeerId) {
282        let _ = self
283            .cmd_tx
284            .send(NetworkCommand::AcceptInbound { peer_id })
285            .await;
286    }
287
288    pub async fn reject_inbound(&self, peer_id: PeerId) {
289        let _ = self
290            .cmd_tx
291            .send(NetworkCommand::RejectInbound { peer_id })
292            .await;
293    }
294
295    pub async fn disconnect_peer(&self, peer_id: PeerId) {
296        let _ = self
297            .cmd_tx
298            .send(NetworkCommand::DisconnectPeer { peer_id })
299            .await;
300    }
301
302    pub async fn shutdown(&self) {
303        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
304    }
305}
306
/// A connection we are holding open until Identify gives us the remote's
/// Ed25519 fingerprint and the user decides what to do with it.
/// Quarantined inbound dials are NOT added to the gossipsub explicit-peer
/// set until the user accepts.
///
/// Only inbound connections are tracked here. Outbound user-dials are
/// added to the mesh and reported via `DialSucceeded` as soon as the
/// connection is established, so they never get a `PendingPeer` entry.
#[derive(Debug)]
enum PendingPeer {
    /// Inbound dial from an unknown peer — modal-pending in the app.
    /// `address` is the remote's send-back address as reported by the
    /// listener endpoint.
    InboundUnknown { address: Multiaddr },
}
317
/// State owned by the spawned libp2p task: the swarm itself plus the
/// bookkeeping needed to turn swarm events into `NetworkEvent`s.
struct NetworkTask {
    /// The libp2p swarm driving all transports and behaviours.
    swarm: Swarm<HuddleBehavior>,
    /// Commands arriving from `NetworkHandle` clones.
    cmd_rx: mpsc::Receiver<NetworkCommand>,
    /// Outbound channel for events surfaced to the app.
    event_tx: mpsc::Sender<NetworkEvent>,
    /// Peers we already treat as known: inserted on first mDNS discovery
    /// and when a user-dialed connection succeeds, removed on mDNS expiry.
    /// Used to emit `PeerDiscovered` / `PeerExpired` only once per peer.
    discovered_peers: HashSet<PeerId>,
    /// Tracks user-initiated dials so we can correlate the eventual
    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
    /// specific address the user asked us to dial. Startup dials to
    /// configured relays are recorded here as well.
    dial_attempts: HashMap<ConnectionId, Multiaddr>,
    /// Phase A: peers connected but not yet promoted to the gossipsub
    /// mesh (inbound) — waiting either for `Identify` to land so we
    /// know their fingerprint, or for the user's accept/reject decision.
    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
    /// mid-prompt doesn't leak state.
    pending_inbound: HashMap<PeerId, PendingPeer>,
    /// Phase A: peers the user has explicitly rejected this session —
    /// auto-disconnect every reconnect attempt without re-prompting.
    /// Persistent across runs via `blocked_peers`; this in-memory copy
    /// is loaded at startup (TODO: not yet wired; falls back to the DB
    /// check in the app layer for now) and updated on `RejectInbound`.
    session_blocklist: HashSet<PeerId>,
    /// Phase D: configured relay multiaddrs. When Identify lands for a
    /// peer whose multiaddr matches one of these, we call
    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
    /// Tracked as multiaddrs (no PeerId yet — we don't know it
    /// until Identify) plus a set of peer_ids of confirmed relays so
    /// we only register once per relay.
    configured_relays: Vec<Multiaddr>,
    /// Peer ids of configured relays we have connected to; filled in when
    /// a tracked dial to a configured relay address is established.
    relay_peer_ids: HashSet<PeerId>,
    /// huddle 0.7.11: per-peer DCUtR failure counter. Logged at warn
    /// once the count crosses `DCUTR_FAIL_BUDGET` so symmetric-NAT
    /// pairs don't generate runaway hole-punch attempts in the logs.
    /// libp2p's dcutr behavior schedules its own retries internally;
    /// this is purely an observability signal for the app.
    dcutr_failures: HashMap<PeerId, u32>,
}
354
/// huddle 0.7.11: number of failed DCUtR (hole-punch) attempts to the
/// same peer after which we warn. dcutr keeps retrying internally, but
/// the audit flagged the resulting log spam as a real issue — capping the
/// log noise is the realistic fix. Counted per peer in
/// `NetworkTask::dcutr_failures`.
const DCUTR_FAIL_BUDGET: u32 = 6;
359
360pub fn start_network(
361    identity: &Identity,
362    event_tx: mpsc::Sender<NetworkEvent>,
363) -> crate::error::Result<NetworkHandle> {
364    start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
365}
366
367/// huddle 0.8: build a `NetworkHandle` with **no libp2p swarm** — the
368/// onion-relay-only (`NetworkMode::Server`) path. We still hand back a
369/// fully-functional handle so the rest of the app is oblivious: room
370/// subscribe/unsubscribe/publish mirror to the attached `ServerClient`
371/// exactly as before, while the libp2p-specific commands (dial, announce,
372/// accept/reject, …) are simply drained and dropped. A tiny task consumes
373/// `cmd_rx` so the bounded channel never fills and `send().await` never
374/// blocks; it exits on `Shutdown`.
375pub fn start_network_disabled() -> NetworkHandle {
376    let (cmd_tx, mut cmd_rx) = mpsc::channel::<NetworkCommand>(256);
377    tokio::spawn(async move {
378        while let Some(cmd) = cmd_rx.recv().await {
379            if matches!(cmd, NetworkCommand::Shutdown) {
380                info!("network task (libp2p-disabled) shutting down");
381                break;
382            }
383            // Everything else is a libp2p op with no swarm to run it; the
384            // server mirror (in NetworkHandle) already handled the room
385            // traffic before the command reached us. Drop it.
386        }
387    });
388    NetworkHandle {
389        cmd_tx,
390        server: Arc::new(std::sync::Mutex::new(None)),
391        dm_partners: Arc::new(std::sync::Mutex::new(HashMap::new())),
392    }
393}
394
395/// Start the network task with explicit mode, TCP listen port, and any
396/// pre-configured relay multiaddrs. `listen_port = 0` requests a
397/// random port. Relays are dialed on startup; once `Identify` lands
398/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
399/// register a reservation so peers behind NAT can dial us through it.
400pub fn start_network_with(
401    identity: &Identity,
402    event_tx: mpsc::Sender<NetworkEvent>,
403    mode: NetworkMode,
404    listen_port: u16,
405    relays: Vec<Multiaddr>,
406) -> crate::error::Result<NetworkHandle> {
407    let keypair = identity.keypair().clone();
408    let local_peer_id = identity.peer_id();
409
410    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
411        .with_tokio()
412        .with_tcp(
413            tcp::Config::default(),
414            noise::Config::new,
415            yamux::Config::default,
416        )
417        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
418        // Phase D: wrap the transport with relay-client. This both
419        // composes the transport (so we can dial `/p2p-circuit/`
420        // addresses) and surfaces a `relay::client::Behaviour` into
421        // the `with_behaviour` closure as the second argument.
422        .with_relay_client(noise::Config::new, yamux::Config::default)
423        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
424        .with_behaviour(|key, relay_client| {
425            let mdns_opt = match mode {
426                NetworkMode::Mdns => Some(
427                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
428                        .expect("mDNS init failed"),
429                ),
430                // `Direct` runs libp2p without mDNS. `Server` never reaches
431                // here (it takes the `start_network_disabled` path and runs
432                // no swarm), but the match must be exhaustive.
433                NetworkMode::Direct | NetworkMode::Server => None,
434            };
435            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
436
437            let identify = identify::Behaviour::new(
438                identify::Config::new("/huddle/1.0.0".into(), key.public())
439                    .with_agent_version("huddle/0.5".into()),
440            );
441
442            let ping = ping::Behaviour::default();
443
444            let gossipsub_config = gossipsub::ConfigBuilder::default()
445                .heartbeat_interval(Duration::from_secs(1))
446                .validation_mode(gossipsub::ValidationMode::Strict)
447                // Default is 64 KiB. Raise it so base64-encoded file
448                // chunks (see files::CHUNK_SIZE) keep ample headroom.
449                .max_transmit_size(256 * 1024)
450                .build()
451                .expect("valid gossipsub config");
452
453            let mut gossipsub = gossipsub::Behaviour::new(
454                gossipsub::MessageAuthenticity::Signed(key.clone()),
455                gossipsub_config,
456            )
457            .expect("valid gossipsub init");
458
459            // Every node subscribes to the global rooms topic so the lobby
460            // shows discovered rooms even before joining anything.
461            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
462            gossipsub
463                .subscribe(&rooms_topic)
464                .expect("subscribe rooms topic");
465
466            // AutoNAT v2: client probes external addresses by asking
467            // remote servers to dial us back; server answers other
468            // peers' probes. Both halves are needed for symmetric
469            // P2P reachability detection (v1 combined them; v2 split).
470            let autonat_client = autonat::v2::client::Behaviour::new(
471                rand::rngs::OsRng,
472                autonat::v2::client::Config::default(),
473            );
474            let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
475            let dcutr = dcutr::Behaviour::new(local_peer_id);
476
477            HuddleBehavior {
478                mdns,
479                identify,
480                ping,
481                gossipsub,
482                relay_client,
483                autonat_client,
484                autonat_server,
485                dcutr,
486            }
487        })
488        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
489        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
490        .build();
491
492    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
493        .parse()
494        .expect("valid listen addr");
495    swarm
496        .listen_on(listen_addr)
497        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
498    // Also bind IPv6 on all interfaces so users can dial via IPv6.
499    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
500        .parse()
501        .expect("valid ipv6 listen addr");
502    if let Err(e) = swarm.listen_on(listen_addr6) {
503        debug!(%e, "ipv6 listen skipped");
504    }
505
506    let (cmd_tx, cmd_rx) = mpsc::channel(256);
507    let mut task = NetworkTask {
508        swarm,
509        cmd_rx,
510        event_tx,
511        discovered_peers: HashSet::new(),
512        dial_attempts: HashMap::new(),
513        pending_inbound: HashMap::new(),
514        session_blocklist: HashSet::new(),
515        configured_relays: relays.clone(),
516        relay_peer_ids: HashSet::new(),
517        dcutr_failures: HashMap::new(),
518    };
519    // Phase D: dial each configured relay so Identify can complete and
520    // we can register a `/p2p-circuit` reservation. Failures here are
521    // non-fatal — the user can still chat on LAN.
522    for relay_addr in relays {
523        info!(addr = %relay_addr, "dialing configured relay");
524        let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
525        let conn_id = opts.connection_id();
526        match task.swarm.dial(opts) {
527            Ok(()) => {
528                task.dial_attempts.insert(conn_id, relay_addr);
529            }
530            Err(e) => warn!(%e, "dial relay failed"),
531        }
532    }
533    tokio::spawn(task.run());
534
535    Ok(NetworkHandle {
536        cmd_tx,
537        server: Arc::new(std::sync::Mutex::new(None)),
538        dm_partners: Arc::new(std::sync::Mutex::new(HashMap::new())),
539    })
540}
541
542impl NetworkTask {
543    async fn run(mut self) {
544        loop {
545            tokio::select! {
546                event = self.swarm.select_next_some() => {
547                    self.handle_swarm_event(event).await;
548                }
549                Some(cmd) = self.cmd_rx.recv() => {
550                    if matches!(cmd, NetworkCommand::Shutdown) {
551                        info!("network task shutting down");
552                        break;
553                    }
554                    self.handle_command(cmd);
555                }
556            }
557        }
558    }
559
560    async fn handle_swarm_event(
561        &mut self,
562        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
563    ) {
564        match event {
565            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
566                info!(%address, "listening");
567                // Phase D: a relay-circuit address is a reachability
568                // milestone — surface it as its own event so the lobby
569                // can show "reachable via N relays" status.
570                use libp2p::multiaddr::Protocol;
571                let is_circuit = address
572                    .iter()
573                    .any(|p| matches!(p, Protocol::P2pCircuit));
574                if is_circuit {
575                    let _ = self
576                        .event_tx
577                        .send(NetworkEvent::RelayReservationEstablished {
578                            address: address.clone(),
579                        })
580                        .await;
581                }
582                let _ = self
583                    .event_tx
584                    .send(NetworkEvent::ListeningOn { address })
585                    .await;
586            }
587            libp2p::swarm::SwarmEvent::ConnectionEstablished {
588                peer_id,
589                connection_id,
590                endpoint,
591                ..
592            } => {
593                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
594                    // Phase D: a connection that was for a configured
595                    // relay shouldn't pollute the lobby with a normal
596                    // DialSucceeded — relays aren't chat peers. We just
597                    // remember the peer_id and wait for Identify, then
598                    // register a /p2p-circuit reservation.
599                    let is_relay = self.configured_relays.iter().any(|r| r == &addr);
600                    if is_relay {
601                        info!(%peer_id, %addr, "connected to configured relay");
602                        self.relay_peer_ids.insert(peer_id);
603                    } else {
604                        info!(%peer_id, %addr, "user-dialed peer connected");
605                        // Treat dialed peers like mDNS-discovered: add
606                        // to gossipsub explicit peers so room
607                        // announcements flow.
608                        self.swarm
609                            .behaviour_mut()
610                            .gossipsub
611                            .add_explicit_peer(&peer_id);
612                        self.discovered_peers.insert(peer_id);
613                        let _ = self
614                            .event_tx
615                            .send(NetworkEvent::DialSucceeded {
616                                peer_id,
617                                address: addr,
618                            })
619                            .await;
620                    }
621                } else if let ConnectedPoint::Dialer { .. } = endpoint {
622                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
623                    // — still add to mesh; no user-visible event needed.
624                    self.swarm
625                        .behaviour_mut()
626                        .gossipsub
627                        .add_explicit_peer(&peer_id);
628                } else {
629                    // Inbound dial from an unknown peer (Phase A). We
630                    // hold the connection but do NOT add them to the
631                    // explicit-peer set yet — wait for Identify so we
632                    // can show the user the peer's fingerprint, then
633                    // either AcceptInbound (promote to mesh) or
634                    // RejectInbound (disconnect + persist blocklist).
635                    //
636                    // Known limitation: gossipsub's score-based mesh
637                    // formation may still forward topic messages to
638                    // this peer via other peers we have in common.
639                    // True hard-quarantine would need a custom
640                    // ConnectionHandler — out of scope for v1.
641                    if self.session_blocklist.contains(&peer_id) {
642                        info!(%peer_id, "rejecting inbound from session-blocked peer");
643                        let _ = self.swarm.disconnect_peer_id(peer_id);
644                    } else {
645                        let address = match &endpoint {
646                            ConnectedPoint::Listener { send_back_addr, .. } => {
647                                send_back_addr.clone()
648                            }
649                            _ => Multiaddr::empty(),
650                        };
651                        debug!(%peer_id, %address, "inbound peer pending decision");
652                        self.pending_inbound
653                            .insert(peer_id, PendingPeer::InboundUnknown { address });
654                    }
655                }
656            }
657            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
658                connection_id,
659                error,
660                ..
661            } => {
662                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
663                    warn!(%addr, %error, "user-dialed peer failed");
664                    let _ = self
665                        .event_tx
666                        .send(NetworkEvent::DialFailed {
667                            address: addr,
668                            error: error.to_string(),
669                        })
670                        .await;
671                }
672            }
673            libp2p::swarm::SwarmEvent::ConnectionClosed {
674                peer_id,
675                num_established,
676                ..
677            } => {
678                // Drop any pending-inbound entry for this peer — they
679                // disconnected before we could prompt the user (or
680                // before the user accepted). Lets a re-connect start
681                // fresh rather than reusing stale state.
682                self.pending_inbound.remove(&peer_id);
683                // huddle 0.7.11: emit PeerDisconnected so the app can
684                // clean its `connected_dial_addrs` map. Only fire when
685                // the LAST connection to that peer closed — multiple
686                // simultaneous connections (e.g. direct + relay) would
687                // otherwise emit spurious disconnects when one of the
688                // two drops.
689                if num_established == 0 {
690                    let _ = self
691                        .event_tx
692                        .send(NetworkEvent::PeerDisconnected { peer_id })
693                        .await;
694                    // Also clean up gossipsub's explicit-peers set so
695                    // it doesn't accumulate dead PeerIds across the
696                    // session.
697                    self.swarm
698                        .behaviour_mut()
699                        .gossipsub
700                        .remove_explicit_peer(&peer_id);
701                }
702            }
703            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
704            _ => {}
705        }
706    }
707
708    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
709        match event {
710            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
711                for (peer_id, addr) in peers {
712                    if self.discovered_peers.insert(peer_id) {
713                        info!(%peer_id, %addr, "mDNS discovered");
714                        self.swarm.add_peer_address(peer_id, addr);
715                        // Explicitly add to gossipsub mesh.
716                        self.swarm
717                            .behaviour_mut()
718                            .gossipsub
719                            .add_explicit_peer(&peer_id);
720                        let _ = self
721                            .event_tx
722                            .send(NetworkEvent::PeerDiscovered { peer_id })
723                            .await;
724                    }
725                }
726            }
727            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
728                for (peer_id, _) in peers {
729                    if self.discovered_peers.remove(&peer_id) {
730                        info!(%peer_id, "mDNS peer expired");
731                        self.swarm
732                            .behaviour_mut()
733                            .gossipsub
734                            .remove_explicit_peer(&peer_id);
735                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
736                    }
737                }
738            }
739            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
740                propagation_source,
741                message,
742                ..
743            }) => {
744                self.handle_gossipsub_message(propagation_source, message).await;
745            }
746            HuddleBehaviorEvent::Identify(identify::Event::Received {
747                peer_id, info, ..
748            }) => {
749                debug!(%peer_id, agent = %info.agent_version, "identify received");
750                // Phase D: if this peer is a configured relay, register
751                // a `/p2p-circuit` reservation on first identify. Idem-
752                // potent — only fire if we haven't listened on this
753                // relay already (identify fires periodically).
754                if self.relay_peer_ids.contains(&peer_id) {
755                    use libp2p::multiaddr::Protocol;
756                    if let Some(relay_addr) = self
757                        .configured_relays
758                        .iter()
759                        .find(|a| {
760                            // Match by /p2p/<peer-id> suffix when
761                            // present, else by the addr we dialed.
762                            a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
763                                || self.dial_attempts.values().any(|d| d == *a)
764                        })
765                        .cloned()
766                    {
767                        let circuit = relay_addr.with(Protocol::P2pCircuit);
768                        match self.swarm.listen_on(circuit.clone()) {
769                            Ok(_) => info!(%circuit, "listening on relay circuit"),
770                            Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
771                        }
772                    }
773                }
774                // Decode the remote's Ed25519 pubkey and derive our
775                // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
776                // Rsa, Ecdsa) shouldn't appear in practice — huddle
777                // only generates Ed25519 identities — so we just log
778                // and skip if the cast fails.
779                let fingerprint = match info.public_key.clone().try_into_ed25519() {
780                    Ok(ed_pk) => {
781                        let bytes = ed_pk.to_bytes();
782                        compute_fingerprint(&bytes)
783                    }
784                    Err(_) => {
785                        warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
786                        return;
787                    }
788                };
789                // Always notify the app layer so it can populate
790                // `known_peers.fingerprint` and detect that an
791                // outbound peer we dialed has fully identified.
792                let _ = self
793                    .event_tx
794                    .send(NetworkEvent::PeerIdentified {
795                        peer_id,
796                        fingerprint: fingerprint.clone(),
797                    })
798                    .await;
799                // If the peer is in pending_inbound, Identify completing
800                // is the cue to surface the user prompt. Keep them in
801                // pending_inbound until Accept or Reject — we don't
802                // know yet which way the user will decide.
803                if let Some(PendingPeer::InboundUnknown { address }) =
804                    self.pending_inbound.get(&peer_id)
805                {
806                    let address = address.clone();
807                    let _ = self
808                        .event_tx
809                        .send(NetworkEvent::InboundDial {
810                            peer_id,
811                            fingerprint,
812                            address,
813                        })
814                        .await;
815                }
816            }
817            HuddleBehaviorEvent::AutonatClient(ev) => {
818                // One probe per address candidate. `result.is_ok()`
819                // means a remote AutoNAT server dialed us back
820                // successfully on `tested_addr` ⇒ this address is
821                // reachable from the outside. The app layer aggregates
822                // these into the lobby reachability badge.
823                let reachable = ev.result.is_ok();
824                if reachable {
825                    info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
826                } else {
827                    debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
828                }
829                let _ = self
830                    .event_tx
831                    .send(NetworkEvent::NatProbeResult {
832                        tested_addr: ev.tested_addr,
833                        reachable,
834                    })
835                    .await;
836            }
837            HuddleBehaviorEvent::AutonatServer(_) => {
838                // We answered another peer's reachability probe.
839                // No app-visible action.
840            }
841            HuddleBehaviorEvent::Dcutr(ev) => {
842                let success = ev.result.is_ok();
843                if success {
844                    info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
845                    // huddle 0.7.11: a successful hole-punch resets the
846                    // per-peer DCUtR retry counter so future failures
847                    // don't immediately bail out.
848                    self.dcutr_failures.remove(&ev.remote_peer_id);
849                } else {
850                    debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
851                    let count = self
852                        .dcutr_failures
853                        .entry(ev.remote_peer_id)
854                        .and_modify(|n| *n += 1)
855                        .or_insert(1);
856                    if *count >= DCUTR_FAIL_BUDGET {
857                        warn!(
858                            remote = %ev.remote_peer_id,
859                            attempts = *count,
860                            "DCUtR: giving up after repeated failures (symmetric NAT likely)"
861                        );
862                    }
863                }
864                let _ = self
865                    .event_tx
866                    .send(NetworkEvent::DcutrUpgrade {
867                        remote_peer: ev.remote_peer_id,
868                        success,
869                    })
870                    .await;
871            }
872            HuddleBehaviorEvent::RelayClient(event) => {
873                // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
874                // only exposes the success-path variants
875                // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
876                // `InboundCircuitEstablished`). There's no
877                // `ReservationReqFailed` arm we can match on, so we
878                // can't reliably surface reservation loss without a
879                // separate health-check timer (future work). For now
880                // we log every event so operators can see the
881                // lifecycle in `huddle.log` — pre-0.7.11 the whole
882                // arm was swallowed by `_ => {}` which hid even that.
883                use libp2p::relay::client::Event as Rc;
884                match event {
885                    Rc::ReservationReqAccepted { relay_peer_id, .. } => {
886                        info!(%relay_peer_id, "relay: reservation accepted");
887                    }
888                    Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
889                        debug!(%relay_peer_id, "relay: outbound circuit established");
890                    }
891                    other => {
892                        debug!(?other, "relay client event");
893                    }
894                }
895            }
896            _ => {}
897        }
898    }
899
900    async fn handle_gossipsub_message(
901        &mut self,
902        from_peer: PeerId,
903        message: gossipsub::Message,
904    ) {
905        let topic = message.topic.to_string();
906        if topic == ROOMS_TOPIC {
907            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
908                Ok(ann) => {
909                    let _ = self
910                        .event_tx
911                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
912                        .await;
913                }
914                Err(e) => {
915                    warn!(%e, "bad room announcement");
916                }
917            }
918        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
919            let _ = self
920                .event_tx
921                .send(NetworkEvent::RoomMessageReceived {
922                    room_id: room_id.to_string(),
923                    payload: message.data,
924                    from_peer,
925                })
926                .await;
927        }
928    }
929
930    fn handle_command(&mut self, cmd: NetworkCommand) {
931        match cmd {
932            NetworkCommand::SubscribeRoom { room_id } => {
933                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
934                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
935                    warn!(%e, %room_id, "subscribe room failed");
936                }
937            }
938            NetworkCommand::UnsubscribeRoom { room_id } => {
939                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
940                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
941            }
942            NetworkCommand::PublishRoomMessage { room_id, payload } => {
943                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
944                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
945                    // No subscribed peers is expected before the mesh
946                    // forms; anything else (MessageTooLarge, full queues)
947                    // is a real bug worth surfacing.
948                    match e {
949                        gossipsub::PublishError::NoPeersSubscribedToTopic => {
950                            debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
951                        }
952                        e => warn!(%e, %room_id, "publish room message failed"),
953                    }
954                }
955            }
956            NetworkCommand::AnnounceRoom(ann) => {
957                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
958                match serde_json::to_vec(&ann) {
959                    Ok(payload) => {
960                        if let Err(e) =
961                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
962                        {
963                            debug!(%e, "publish room announcement failed");
964                        }
965                    }
966                    Err(e) => warn!(%e, "encode room announcement"),
967                }
968            }
969            NetworkCommand::AcceptInbound { peer_id } => {
970                if self.pending_inbound.remove(&peer_id).is_some() {
971                    info!(%peer_id, "inbound dial accepted — promoting to mesh");
972                    self.swarm
973                        .behaviour_mut()
974                        .gossipsub
975                        .add_explicit_peer(&peer_id);
976                    self.discovered_peers.insert(peer_id);
977                } else {
978                    debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
979                }
980            }
981            NetworkCommand::RejectInbound { peer_id } => {
982                self.pending_inbound.remove(&peer_id);
983                self.session_blocklist.insert(peer_id);
984                info!(%peer_id, "inbound dial rejected — disconnecting");
985                let _ = self.swarm.disconnect_peer_id(peer_id);
986            }
987            NetworkCommand::DisconnectPeer { peer_id } => {
988                info!(%peer_id, "app-level identity check failed — disconnecting");
989                let _ = self.swarm.disconnect_peer_id(peer_id);
990            }
991            NetworkCommand::Dial { address } => {
992                let opts: DialOpts = address.clone().into();
993                let conn_id = opts.connection_id();
994                match self.swarm.dial(opts) {
995                    Ok(()) => {
996                        self.dial_attempts.insert(conn_id, address);
997                    }
998                    Err(e) => {
999                        // Synchronous dial error (bad multiaddr, transport refused).
1000                        let tx = self.event_tx.clone();
1001                        let err = e.to_string();
1002                        tokio::spawn(async move {
1003                            let _ = tx
1004                                .send(NetworkEvent::DialFailed {
1005                                    address,
1006                                    error: err,
1007                                })
1008                                .await;
1009                        });
1010                    }
1011                }
1012            }
1013            NetworkCommand::DialAddresses { addresses } => {
1014                use libp2p::multiaddr::Protocol;
1015                if addresses.is_empty() {
1016                    return;
1017                }
1018                // Extract the shared peer-id from any address with a
1019                // `/p2p/<peer-id>` suffix. All host_addrs originating
1020                // from the same `RoomAnnouncement` share the
1021                // announcer's peer-id, so the first match is enough.
1022                let peer_id = addresses
1023                    .iter()
1024                    .flat_map(|a| a.iter())
1025                    .find_map(|p| match p {
1026                        Protocol::P2p(pid) => Some(pid),
1027                        _ => None,
1028                    });
1029                let opts = match peer_id {
1030                    Some(pid) => DialOpts::peer_id(pid)
1031                        .addresses(addresses.clone())
1032                        .build(),
1033                    // No /p2p/ segment anywhere — fall back to single-
1034                    // address dial of the first candidate, matching the
1035                    // legacy `Dial` semantics for unanchored multiaddrs.
1036                    None => addresses[0].clone().into(),
1037                };
1038                let conn_id = opts.connection_id();
1039                // Use the first address as the representative for
1040                // dial_attempts. On synchronous error we report it;
1041                // on async success the post-identify handler upserts
1042                // with the actually-connected endpoint.
1043                let primary = addresses[0].clone();
1044                match self.swarm.dial(opts) {
1045                    Ok(()) => {
1046                        self.dial_attempts.insert(conn_id, primary);
1047                    }
1048                    Err(e) => {
1049                        let tx = self.event_tx.clone();
1050                        let err = e.to_string();
1051                        tokio::spawn(async move {
1052                            let _ = tx
1053                                .send(NetworkEvent::DialFailed {
1054                                    address: primary,
1055                                    error: err,
1056                                })
1057                                .await;
1058                        });
1059                    }
1060                }
1061            }
1062            NetworkCommand::Shutdown => unreachable!(),
1063        }
1064    }
1065}