Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4pub mod server;
5pub mod transport;
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::Duration;
10
11use futures::StreamExt;
12use sha2::{Digest, Sha256};
13use libp2p::core::ConnectedPoint;
14use libp2p::swarm::dial_opts::DialOpts;
15use libp2p::swarm::ConnectionId;
16use libp2p::{
17    autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
18    SwarmBuilder,
19};
20use tokio::sync::mpsc;
21use tracing::{debug, info, warn};
22
/// Selects how (and whether) huddle uses libp2p for peer connectivity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// Onion-relay only — the huddle 0.8 default.
    ///
    /// No libp2p swarm is started: no LAN discovery, no direct dialing and no
    /// NAT machinery. Every message travels through the centralized Tor-onion
    /// relay (see `network::server`). The binary runs in this mode unless the
    /// user explicitly opts back into libp2p with `--mode mdns|direct`.
    Server,
    /// libp2p with mDNS enabled (opt-in via `--mode mdns`).
    ///
    /// We announce ourselves on the LAN and pick up other announcements. This
    /// runs *alongside* the onion relay: LAN peers connect directly, while
    /// internet peers are reached through the server.
    Mdns,
    /// libp2p with mDNS disabled (opt-in via `--mode direct`).
    ///
    /// Invisible to LAN discovery; an explicit dial is the only way to form a
    /// libp2p connection. Also runs alongside the onion relay.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of the mode, as accepted by `--mode`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Server => "server",
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Whether this mode starts a libp2p swarm (LAN discovery / direct dial).
    /// Only the onion-relay-only `Server` mode runs without one.
    pub fn uses_libp2p(&self) -> bool {
        match self {
            Self::Server => false,
            Self::Mdns | Self::Direct => true,
        }
    }

    /// Parses a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Besides the canonical names, these aliases are accepted:
    /// `tor`/`onion` → `Server`, `lan`/`open` → `Mdns`,
    /// `dial`/`private` → `Direct`. Anything else yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        let normalized = s.trim().to_ascii_lowercase();
        let mode = match normalized.as_str() {
            "server" | "tor" | "onion" => Self::Server,
            "mdns" | "lan" | "open" => Self::Mdns,
            "direct" | "dial" | "private" => Self::Direct,
            _ => return None,
        };
        Some(mode)
    }
}
67
68use crate::identity::{compute_fingerprint, Identity};
69use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
70use crate::network::events::NetworkEvent;
71use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
72
73#[derive(Debug)]
74pub enum NetworkCommand {
75    /// Subscribe to a room's per-room gossipsub topic.
76    SubscribeRoom { room_id: String },
77    /// Unsubscribe from a room's topic.
78    UnsubscribeRoom { room_id: String },
79    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
80    PublishRoomMessage { room_id: String, payload: Vec<u8> },
81    /// Publish a room announcement on the global rooms topic.
82    AnnounceRoom(RoomAnnouncement),
83    /// User-initiated dial of an explicit address. Used for cross-network
84    /// reach when mDNS isn't enough.
85    Dial { address: Multiaddr },
86    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
87    /// letting libp2p race them in parallel. Used by the "add by HD
88    /// ID / username" flow, which resolves a fingerprint to every
89    /// address we know for that peer (room announcement `host_addrs`
90    /// + persisted `known_peers`). libp2p's parallel dialer picks
91    /// the cheapest path that completes — LAN beats public IP beats
92    /// relay-hopped without us having to probe transports manually.
93    DialAddresses { addresses: Vec<Multiaddr> },
94    /// Phase A: user accepted an inbound dial — promote the peer to
95    /// explicit-peer status so room announcements flow.
96    AcceptInbound { peer_id: PeerId },
97    /// Phase A: user rejected an inbound dial — disconnect them and
98    /// add the peer_id to the in-memory blocklist for this session
99    /// (caller is responsible for the persistent blocked_peers row).
100    RejectInbound { peer_id: PeerId },
101    /// Phase C follow-up: drop a connection that failed an
102    /// application-level identity check (e.g. invite-fingerprint
103    /// mismatch). Differs from `RejectInbound` in that it doesn't
104    /// touch the inbound-pending map (the connection is already
105    /// past Identify when we discover the mismatch) and doesn't
106    /// persist a block — the caller may want to retry with a
107    /// corrected invite.
108    DisconnectPeer { peer_id: PeerId },
109    Shutdown,
110}
111
112#[derive(Clone)]
113pub struct NetworkHandle {
114    cmd_tx: mpsc::Sender<NetworkCommand>,
115    /// huddle 0.8: optional connection to the centralized server (a Tor
116    /// onion relay). When attached, every room subscribe/unsubscribe and
117    /// every published wire message is mirrored to it, so peers we can't
118    /// reach over libp2p still receive our traffic (and we theirs, via the
119    /// server's offline mailbox). `None` until `attach_server` runs.
120    server: Arc<std::sync::Mutex<Option<server::ServerClient>>>,
121    /// huddle 1.2: map of `room_id -> partner fingerprint` for 1:1 DM rooms.
122    /// When a room is registered here, relay traffic for it is delivered by
123    /// recipient fingerprint (`ServerClient::send_direct`) instead of by room
124    /// membership fan-out — so DMs reach the partner reliably (live or via
125    /// their offline mailbox) without both sides having subscribed the same
126    /// room on the relay. Group rooms are absent from this map and keep using
127    /// membership fan-out (`publish`). Populated by the app from
128    /// `start_direct` / `bootstrap_direct_room` / `add_contact`.
129    dm_partners: Arc<std::sync::Mutex<HashMap<String, String>>>,
130}
131
132/// Stable per-message id for the server mailbox/receipts: a short hash of
133/// the wire bytes. Opaque to the server; lets it dedup/queue by id.
134fn server_msg_id(payload: &[u8]) -> String {
135    let mut h = Sha256::new();
136    h.update(payload);
137    hex::encode(&h.finalize()[..8])
138}
139
140impl NetworkHandle {
141    /// Attach a live server connection so subsequent room traffic mirrors
142    /// to it. Replaces any previous one. Called by the app's server
143    /// connection task after each (re)connect.
144    pub fn attach_server(&self, client: server::ServerClient) {
145        *self.server.lock().unwrap() = Some(client);
146    }
147
148    /// Drop the server connection (e.g. after a disconnect) so we stop
149    /// mirroring until it reconnects.
150    pub fn detach_server(&self) {
151        *self.server.lock().unwrap() = None;
152    }
153
154    /// Snapshot the attached server client, if any. Cloned out so we never
155    /// hold the lock across the (sync, non-blocking) mirror call.
156    fn server_client(&self) -> Option<server::ServerClient> {
157        self.server.lock().unwrap().clone()
158    }
159
160    pub async fn subscribe_room(&self, room_id: String) {
161        if let Some(s) = self.server_client() {
162            let _ = s.subscribe(&room_id);
163        }
164        let _ = self
165            .cmd_tx
166            .send(NetworkCommand::SubscribeRoom { room_id })
167            .await;
168    }
169
170    pub async fn unsubscribe_room(&self, room_id: String) {
171        if let Some(s) = self.server_client() {
172            let _ = s.unsubscribe(&room_id);
173        }
174        let _ = self
175            .cmd_tx
176            .send(NetworkCommand::UnsubscribeRoom { room_id })
177            .await;
178    }
179
180    /// huddle 1.2: register a 1:1 DM room so its relay traffic is delivered by
181    /// the partner's fingerprint instead of by room-membership fan-out. Safe
182    /// to call repeatedly (idempotent). Never registers an empty partner.
183    pub fn register_dm(&self, room_id: String, partner_fingerprint: String) {
184        if partner_fingerprint.is_empty() {
185            return;
186        }
187        self.dm_partners
188            .lock()
189            .unwrap()
190            .insert(room_id, partner_fingerprint);
191    }
192
193    /// huddle 1.2: forget a DM room's direct-delivery mapping (e.g. on leave).
194    pub fn forget_dm(&self, room_id: &str) {
195        self.dm_partners.lock().unwrap().remove(room_id);
196    }
197
198    fn dm_partner(&self, room_id: &str) -> Option<String> {
199        self.dm_partners.lock().unwrap().get(room_id).cloned()
200    }
201
202    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
203        if let Some(s) = self.server_client() {
204            // huddle 1.2: a registered DM room delivers straight to the
205            // partner's fingerprint (reliable even if they never subscribed
206            // this room on the relay); a group room fans out by membership.
207            match self.dm_partner(&room_id) {
208                Some(to) => {
209                    let _ = s.send_direct(&to, &room_id, &server_msg_id(&payload), &payload);
210                }
211                None => {
212                    let _ = s.publish(&room_id, &server_msg_id(&payload), &payload);
213                }
214            }
215        }
216        let _ = self
217            .cmd_tx
218            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
219            .await;
220    }
221
222    /// huddle 1.2: deliver `payload` straight to `to`'s fingerprint over the
223    /// relay (independent of room membership), and also ride libp2p gossipsub
224    /// on the `room_id` topic for LAN reachability. Used for friend requests
225    /// (delivered to the target's inbox tag) and any other point-to-point
226    /// control message where the recipient is known.
227    pub async fn publish_direct(&self, to: String, room_id: String, payload: Vec<u8>) {
228        if let Some(s) = self.server_client() {
229            let _ = s.send_direct(&to, &room_id, &server_msg_id(&payload), &payload);
230        }
231        let _ = self
232            .cmd_tx
233            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
234            .await;
235    }
236
237    pub async fn announce_room(&self, ann: RoomAnnouncement) {
238        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
239    }
240
241    /// True when a server connection is currently attached.
242    pub fn has_server(&self) -> bool {
243        self.server.lock().unwrap().is_some()
244    }
245
246    pub async fn dial(&self, address: Multiaddr) {
247        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
248    }
249
250    /// huddle 0.5.2: dial a peer with multiple candidate addresses
251    /// at once. libp2p's swarm races them; the first to complete a
252    /// handshake wins, the rest are dropped. Use when you've resolved
253    /// a peer fingerprint to several known transports — typically
254    /// LAN ip4 + public ip4 + relay circuit.
255    pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
256        let _ = self
257            .cmd_tx
258            .send(NetworkCommand::DialAddresses { addresses })
259            .await;
260    }
261
262    pub async fn accept_inbound(&self, peer_id: PeerId) {
263        let _ = self
264            .cmd_tx
265            .send(NetworkCommand::AcceptInbound { peer_id })
266            .await;
267    }
268
269    pub async fn reject_inbound(&self, peer_id: PeerId) {
270        let _ = self
271            .cmd_tx
272            .send(NetworkCommand::RejectInbound { peer_id })
273            .await;
274    }
275
276    pub async fn disconnect_peer(&self, peer_id: PeerId) {
277        let _ = self
278            .cmd_tx
279            .send(NetworkCommand::DisconnectPeer { peer_id })
280            .await;
281    }
282
283    pub async fn shutdown(&self) {
284        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
285    }
286}
287
288/// What kind of connection we're holding open until Identify gives us
289/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
290/// added to the gossipsub mesh until the user accepts; outbound user-
291/// dials add to the mesh on connection but still wait on Identify so
292/// the eventual `DialSucceeded` can carry the fingerprint.
293#[derive(Debug)]
294enum PendingPeer {
295    /// Inbound dial from an unknown peer — modal-pending in the app.
296    InboundUnknown { address: Multiaddr },
297}
298
299struct NetworkTask {
300    swarm: Swarm<HuddleBehavior>,
301    cmd_rx: mpsc::Receiver<NetworkCommand>,
302    event_tx: mpsc::Sender<NetworkEvent>,
303    discovered_peers: HashSet<PeerId>,
304    /// Tracks user-initiated dials so we can correlate the eventual
305    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
306    /// specific address the user asked us to dial.
307    dial_attempts: HashMap<ConnectionId, Multiaddr>,
308    /// Phase A: peers connected but not yet promoted to the gossipsub
309    /// mesh (inbound) — waiting either for `Identify` to land so we
310    /// know their fingerprint, or for the user's accept/reject decision.
311    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
312    /// mid-prompt doesn't leak state.
313    pending_inbound: HashMap<PeerId, PendingPeer>,
314    /// Phase A: peers the user has explicitly rejected this session —
315    /// auto-disconnect every reconnect attempt without re-prompting.
316    /// Persistent across runs via `blocked_peers`; this in-memory copy
317    /// is loaded at startup (TODO: not yet wired; falls back to the DB
318    /// check in the app layer for now) and updated on `RejectInbound`.
319    session_blocklist: HashSet<PeerId>,
320    /// Phase D: configured relay multiaddrs. When Identify lands for a
321    /// peer whose multiaddr matches one of these, we call
322    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
323    /// Tracked as multiaddr strings (no PeerId yet — we don't know it
324    /// until Identify) plus a set of peer_ids of confirmed relays so
325    /// we only register once per relay.
326    configured_relays: Vec<Multiaddr>,
327    relay_peer_ids: HashSet<PeerId>,
328    /// huddle 0.7.11: per-peer DCUtR failure counter. Logged at warn
329    /// once the count crosses `DCUTR_FAIL_BUDGET` so symmetric-NAT
330    /// pairs don't generate runaway hole-punch attempts in the logs.
331    /// libp2p's dcutr behavior schedules its own retries internally;
332    /// this is purely an observability signal for the app.
333    dcutr_failures: HashMap<PeerId, u32>,
334}
335
/// huddle 0.7.11: number of failed DCUtR attempts to one peer after which we
/// warn. dcutr keeps retrying internally regardless; an audit flagged the
/// resulting log spam, and capping the noise is the realistic fix.
const DCUTR_FAIL_BUDGET: u32 = 6;
340
341pub fn start_network(
342    identity: &Identity,
343    event_tx: mpsc::Sender<NetworkEvent>,
344) -> crate::error::Result<NetworkHandle> {
345    start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
346}
347
348/// huddle 0.8: build a `NetworkHandle` with **no libp2p swarm** — the
349/// onion-relay-only (`NetworkMode::Server`) path. We still hand back a
350/// fully-functional handle so the rest of the app is oblivious: room
351/// subscribe/unsubscribe/publish mirror to the attached `ServerClient`
352/// exactly as before, while the libp2p-specific commands (dial, announce,
353/// accept/reject, …) are simply drained and dropped. A tiny task consumes
354/// `cmd_rx` so the bounded channel never fills and `send().await` never
355/// blocks; it exits on `Shutdown`.
356pub fn start_network_disabled() -> NetworkHandle {
357    let (cmd_tx, mut cmd_rx) = mpsc::channel::<NetworkCommand>(256);
358    tokio::spawn(async move {
359        while let Some(cmd) = cmd_rx.recv().await {
360            if matches!(cmd, NetworkCommand::Shutdown) {
361                info!("network task (libp2p-disabled) shutting down");
362                break;
363            }
364            // Everything else is a libp2p op with no swarm to run it; the
365            // server mirror (in NetworkHandle) already handled the room
366            // traffic before the command reached us. Drop it.
367        }
368    });
369    NetworkHandle {
370        cmd_tx,
371        server: Arc::new(std::sync::Mutex::new(None)),
372        dm_partners: Arc::new(std::sync::Mutex::new(HashMap::new())),
373    }
374}
375
376/// Start the network task with explicit mode, TCP listen port, and any
377/// pre-configured relay multiaddrs. `listen_port = 0` requests a
378/// random port. Relays are dialed on startup; once `Identify` lands
379/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
380/// register a reservation so peers behind NAT can dial us through it.
381pub fn start_network_with(
382    identity: &Identity,
383    event_tx: mpsc::Sender<NetworkEvent>,
384    mode: NetworkMode,
385    listen_port: u16,
386    relays: Vec<Multiaddr>,
387) -> crate::error::Result<NetworkHandle> {
388    let keypair = identity.keypair().clone();
389    let local_peer_id = identity.peer_id();
390
391    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
392        .with_tokio()
393        .with_tcp(
394            tcp::Config::default(),
395            noise::Config::new,
396            yamux::Config::default,
397        )
398        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
399        // Phase D: wrap the transport with relay-client. This both
400        // composes the transport (so we can dial `/p2p-circuit/`
401        // addresses) and surfaces a `relay::client::Behaviour` into
402        // the `with_behaviour` closure as the second argument.
403        .with_relay_client(noise::Config::new, yamux::Config::default)
404        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
405        .with_behaviour(|key, relay_client| {
406            let mdns_opt = match mode {
407                NetworkMode::Mdns => Some(
408                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
409                        .expect("mDNS init failed"),
410                ),
411                // `Direct` runs libp2p without mDNS. `Server` never reaches
412                // here (it takes the `start_network_disabled` path and runs
413                // no swarm), but the match must be exhaustive.
414                NetworkMode::Direct | NetworkMode::Server => None,
415            };
416            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
417
418            let identify = identify::Behaviour::new(
419                identify::Config::new("/huddle/1.0.0".into(), key.public())
420                    .with_agent_version("huddle/0.5".into()),
421            );
422
423            let ping = ping::Behaviour::default();
424
425            let gossipsub_config = gossipsub::ConfigBuilder::default()
426                .heartbeat_interval(Duration::from_secs(1))
427                .validation_mode(gossipsub::ValidationMode::Strict)
428                // Default is 64 KiB. Raise it so base64-encoded file
429                // chunks (see files::CHUNK_SIZE) keep ample headroom.
430                .max_transmit_size(256 * 1024)
431                .build()
432                .expect("valid gossipsub config");
433
434            let mut gossipsub = gossipsub::Behaviour::new(
435                gossipsub::MessageAuthenticity::Signed(key.clone()),
436                gossipsub_config,
437            )
438            .expect("valid gossipsub init");
439
440            // Every node subscribes to the global rooms topic so the lobby
441            // shows discovered rooms even before joining anything.
442            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
443            gossipsub
444                .subscribe(&rooms_topic)
445                .expect("subscribe rooms topic");
446
447            // AutoNAT v2: client probes external addresses by asking
448            // remote servers to dial us back; server answers other
449            // peers' probes. Both halves are needed for symmetric
450            // P2P reachability detection (v1 combined them; v2 split).
451            let autonat_client = autonat::v2::client::Behaviour::new(
452                rand::rngs::OsRng,
453                autonat::v2::client::Config::default(),
454            );
455            let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
456            let dcutr = dcutr::Behaviour::new(local_peer_id);
457
458            HuddleBehavior {
459                mdns,
460                identify,
461                ping,
462                gossipsub,
463                relay_client,
464                autonat_client,
465                autonat_server,
466                dcutr,
467            }
468        })
469        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
470        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
471        .build();
472
473    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
474        .parse()
475        .expect("valid listen addr");
476    swarm
477        .listen_on(listen_addr)
478        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
479    // Also bind IPv6 on all interfaces so users can dial via IPv6.
480    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
481        .parse()
482        .expect("valid ipv6 listen addr");
483    if let Err(e) = swarm.listen_on(listen_addr6) {
484        debug!(%e, "ipv6 listen skipped");
485    }
486
487    let (cmd_tx, cmd_rx) = mpsc::channel(256);
488    let mut task = NetworkTask {
489        swarm,
490        cmd_rx,
491        event_tx,
492        discovered_peers: HashSet::new(),
493        dial_attempts: HashMap::new(),
494        pending_inbound: HashMap::new(),
495        session_blocklist: HashSet::new(),
496        configured_relays: relays.clone(),
497        relay_peer_ids: HashSet::new(),
498        dcutr_failures: HashMap::new(),
499    };
500    // Phase D: dial each configured relay so Identify can complete and
501    // we can register a `/p2p-circuit` reservation. Failures here are
502    // non-fatal — the user can still chat on LAN.
503    for relay_addr in relays {
504        info!(addr = %relay_addr, "dialing configured relay");
505        let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
506        let conn_id = opts.connection_id();
507        match task.swarm.dial(opts) {
508            Ok(()) => {
509                task.dial_attempts.insert(conn_id, relay_addr);
510            }
511            Err(e) => warn!(%e, "dial relay failed"),
512        }
513    }
514    tokio::spawn(task.run());
515
516    Ok(NetworkHandle {
517        cmd_tx,
518        server: Arc::new(std::sync::Mutex::new(None)),
519        dm_partners: Arc::new(std::sync::Mutex::new(HashMap::new())),
520    })
521}
522
523impl NetworkTask {
524    async fn run(mut self) {
525        loop {
526            tokio::select! {
527                event = self.swarm.select_next_some() => {
528                    self.handle_swarm_event(event).await;
529                }
530                Some(cmd) = self.cmd_rx.recv() => {
531                    if matches!(cmd, NetworkCommand::Shutdown) {
532                        info!("network task shutting down");
533                        break;
534                    }
535                    self.handle_command(cmd);
536                }
537            }
538        }
539    }
540
541    async fn handle_swarm_event(
542        &mut self,
543        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
544    ) {
545        match event {
546            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
547                info!(%address, "listening");
548                // Phase D: a relay-circuit address is a reachability
549                // milestone — surface it as its own event so the lobby
550                // can show "reachable via N relays" status.
551                use libp2p::multiaddr::Protocol;
552                let is_circuit = address
553                    .iter()
554                    .any(|p| matches!(p, Protocol::P2pCircuit));
555                if is_circuit {
556                    let _ = self
557                        .event_tx
558                        .send(NetworkEvent::RelayReservationEstablished {
559                            address: address.clone(),
560                        })
561                        .await;
562                }
563                let _ = self
564                    .event_tx
565                    .send(NetworkEvent::ListeningOn { address })
566                    .await;
567            }
568            libp2p::swarm::SwarmEvent::ConnectionEstablished {
569                peer_id,
570                connection_id,
571                endpoint,
572                ..
573            } => {
574                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
575                    // Phase D: a connection that was for a configured
576                    // relay shouldn't pollute the lobby with a normal
577                    // DialSucceeded — relays aren't chat peers. We just
578                    // remember the peer_id and wait for Identify, then
579                    // register a /p2p-circuit reservation.
580                    let is_relay = self.configured_relays.iter().any(|r| r == &addr);
581                    if is_relay {
582                        info!(%peer_id, %addr, "connected to configured relay");
583                        self.relay_peer_ids.insert(peer_id);
584                    } else {
585                        info!(%peer_id, %addr, "user-dialed peer connected");
586                        // Treat dialed peers like mDNS-discovered: add
587                        // to gossipsub explicit peers so room
588                        // announcements flow.
589                        self.swarm
590                            .behaviour_mut()
591                            .gossipsub
592                            .add_explicit_peer(&peer_id);
593                        self.discovered_peers.insert(peer_id);
594                        let _ = self
595                            .event_tx
596                            .send(NetworkEvent::DialSucceeded {
597                                peer_id,
598                                address: addr,
599                            })
600                            .await;
601                    }
602                } else if let ConnectedPoint::Dialer { .. } = endpoint {
603                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
604                    // — still add to mesh; no user-visible event needed.
605                    self.swarm
606                        .behaviour_mut()
607                        .gossipsub
608                        .add_explicit_peer(&peer_id);
609                } else {
610                    // Inbound dial from an unknown peer (Phase A). We
611                    // hold the connection but do NOT add them to the
612                    // explicit-peer set yet — wait for Identify so we
613                    // can show the user the peer's fingerprint, then
614                    // either AcceptInbound (promote to mesh) or
615                    // RejectInbound (disconnect + persist blocklist).
616                    //
617                    // Known limitation: gossipsub's score-based mesh
618                    // formation may still forward topic messages to
619                    // this peer via other peers we have in common.
620                    // True hard-quarantine would need a custom
621                    // ConnectionHandler — out of scope for v1.
622                    if self.session_blocklist.contains(&peer_id) {
623                        info!(%peer_id, "rejecting inbound from session-blocked peer");
624                        let _ = self.swarm.disconnect_peer_id(peer_id);
625                    } else {
626                        let address = match &endpoint {
627                            ConnectedPoint::Listener { send_back_addr, .. } => {
628                                send_back_addr.clone()
629                            }
630                            _ => Multiaddr::empty(),
631                        };
632                        debug!(%peer_id, %address, "inbound peer pending decision");
633                        self.pending_inbound
634                            .insert(peer_id, PendingPeer::InboundUnknown { address });
635                    }
636                }
637            }
638            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
639                connection_id,
640                error,
641                ..
642            } => {
643                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
644                    warn!(%addr, %error, "user-dialed peer failed");
645                    let _ = self
646                        .event_tx
647                        .send(NetworkEvent::DialFailed {
648                            address: addr,
649                            error: error.to_string(),
650                        })
651                        .await;
652                }
653            }
654            libp2p::swarm::SwarmEvent::ConnectionClosed {
655                peer_id,
656                num_established,
657                ..
658            } => {
659                // Drop any pending-inbound entry for this peer — they
660                // disconnected before we could prompt the user (or
661                // before the user accepted). Lets a re-connect start
662                // fresh rather than reusing stale state.
663                self.pending_inbound.remove(&peer_id);
664                // huddle 0.7.11: emit PeerDisconnected so the app can
665                // clean its `connected_dial_addrs` map. Only fire when
666                // the LAST connection to that peer closed — multiple
667                // simultaneous connections (e.g. direct + relay) would
668                // otherwise emit spurious disconnects when one of the
669                // two drops.
670                if num_established == 0 {
671                    let _ = self
672                        .event_tx
673                        .send(NetworkEvent::PeerDisconnected { peer_id })
674                        .await;
675                    // Also clean up gossipsub's explicit-peers set so
676                    // it doesn't accumulate dead PeerIds across the
677                    // session.
678                    self.swarm
679                        .behaviour_mut()
680                        .gossipsub
681                        .remove_explicit_peer(&peer_id);
682                }
683            }
684            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
685            _ => {}
686        }
687    }
688
689    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
690        match event {
691            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
692                for (peer_id, addr) in peers {
693                    if self.discovered_peers.insert(peer_id) {
694                        info!(%peer_id, %addr, "mDNS discovered");
695                        self.swarm.add_peer_address(peer_id, addr);
696                        // Explicitly add to gossipsub mesh.
697                        self.swarm
698                            .behaviour_mut()
699                            .gossipsub
700                            .add_explicit_peer(&peer_id);
701                        let _ = self
702                            .event_tx
703                            .send(NetworkEvent::PeerDiscovered { peer_id })
704                            .await;
705                    }
706                }
707            }
708            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
709                for (peer_id, _) in peers {
710                    if self.discovered_peers.remove(&peer_id) {
711                        info!(%peer_id, "mDNS peer expired");
712                        self.swarm
713                            .behaviour_mut()
714                            .gossipsub
715                            .remove_explicit_peer(&peer_id);
716                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
717                    }
718                }
719            }
720            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
721                propagation_source,
722                message,
723                ..
724            }) => {
725                self.handle_gossipsub_message(propagation_source, message).await;
726            }
727            HuddleBehaviorEvent::Identify(identify::Event::Received {
728                peer_id, info, ..
729            }) => {
730                debug!(%peer_id, agent = %info.agent_version, "identify received");
731                // Phase D: if this peer is a configured relay, register
732                // a `/p2p-circuit` reservation on first identify. Idem-
733                // potent — only fire if we haven't listened on this
734                // relay already (identify fires periodically).
735                if self.relay_peer_ids.contains(&peer_id) {
736                    use libp2p::multiaddr::Protocol;
737                    if let Some(relay_addr) = self
738                        .configured_relays
739                        .iter()
740                        .find(|a| {
741                            // Match by /p2p/<peer-id> suffix when
742                            // present, else by the addr we dialed.
743                            a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
744                                || self.dial_attempts.values().any(|d| d == *a)
745                        })
746                        .cloned()
747                    {
748                        let circuit = relay_addr.with(Protocol::P2pCircuit);
749                        match self.swarm.listen_on(circuit.clone()) {
750                            Ok(_) => info!(%circuit, "listening on relay circuit"),
751                            Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
752                        }
753                    }
754                }
755                // Decode the remote's Ed25519 pubkey and derive our
756                // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
757                // Rsa, Ecdsa) shouldn't appear in practice — huddle
758                // only generates Ed25519 identities — so we just log
759                // and skip if the cast fails.
760                let fingerprint = match info.public_key.clone().try_into_ed25519() {
761                    Ok(ed_pk) => {
762                        let bytes = ed_pk.to_bytes();
763                        compute_fingerprint(&bytes)
764                    }
765                    Err(_) => {
766                        warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
767                        return;
768                    }
769                };
770                // Always notify the app layer so it can populate
771                // `known_peers.fingerprint` and detect that an
772                // outbound peer we dialed has fully identified.
773                let _ = self
774                    .event_tx
775                    .send(NetworkEvent::PeerIdentified {
776                        peer_id,
777                        fingerprint: fingerprint.clone(),
778                    })
779                    .await;
780                // If the peer is in pending_inbound, Identify completing
781                // is the cue to surface the user prompt. Keep them in
782                // pending_inbound until Accept or Reject — we don't
783                // know yet which way the user will decide.
784                if let Some(PendingPeer::InboundUnknown { address }) =
785                    self.pending_inbound.get(&peer_id)
786                {
787                    let address = address.clone();
788                    let _ = self
789                        .event_tx
790                        .send(NetworkEvent::InboundDial {
791                            peer_id,
792                            fingerprint,
793                            address,
794                        })
795                        .await;
796                }
797            }
798            HuddleBehaviorEvent::AutonatClient(ev) => {
799                // One probe per address candidate. `result.is_ok()`
800                // means a remote AutoNAT server dialed us back
801                // successfully on `tested_addr` ⇒ this address is
802                // reachable from the outside. The app layer aggregates
803                // these into the lobby reachability badge.
804                let reachable = ev.result.is_ok();
805                if reachable {
806                    info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
807                } else {
808                    debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
809                }
810                let _ = self
811                    .event_tx
812                    .send(NetworkEvent::NatProbeResult {
813                        tested_addr: ev.tested_addr,
814                        reachable,
815                    })
816                    .await;
817            }
818            HuddleBehaviorEvent::AutonatServer(_) => {
819                // We answered another peer's reachability probe.
820                // No app-visible action.
821            }
822            HuddleBehaviorEvent::Dcutr(ev) => {
823                let success = ev.result.is_ok();
824                if success {
825                    info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
826                    // huddle 0.7.11: a successful hole-punch resets the
827                    // per-peer DCUtR retry counter so future failures
828                    // don't immediately bail out.
829                    self.dcutr_failures.remove(&ev.remote_peer_id);
830                } else {
831                    debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
832                    let count = self
833                        .dcutr_failures
834                        .entry(ev.remote_peer_id)
835                        .and_modify(|n| *n += 1)
836                        .or_insert(1);
837                    if *count >= DCUTR_FAIL_BUDGET {
838                        warn!(
839                            remote = %ev.remote_peer_id,
840                            attempts = *count,
841                            "DCUtR: giving up after repeated failures (symmetric NAT likely)"
842                        );
843                    }
844                }
845                let _ = self
846                    .event_tx
847                    .send(NetworkEvent::DcutrUpgrade {
848                        remote_peer: ev.remote_peer_id,
849                        success,
850                    })
851                    .await;
852            }
853            HuddleBehaviorEvent::RelayClient(event) => {
854                // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
855                // only exposes the success-path variants
856                // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
857                // `InboundCircuitEstablished`). There's no
858                // `ReservationReqFailed` arm we can match on, so we
859                // can't reliably surface reservation loss without a
860                // separate health-check timer (future work). For now
861                // we log every event so operators can see the
862                // lifecycle in `huddle.log` — pre-0.7.11 the whole
863                // arm was swallowed by `_ => {}` which hid even that.
864                use libp2p::relay::client::Event as Rc;
865                match event {
866                    Rc::ReservationReqAccepted { relay_peer_id, .. } => {
867                        info!(%relay_peer_id, "relay: reservation accepted");
868                    }
869                    Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
870                        debug!(%relay_peer_id, "relay: outbound circuit established");
871                    }
872                    other => {
873                        debug!(?other, "relay client event");
874                    }
875                }
876            }
877            _ => {}
878        }
879    }
880
881    async fn handle_gossipsub_message(
882        &mut self,
883        from_peer: PeerId,
884        message: gossipsub::Message,
885    ) {
886        let topic = message.topic.to_string();
887        if topic == ROOMS_TOPIC {
888            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
889                Ok(ann) => {
890                    let _ = self
891                        .event_tx
892                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
893                        .await;
894                }
895                Err(e) => {
896                    warn!(%e, "bad room announcement");
897                }
898            }
899        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
900            let _ = self
901                .event_tx
902                .send(NetworkEvent::RoomMessageReceived {
903                    room_id: room_id.to_string(),
904                    payload: message.data,
905                    from_peer,
906                })
907                .await;
908        }
909    }
910
911    fn handle_command(&mut self, cmd: NetworkCommand) {
912        match cmd {
913            NetworkCommand::SubscribeRoom { room_id } => {
914                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
915                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
916                    warn!(%e, %room_id, "subscribe room failed");
917                }
918            }
919            NetworkCommand::UnsubscribeRoom { room_id } => {
920                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
921                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
922            }
923            NetworkCommand::PublishRoomMessage { room_id, payload } => {
924                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
925                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
926                    // No subscribed peers is expected before the mesh
927                    // forms; anything else (MessageTooLarge, full queues)
928                    // is a real bug worth surfacing.
929                    match e {
930                        gossipsub::PublishError::NoPeersSubscribedToTopic => {
931                            debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
932                        }
933                        e => warn!(%e, %room_id, "publish room message failed"),
934                    }
935                }
936            }
937            NetworkCommand::AnnounceRoom(ann) => {
938                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
939                match serde_json::to_vec(&ann) {
940                    Ok(payload) => {
941                        if let Err(e) =
942                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
943                        {
944                            debug!(%e, "publish room announcement failed");
945                        }
946                    }
947                    Err(e) => warn!(%e, "encode room announcement"),
948                }
949            }
950            NetworkCommand::AcceptInbound { peer_id } => {
951                if self.pending_inbound.remove(&peer_id).is_some() {
952                    info!(%peer_id, "inbound dial accepted — promoting to mesh");
953                    self.swarm
954                        .behaviour_mut()
955                        .gossipsub
956                        .add_explicit_peer(&peer_id);
957                    self.discovered_peers.insert(peer_id);
958                } else {
959                    debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
960                }
961            }
962            NetworkCommand::RejectInbound { peer_id } => {
963                self.pending_inbound.remove(&peer_id);
964                self.session_blocklist.insert(peer_id);
965                info!(%peer_id, "inbound dial rejected — disconnecting");
966                let _ = self.swarm.disconnect_peer_id(peer_id);
967            }
968            NetworkCommand::DisconnectPeer { peer_id } => {
969                info!(%peer_id, "app-level identity check failed — disconnecting");
970                let _ = self.swarm.disconnect_peer_id(peer_id);
971            }
972            NetworkCommand::Dial { address } => {
973                let opts: DialOpts = address.clone().into();
974                let conn_id = opts.connection_id();
975                match self.swarm.dial(opts) {
976                    Ok(()) => {
977                        self.dial_attempts.insert(conn_id, address);
978                    }
979                    Err(e) => {
980                        // Synchronous dial error (bad multiaddr, transport refused).
981                        let tx = self.event_tx.clone();
982                        let err = e.to_string();
983                        tokio::spawn(async move {
984                            let _ = tx
985                                .send(NetworkEvent::DialFailed {
986                                    address,
987                                    error: err,
988                                })
989                                .await;
990                        });
991                    }
992                }
993            }
994            NetworkCommand::DialAddresses { addresses } => {
995                use libp2p::multiaddr::Protocol;
996                if addresses.is_empty() {
997                    return;
998                }
999                // Extract the shared peer-id from any address with a
1000                // `/p2p/<peer-id>` suffix. All host_addrs originating
1001                // from the same `RoomAnnouncement` share the
1002                // announcer's peer-id, so the first match is enough.
1003                let peer_id = addresses
1004                    .iter()
1005                    .flat_map(|a| a.iter())
1006                    .find_map(|p| match p {
1007                        Protocol::P2p(pid) => Some(pid),
1008                        _ => None,
1009                    });
1010                let opts = match peer_id {
1011                    Some(pid) => DialOpts::peer_id(pid)
1012                        .addresses(addresses.clone())
1013                        .build(),
1014                    // No /p2p/ segment anywhere — fall back to single-
1015                    // address dial of the first candidate, matching the
1016                    // legacy `Dial` semantics for unanchored multiaddrs.
1017                    None => addresses[0].clone().into(),
1018                };
1019                let conn_id = opts.connection_id();
1020                // Use the first address as the representative for
1021                // dial_attempts. On synchronous error we report it;
1022                // on async success the post-identify handler upserts
1023                // with the actually-connected endpoint.
1024                let primary = addresses[0].clone();
1025                match self.swarm.dial(opts) {
1026                    Ok(()) => {
1027                        self.dial_attempts.insert(conn_id, primary);
1028                    }
1029                    Err(e) => {
1030                        let tx = self.event_tx.clone();
1031                        let err = e.to_string();
1032                        tokio::spawn(async move {
1033                            let _ = tx
1034                                .send(NetworkEvent::DialFailed {
1035                                    address: primary,
1036                                    error: err,
1037                                })
1038                                .await;
1039                        });
1040                    }
1041                }
1042            }
1043            NetworkCommand::Shutdown => unreachable!(),
1044        }
1045    }
1046}