Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13    autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
14    SwarmBuilder,
15};
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18
19/// How the network discovers peers.
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum NetworkMode {
22    /// mDNS on: announce ourselves on the LAN and pick up announcements.
23    Mdns,
24    /// mDNS off: invisible to LAN discovery; the only way to connect is
25    /// for someone to dial our address (or for us to dial theirs).
26    Direct,
27}
28
29impl NetworkMode {
30    pub fn as_str(&self) -> &'static str {
31        match self {
32            NetworkMode::Mdns => "mdns",
33            NetworkMode::Direct => "direct",
34        }
35    }
36
37    pub fn from_str(s: &str) -> Option<Self> {
38        match s.trim().to_ascii_lowercase().as_str() {
39            "mdns" | "lan" | "open" => Some(NetworkMode::Mdns),
40            "direct" | "dial" | "private" => Some(NetworkMode::Direct),
41            _ => None,
42        }
43    }
44}
45
46use crate::identity::{compute_fingerprint, Identity};
47use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
48use crate::network::events::NetworkEvent;
49use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
50
51#[derive(Debug)]
52pub enum NetworkCommand {
53    /// Subscribe to a room's per-room gossipsub topic.
54    SubscribeRoom { room_id: String },
55    /// Unsubscribe from a room's topic.
56    UnsubscribeRoom { room_id: String },
57    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
58    PublishRoomMessage { room_id: String, payload: Vec<u8> },
59    /// Publish a room announcement on the global rooms topic.
60    AnnounceRoom(RoomAnnouncement),
61    /// User-initiated dial of an explicit address. Used for cross-network
62    /// reach when mDNS isn't enough.
63    Dial { address: Multiaddr },
64    /// Phase A: user accepted an inbound dial — promote the peer to
65    /// explicit-peer status so room announcements flow.
66    AcceptInbound { peer_id: PeerId },
67    /// Phase A: user rejected an inbound dial — disconnect them and
68    /// add the peer_id to the in-memory blocklist for this session
69    /// (caller is responsible for the persistent blocked_peers row).
70    RejectInbound { peer_id: PeerId },
71    /// Phase C follow-up: drop a connection that failed an
72    /// application-level identity check (e.g. invite-fingerprint
73    /// mismatch). Differs from `RejectInbound` in that it doesn't
74    /// touch the inbound-pending map (the connection is already
75    /// past Identify when we discover the mismatch) and doesn't
76    /// persist a block — the caller may want to retry with a
77    /// corrected invite.
78    DisconnectPeer { peer_id: PeerId },
79    Shutdown,
80}
81
82#[derive(Clone)]
83pub struct NetworkHandle {
84    cmd_tx: mpsc::Sender<NetworkCommand>,
85}
86
87impl NetworkHandle {
88    pub async fn subscribe_room(&self, room_id: String) {
89        let _ = self
90            .cmd_tx
91            .send(NetworkCommand::SubscribeRoom { room_id })
92            .await;
93    }
94
95    pub async fn unsubscribe_room(&self, room_id: String) {
96        let _ = self
97            .cmd_tx
98            .send(NetworkCommand::UnsubscribeRoom { room_id })
99            .await;
100    }
101
102    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
103        let _ = self
104            .cmd_tx
105            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
106            .await;
107    }
108
109    pub async fn announce_room(&self, ann: RoomAnnouncement) {
110        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
111    }
112
113    pub async fn dial(&self, address: Multiaddr) {
114        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
115    }
116
117    pub async fn accept_inbound(&self, peer_id: PeerId) {
118        let _ = self
119            .cmd_tx
120            .send(NetworkCommand::AcceptInbound { peer_id })
121            .await;
122    }
123
124    pub async fn reject_inbound(&self, peer_id: PeerId) {
125        let _ = self
126            .cmd_tx
127            .send(NetworkCommand::RejectInbound { peer_id })
128            .await;
129    }
130
131    pub async fn disconnect_peer(&self, peer_id: PeerId) {
132        let _ = self
133            .cmd_tx
134            .send(NetworkCommand::DisconnectPeer { peer_id })
135            .await;
136    }
137
138    pub async fn shutdown(&self) {
139        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
140    }
141}
142
143/// What kind of connection we're holding open until Identify gives us
144/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
145/// added to the gossipsub mesh until the user accepts; outbound user-
146/// dials add to the mesh on connection but still wait on Identify so
147/// the eventual `DialSucceeded` can carry the fingerprint.
148#[derive(Debug)]
149enum PendingPeer {
150    /// Inbound dial from an unknown peer — modal-pending in the app.
151    InboundUnknown { address: Multiaddr },
152}
153
154struct NetworkTask {
155    swarm: Swarm<HuddleBehavior>,
156    cmd_rx: mpsc::Receiver<NetworkCommand>,
157    event_tx: mpsc::Sender<NetworkEvent>,
158    discovered_peers: HashSet<PeerId>,
159    /// Tracks user-initiated dials so we can correlate the eventual
160    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
161    /// specific address the user asked us to dial.
162    dial_attempts: HashMap<ConnectionId, Multiaddr>,
163    /// Phase A: peers connected but not yet promoted to the gossipsub
164    /// mesh (inbound) — waiting either for `Identify` to land so we
165    /// know their fingerprint, or for the user's accept/reject decision.
166    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
167    /// mid-prompt doesn't leak state.
168    pending_inbound: HashMap<PeerId, PendingPeer>,
169    /// Phase A: peers the user has explicitly rejected this session —
170    /// auto-disconnect every reconnect attempt without re-prompting.
171    /// Persistent across runs via `blocked_peers`; this in-memory copy
172    /// is loaded at startup (TODO: not yet wired; falls back to the DB
173    /// check in the app layer for now) and updated on `RejectInbound`.
174    session_blocklist: HashSet<PeerId>,
175    /// Phase D: configured relay multiaddrs. When Identify lands for a
176    /// peer whose multiaddr matches one of these, we call
177    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
178    /// Tracked as multiaddr strings (no PeerId yet — we don't know it
179    /// until Identify) plus a set of peer_ids of confirmed relays so
180    /// we only register once per relay.
181    configured_relays: Vec<Multiaddr>,
182    relay_peer_ids: HashSet<PeerId>,
183}
184
185pub fn start_network(
186    identity: &Identity,
187    event_tx: mpsc::Sender<NetworkEvent>,
188) -> crate::error::Result<NetworkHandle> {
189    start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
190}
191
192/// Start the network task with explicit mode, TCP listen port, and any
193/// pre-configured relay multiaddrs. `listen_port = 0` requests a
194/// random port. Relays are dialed on startup; once `Identify` lands
195/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
196/// register a reservation so peers behind NAT can dial us through it.
197pub fn start_network_with(
198    identity: &Identity,
199    event_tx: mpsc::Sender<NetworkEvent>,
200    mode: NetworkMode,
201    listen_port: u16,
202    relays: Vec<Multiaddr>,
203) -> crate::error::Result<NetworkHandle> {
204    let keypair = identity.keypair().clone();
205    let local_peer_id = identity.peer_id();
206
207    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
208        .with_tokio()
209        .with_tcp(
210            tcp::Config::default(),
211            noise::Config::new,
212            yamux::Config::default,
213        )
214        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
215        // Phase D: wrap the transport with relay-client. This both
216        // composes the transport (so we can dial `/p2p-circuit/`
217        // addresses) and surfaces a `relay::client::Behaviour` into
218        // the `with_behaviour` closure as the second argument.
219        .with_relay_client(noise::Config::new, yamux::Config::default)
220        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
221        .with_behaviour(|key, relay_client| {
222            let mdns_opt = match mode {
223                NetworkMode::Mdns => Some(
224                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
225                        .expect("mDNS init failed"),
226                ),
227                NetworkMode::Direct => None,
228            };
229            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
230
231            let identify = identify::Behaviour::new(
232                identify::Config::new("/huddle/1.0.0".into(), key.public())
233                    .with_agent_version("huddle/0.4".into()),
234            );
235
236            let ping = ping::Behaviour::default();
237
238            let gossipsub_config = gossipsub::ConfigBuilder::default()
239                .heartbeat_interval(Duration::from_secs(1))
240                .validation_mode(gossipsub::ValidationMode::Strict)
241                // Default is 64 KiB. Raise it so base64-encoded file
242                // chunks (see files::CHUNK_SIZE) keep ample headroom.
243                .max_transmit_size(256 * 1024)
244                .build()
245                .expect("valid gossipsub config");
246
247            let mut gossipsub = gossipsub::Behaviour::new(
248                gossipsub::MessageAuthenticity::Signed(key.clone()),
249                gossipsub_config,
250            )
251            .expect("valid gossipsub init");
252
253            // Every node subscribes to the global rooms topic so the lobby
254            // shows discovered rooms even before joining anything.
255            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
256            gossipsub
257                .subscribe(&rooms_topic)
258                .expect("subscribe rooms topic");
259
260            // AutoNAT v2: client probes external addresses by asking
261            // remote servers to dial us back; server answers other
262            // peers' probes. Both halves are needed for symmetric
263            // P2P reachability detection (v1 combined them; v2 split).
264            let autonat_client = autonat::v2::client::Behaviour::new(
265                rand::rngs::OsRng,
266                autonat::v2::client::Config::default(),
267            );
268            let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
269            let dcutr = dcutr::Behaviour::new(local_peer_id);
270
271            HuddleBehavior {
272                mdns,
273                identify,
274                ping,
275                gossipsub,
276                relay_client,
277                autonat_client,
278                autonat_server,
279                dcutr,
280            }
281        })
282        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
283        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
284        .build();
285
286    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
287        .parse()
288        .expect("valid listen addr");
289    swarm
290        .listen_on(listen_addr)
291        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
292    // Also bind IPv6 on all interfaces so users can dial via IPv6.
293    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
294        .parse()
295        .expect("valid ipv6 listen addr");
296    if let Err(e) = swarm.listen_on(listen_addr6) {
297        debug!(%e, "ipv6 listen skipped");
298    }
299
300    let (cmd_tx, cmd_rx) = mpsc::channel(256);
301    let mut task = NetworkTask {
302        swarm,
303        cmd_rx,
304        event_tx,
305        discovered_peers: HashSet::new(),
306        dial_attempts: HashMap::new(),
307        pending_inbound: HashMap::new(),
308        session_blocklist: HashSet::new(),
309        configured_relays: relays.clone(),
310        relay_peer_ids: HashSet::new(),
311    };
312    // Phase D: dial each configured relay so Identify can complete and
313    // we can register a `/p2p-circuit` reservation. Failures here are
314    // non-fatal — the user can still chat on LAN.
315    for relay_addr in relays {
316        info!(addr = %relay_addr, "dialing configured relay");
317        let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
318        let conn_id = opts.connection_id();
319        match task.swarm.dial(opts) {
320            Ok(()) => {
321                task.dial_attempts.insert(conn_id, relay_addr);
322            }
323            Err(e) => warn!(%e, "dial relay failed"),
324        }
325    }
326    tokio::spawn(task.run());
327
328    Ok(NetworkHandle { cmd_tx })
329}
330
331impl NetworkTask {
332    async fn run(mut self) {
333        loop {
334            tokio::select! {
335                event = self.swarm.select_next_some() => {
336                    self.handle_swarm_event(event).await;
337                }
338                Some(cmd) = self.cmd_rx.recv() => {
339                    if matches!(cmd, NetworkCommand::Shutdown) {
340                        info!("network task shutting down");
341                        break;
342                    }
343                    self.handle_command(cmd);
344                }
345            }
346        }
347    }
348
349    async fn handle_swarm_event(
350        &mut self,
351        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
352    ) {
353        match event {
354            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
355                info!(%address, "listening");
356                // Phase D: a relay-circuit address is a reachability
357                // milestone — surface it as its own event so the lobby
358                // can show "reachable via N relays" status.
359                use libp2p::multiaddr::Protocol;
360                let is_circuit = address
361                    .iter()
362                    .any(|p| matches!(p, Protocol::P2pCircuit));
363                if is_circuit {
364                    let _ = self
365                        .event_tx
366                        .send(NetworkEvent::RelayReservationEstablished {
367                            address: address.clone(),
368                        })
369                        .await;
370                }
371                let _ = self
372                    .event_tx
373                    .send(NetworkEvent::ListeningOn { address })
374                    .await;
375            }
376            libp2p::swarm::SwarmEvent::ConnectionEstablished {
377                peer_id,
378                connection_id,
379                endpoint,
380                ..
381            } => {
382                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
383                    // Phase D: a connection that was for a configured
384                    // relay shouldn't pollute the lobby with a normal
385                    // DialSucceeded — relays aren't chat peers. We just
386                    // remember the peer_id and wait for Identify, then
387                    // register a /p2p-circuit reservation.
388                    let is_relay = self.configured_relays.iter().any(|r| r == &addr);
389                    if is_relay {
390                        info!(%peer_id, %addr, "connected to configured relay");
391                        self.relay_peer_ids.insert(peer_id);
392                    } else {
393                        info!(%peer_id, %addr, "user-dialed peer connected");
394                        // Treat dialed peers like mDNS-discovered: add
395                        // to gossipsub explicit peers so room
396                        // announcements flow.
397                        self.swarm
398                            .behaviour_mut()
399                            .gossipsub
400                            .add_explicit_peer(&peer_id);
401                        self.discovered_peers.insert(peer_id);
402                        let _ = self
403                            .event_tx
404                            .send(NetworkEvent::DialSucceeded {
405                                peer_id,
406                                address: addr,
407                            })
408                            .await;
409                    }
410                } else if let ConnectedPoint::Dialer { .. } = endpoint {
411                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
412                    // — still add to mesh; no user-visible event needed.
413                    self.swarm
414                        .behaviour_mut()
415                        .gossipsub
416                        .add_explicit_peer(&peer_id);
417                } else {
418                    // Inbound dial from an unknown peer (Phase A). We
419                    // hold the connection but do NOT add them to the
420                    // explicit-peer set yet — wait for Identify so we
421                    // can show the user the peer's fingerprint, then
422                    // either AcceptInbound (promote to mesh) or
423                    // RejectInbound (disconnect + persist blocklist).
424                    //
425                    // Known limitation: gossipsub's score-based mesh
426                    // formation may still forward topic messages to
427                    // this peer via other peers we have in common.
428                    // True hard-quarantine would need a custom
429                    // ConnectionHandler — out of scope for v1.
430                    if self.session_blocklist.contains(&peer_id) {
431                        info!(%peer_id, "rejecting inbound from session-blocked peer");
432                        let _ = self.swarm.disconnect_peer_id(peer_id);
433                    } else {
434                        let address = match &endpoint {
435                            ConnectedPoint::Listener { send_back_addr, .. } => {
436                                send_back_addr.clone()
437                            }
438                            _ => Multiaddr::empty(),
439                        };
440                        debug!(%peer_id, %address, "inbound peer pending decision");
441                        self.pending_inbound
442                            .insert(peer_id, PendingPeer::InboundUnknown { address });
443                    }
444                }
445            }
446            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
447                connection_id,
448                error,
449                ..
450            } => {
451                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
452                    warn!(%addr, %error, "user-dialed peer failed");
453                    let _ = self
454                        .event_tx
455                        .send(NetworkEvent::DialFailed {
456                            address: addr,
457                            error: error.to_string(),
458                        })
459                        .await;
460                }
461            }
462            libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, .. } => {
463                // Drop any pending-inbound entry for this peer — they
464                // disconnected before we could prompt the user (or
465                // before the user accepted). Lets a re-connect start
466                // fresh rather than reusing stale state.
467                self.pending_inbound.remove(&peer_id);
468            }
469            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
470            _ => {}
471        }
472    }
473
474    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
475        match event {
476            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
477                for (peer_id, addr) in peers {
478                    if self.discovered_peers.insert(peer_id) {
479                        info!(%peer_id, %addr, "mDNS discovered");
480                        self.swarm.add_peer_address(peer_id, addr);
481                        // Explicitly add to gossipsub mesh.
482                        self.swarm
483                            .behaviour_mut()
484                            .gossipsub
485                            .add_explicit_peer(&peer_id);
486                        let _ = self
487                            .event_tx
488                            .send(NetworkEvent::PeerDiscovered { peer_id })
489                            .await;
490                    }
491                }
492            }
493            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
494                for (peer_id, _) in peers {
495                    if self.discovered_peers.remove(&peer_id) {
496                        info!(%peer_id, "mDNS peer expired");
497                        self.swarm
498                            .behaviour_mut()
499                            .gossipsub
500                            .remove_explicit_peer(&peer_id);
501                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
502                    }
503                }
504            }
505            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
506                propagation_source,
507                message,
508                ..
509            }) => {
510                self.handle_gossipsub_message(propagation_source, message).await;
511            }
512            HuddleBehaviorEvent::Identify(identify::Event::Received {
513                peer_id, info, ..
514            }) => {
515                debug!(%peer_id, agent = %info.agent_version, "identify received");
516                // Phase D: if this peer is a configured relay, register
517                // a `/p2p-circuit` reservation on first identify. Idem-
518                // potent — only fire if we haven't listened on this
519                // relay already (identify fires periodically).
520                if self.relay_peer_ids.contains(&peer_id) {
521                    use libp2p::multiaddr::Protocol;
522                    if let Some(relay_addr) = self
523                        .configured_relays
524                        .iter()
525                        .find(|a| {
526                            // Match by /p2p/<peer-id> suffix when
527                            // present, else by the addr we dialed.
528                            a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
529                                || self.dial_attempts.values().any(|d| d == *a)
530                        })
531                        .cloned()
532                    {
533                        let circuit = relay_addr.with(Protocol::P2pCircuit);
534                        match self.swarm.listen_on(circuit.clone()) {
535                            Ok(_) => info!(%circuit, "listening on relay circuit"),
536                            Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
537                        }
538                    }
539                }
540                // Decode the remote's Ed25519 pubkey and derive our
541                // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
542                // Rsa, Ecdsa) shouldn't appear in practice — huddle
543                // only generates Ed25519 identities — so we just log
544                // and skip if the cast fails.
545                let fingerprint = match info.public_key.clone().try_into_ed25519() {
546                    Ok(ed_pk) => {
547                        let bytes = ed_pk.to_bytes();
548                        compute_fingerprint(&bytes)
549                    }
550                    Err(_) => {
551                        warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
552                        return;
553                    }
554                };
555                // Always notify the app layer so it can populate
556                // `known_peers.fingerprint` and detect that an
557                // outbound peer we dialed has fully identified.
558                let _ = self
559                    .event_tx
560                    .send(NetworkEvent::PeerIdentified {
561                        peer_id,
562                        fingerprint: fingerprint.clone(),
563                    })
564                    .await;
565                // If the peer is in pending_inbound, Identify completing
566                // is the cue to surface the user prompt. Keep them in
567                // pending_inbound until Accept or Reject — we don't
568                // know yet which way the user will decide.
569                if let Some(PendingPeer::InboundUnknown { address }) =
570                    self.pending_inbound.get(&peer_id)
571                {
572                    let address = address.clone();
573                    let _ = self
574                        .event_tx
575                        .send(NetworkEvent::InboundDial {
576                            peer_id,
577                            fingerprint,
578                            address,
579                        })
580                        .await;
581                }
582            }
583            HuddleBehaviorEvent::AutonatClient(ev) => {
584                // One probe per address candidate. `result.is_ok()`
585                // means a remote AutoNAT server dialed us back
586                // successfully on `tested_addr` ⇒ this address is
587                // reachable from the outside. The app layer aggregates
588                // these into the lobby reachability badge.
589                let reachable = ev.result.is_ok();
590                if reachable {
591                    info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
592                } else {
593                    debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
594                }
595                let _ = self
596                    .event_tx
597                    .send(NetworkEvent::NatProbeResult {
598                        tested_addr: ev.tested_addr,
599                        reachable,
600                    })
601                    .await;
602            }
603            HuddleBehaviorEvent::AutonatServer(_) => {
604                // We answered another peer's reachability probe.
605                // No app-visible action.
606            }
607            HuddleBehaviorEvent::Dcutr(ev) => {
608                let success = ev.result.is_ok();
609                if success {
610                    info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
611                } else {
612                    debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
613                }
614                let _ = self
615                    .event_tx
616                    .send(NetworkEvent::DcutrUpgrade {
617                        remote_peer: ev.remote_peer_id,
618                        success,
619                    })
620                    .await;
621            }
622            _ => {}
623        }
624    }
625
626    async fn handle_gossipsub_message(
627        &mut self,
628        from_peer: PeerId,
629        message: gossipsub::Message,
630    ) {
631        let topic = message.topic.to_string();
632        if topic == ROOMS_TOPIC {
633            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
634                Ok(ann) => {
635                    let _ = self
636                        .event_tx
637                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
638                        .await;
639                }
640                Err(e) => {
641                    warn!(%e, "bad room announcement");
642                }
643            }
644        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
645            let _ = self
646                .event_tx
647                .send(NetworkEvent::RoomMessageReceived {
648                    room_id: room_id.to_string(),
649                    payload: message.data,
650                    from_peer,
651                })
652                .await;
653        }
654    }
655
656    fn handle_command(&mut self, cmd: NetworkCommand) {
657        match cmd {
658            NetworkCommand::SubscribeRoom { room_id } => {
659                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
660                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
661                    warn!(%e, %room_id, "subscribe room failed");
662                }
663            }
664            NetworkCommand::UnsubscribeRoom { room_id } => {
665                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
666                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
667            }
668            NetworkCommand::PublishRoomMessage { room_id, payload } => {
669                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
670                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
671                    // No subscribed peers is expected before the mesh
672                    // forms; anything else (MessageTooLarge, full queues)
673                    // is a real bug worth surfacing.
674                    match e {
675                        gossipsub::PublishError::NoPeersSubscribedToTopic => {
676                            debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
677                        }
678                        e => warn!(%e, %room_id, "publish room message failed"),
679                    }
680                }
681            }
682            NetworkCommand::AnnounceRoom(ann) => {
683                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
684                match serde_json::to_vec(&ann) {
685                    Ok(payload) => {
686                        if let Err(e) =
687                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
688                        {
689                            debug!(%e, "publish room announcement failed");
690                        }
691                    }
692                    Err(e) => warn!(%e, "encode room announcement"),
693                }
694            }
695            NetworkCommand::AcceptInbound { peer_id } => {
696                if self.pending_inbound.remove(&peer_id).is_some() {
697                    info!(%peer_id, "inbound dial accepted — promoting to mesh");
698                    self.swarm
699                        .behaviour_mut()
700                        .gossipsub
701                        .add_explicit_peer(&peer_id);
702                    self.discovered_peers.insert(peer_id);
703                } else {
704                    debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
705                }
706            }
707            NetworkCommand::RejectInbound { peer_id } => {
708                self.pending_inbound.remove(&peer_id);
709                self.session_blocklist.insert(peer_id);
710                info!(%peer_id, "inbound dial rejected — disconnecting");
711                let _ = self.swarm.disconnect_peer_id(peer_id);
712            }
713            NetworkCommand::DisconnectPeer { peer_id } => {
714                info!(%peer_id, "app-level identity check failed — disconnecting");
715                let _ = self.swarm.disconnect_peer_id(peer_id);
716            }
717            NetworkCommand::Dial { address } => {
718                let opts: DialOpts = address.clone().into();
719                let conn_id = opts.connection_id();
720                match self.swarm.dial(opts) {
721                    Ok(()) => {
722                        self.dial_attempts.insert(conn_id, address);
723                    }
724                    Err(e) => {
725                        // Synchronous dial error (bad multiaddr, transport refused).
726                        let tx = self.event_tx.clone();
727                        let err = e.to_string();
728                        tokio::spawn(async move {
729                            let _ = tx
730                                .send(NetworkEvent::DialFailed {
731                                    address,
732                                    error: err,
733                                })
734                                .await;
735                        });
736                    }
737                }
738            }
739            NetworkCommand::Shutdown => unreachable!(),
740        }
741    }
742}