Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13    autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
14    SwarmBuilder,
15};
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18
19/// How the network discovers peers.
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum NetworkMode {
22    /// mDNS on: announce ourselves on the LAN and pick up announcements.
23    Mdns,
24    /// mDNS off: invisible to LAN discovery; the only way to connect is
25    /// for someone to dial our address (or for us to dial theirs).
26    Direct,
27}
28
29impl NetworkMode {
30    pub fn as_str(&self) -> &'static str {
31        match self {
32            NetworkMode::Mdns => "mdns",
33            NetworkMode::Direct => "direct",
34        }
35    }
36
37    pub fn from_str(s: &str) -> Option<Self> {
38        match s.trim().to_ascii_lowercase().as_str() {
39            "mdns" | "lan" | "open" => Some(NetworkMode::Mdns),
40            "direct" | "dial" | "private" => Some(NetworkMode::Direct),
41            _ => None,
42        }
43    }
44}
45
46use crate::identity::{compute_fingerprint, Identity};
47use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
48use crate::network::events::NetworkEvent;
49use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
50
51#[derive(Debug)]
52pub enum NetworkCommand {
53    /// Subscribe to a room's per-room gossipsub topic.
54    SubscribeRoom { room_id: String },
55    /// Unsubscribe from a room's topic.
56    UnsubscribeRoom { room_id: String },
57    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
58    PublishRoomMessage { room_id: String, payload: Vec<u8> },
59    /// Publish a room announcement on the global rooms topic.
60    AnnounceRoom(RoomAnnouncement),
61    /// User-initiated dial of an explicit address. Used for cross-network
62    /// reach when mDNS isn't enough.
63    Dial { address: Multiaddr },
64    /// Phase A: user accepted an inbound dial — promote the peer to
65    /// explicit-peer status so room announcements flow.
66    AcceptInbound { peer_id: PeerId },
67    /// Phase A: user rejected an inbound dial — disconnect them and
68    /// add the peer_id to the in-memory blocklist for this session
69    /// (caller is responsible for the persistent blocked_peers row).
70    RejectInbound { peer_id: PeerId },
71    Shutdown,
72}
73
74#[derive(Clone)]
75pub struct NetworkHandle {
76    cmd_tx: mpsc::Sender<NetworkCommand>,
77}
78
79impl NetworkHandle {
80    pub async fn subscribe_room(&self, room_id: String) {
81        let _ = self
82            .cmd_tx
83            .send(NetworkCommand::SubscribeRoom { room_id })
84            .await;
85    }
86
87    pub async fn unsubscribe_room(&self, room_id: String) {
88        let _ = self
89            .cmd_tx
90            .send(NetworkCommand::UnsubscribeRoom { room_id })
91            .await;
92    }
93
94    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
95        let _ = self
96            .cmd_tx
97            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
98            .await;
99    }
100
101    pub async fn announce_room(&self, ann: RoomAnnouncement) {
102        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
103    }
104
105    pub async fn dial(&self, address: Multiaddr) {
106        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
107    }
108
109    pub async fn accept_inbound(&self, peer_id: PeerId) {
110        let _ = self
111            .cmd_tx
112            .send(NetworkCommand::AcceptInbound { peer_id })
113            .await;
114    }
115
116    pub async fn reject_inbound(&self, peer_id: PeerId) {
117        let _ = self
118            .cmd_tx
119            .send(NetworkCommand::RejectInbound { peer_id })
120            .await;
121    }
122
123    pub async fn shutdown(&self) {
124        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
125    }
126}
127
128/// What kind of connection we're holding open until Identify gives us
129/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
130/// added to the gossipsub mesh until the user accepts; outbound user-
131/// dials add to the mesh on connection but still wait on Identify so
132/// the eventual `DialSucceeded` can carry the fingerprint.
133#[derive(Debug)]
134enum PendingPeer {
135    /// Inbound dial from an unknown peer — modal-pending in the app.
136    InboundUnknown { address: Multiaddr },
137}
138
139struct NetworkTask {
140    swarm: Swarm<HuddleBehavior>,
141    cmd_rx: mpsc::Receiver<NetworkCommand>,
142    event_tx: mpsc::Sender<NetworkEvent>,
143    discovered_peers: HashSet<PeerId>,
144    /// Tracks user-initiated dials so we can correlate the eventual
145    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
146    /// specific address the user asked us to dial.
147    dial_attempts: HashMap<ConnectionId, Multiaddr>,
148    /// Phase A: peers connected but not yet promoted to the gossipsub
149    /// mesh (inbound) — waiting either for `Identify` to land so we
150    /// know their fingerprint, or for the user's accept/reject decision.
151    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
152    /// mid-prompt doesn't leak state.
153    pending_inbound: HashMap<PeerId, PendingPeer>,
154    /// Phase A: peers the user has explicitly rejected this session —
155    /// auto-disconnect every reconnect attempt without re-prompting.
156    /// Persistent across runs via `blocked_peers`; this in-memory copy
157    /// is loaded at startup (TODO: not yet wired; falls back to the DB
158    /// check in the app layer for now) and updated on `RejectInbound`.
159    session_blocklist: HashSet<PeerId>,
160    /// Phase D: configured relay multiaddrs. When Identify lands for a
161    /// peer whose multiaddr matches one of these, we call
162    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
163    /// Tracked as multiaddr strings (no PeerId yet — we don't know it
164    /// until Identify) plus a set of peer_ids of confirmed relays so
165    /// we only register once per relay.
166    configured_relays: Vec<Multiaddr>,
167    relay_peer_ids: HashSet<PeerId>,
168}
169
170pub fn start_network(
171    identity: &Identity,
172    event_tx: mpsc::Sender<NetworkEvent>,
173) -> crate::error::Result<NetworkHandle> {
174    start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
175}
176
177/// Start the network task with explicit mode, TCP listen port, and any
178/// pre-configured relay multiaddrs. `listen_port = 0` requests a
179/// random port. Relays are dialed on startup; once `Identify` lands
180/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
181/// register a reservation so peers behind NAT can dial us through it.
182pub fn start_network_with(
183    identity: &Identity,
184    event_tx: mpsc::Sender<NetworkEvent>,
185    mode: NetworkMode,
186    listen_port: u16,
187    relays: Vec<Multiaddr>,
188) -> crate::error::Result<NetworkHandle> {
189    let keypair = identity.keypair().clone();
190    let local_peer_id = identity.peer_id();
191
192    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
193        .with_tokio()
194        .with_tcp(
195            tcp::Config::default(),
196            noise::Config::new,
197            yamux::Config::default,
198        )
199        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
200        // Phase D: wrap the transport with relay-client. This both
201        // composes the transport (so we can dial `/p2p-circuit/`
202        // addresses) and surfaces a `relay::client::Behaviour` into
203        // the `with_behaviour` closure as the second argument.
204        .with_relay_client(noise::Config::new, yamux::Config::default)
205        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
206        .with_behaviour(|key, relay_client| {
207            let mdns_opt = match mode {
208                NetworkMode::Mdns => Some(
209                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
210                        .expect("mDNS init failed"),
211                ),
212                NetworkMode::Direct => None,
213            };
214            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
215
216            let identify = identify::Behaviour::new(
217                identify::Config::new("/huddle/1.0.0".into(), key.public())
218                    .with_agent_version("huddle/0.3".into()),
219            );
220
221            let ping = ping::Behaviour::default();
222
223            let gossipsub_config = gossipsub::ConfigBuilder::default()
224                .heartbeat_interval(Duration::from_secs(1))
225                .validation_mode(gossipsub::ValidationMode::Strict)
226                // Default is 64 KiB. Raise it so base64-encoded file
227                // chunks (see files::CHUNK_SIZE) keep ample headroom.
228                .max_transmit_size(256 * 1024)
229                .build()
230                .expect("valid gossipsub config");
231
232            let mut gossipsub = gossipsub::Behaviour::new(
233                gossipsub::MessageAuthenticity::Signed(key.clone()),
234                gossipsub_config,
235            )
236            .expect("valid gossipsub init");
237
238            // Every node subscribes to the global rooms topic so the lobby
239            // shows discovered rooms even before joining anything.
240            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
241            gossipsub
242                .subscribe(&rooms_topic)
243                .expect("subscribe rooms topic");
244
245            let autonat = autonat::v1::Behaviour::new(local_peer_id, Default::default());
246            let dcutr = dcutr::Behaviour::new(local_peer_id);
247
248            HuddleBehavior {
249                mdns,
250                identify,
251                ping,
252                gossipsub,
253                relay_client,
254                autonat,
255                dcutr,
256            }
257        })
258        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
259        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
260        .build();
261
262    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
263        .parse()
264        .expect("valid listen addr");
265    swarm
266        .listen_on(listen_addr)
267        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
268    // Also bind IPv6 on all interfaces so users can dial via IPv6.
269    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
270        .parse()
271        .expect("valid ipv6 listen addr");
272    if let Err(e) = swarm.listen_on(listen_addr6) {
273        debug!(%e, "ipv6 listen skipped");
274    }
275
276    let (cmd_tx, cmd_rx) = mpsc::channel(256);
277    let mut task = NetworkTask {
278        swarm,
279        cmd_rx,
280        event_tx,
281        discovered_peers: HashSet::new(),
282        dial_attempts: HashMap::new(),
283        pending_inbound: HashMap::new(),
284        session_blocklist: HashSet::new(),
285        configured_relays: relays.clone(),
286        relay_peer_ids: HashSet::new(),
287    };
288    // Phase D: dial each configured relay so Identify can complete and
289    // we can register a `/p2p-circuit` reservation. Failures here are
290    // non-fatal — the user can still chat on LAN.
291    for relay_addr in relays {
292        info!(addr = %relay_addr, "dialing configured relay");
293        let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
294        let conn_id = opts.connection_id();
295        match task.swarm.dial(opts) {
296            Ok(()) => {
297                task.dial_attempts.insert(conn_id, relay_addr);
298            }
299            Err(e) => warn!(%e, "dial relay failed"),
300        }
301    }
302    tokio::spawn(task.run());
303
304    Ok(NetworkHandle { cmd_tx })
305}
306
307impl NetworkTask {
308    async fn run(mut self) {
309        loop {
310            tokio::select! {
311                event = self.swarm.select_next_some() => {
312                    self.handle_swarm_event(event).await;
313                }
314                Some(cmd) = self.cmd_rx.recv() => {
315                    if matches!(cmd, NetworkCommand::Shutdown) {
316                        info!("network task shutting down");
317                        break;
318                    }
319                    self.handle_command(cmd);
320                }
321            }
322        }
323    }
324
325    async fn handle_swarm_event(
326        &mut self,
327        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
328    ) {
329        match event {
330            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
331                info!(%address, "listening");
332                // Phase D: a relay-circuit address is a reachability
333                // milestone — surface it as its own event so the lobby
334                // can show "reachable via N relays" status.
335                use libp2p::multiaddr::Protocol;
336                let is_circuit = address
337                    .iter()
338                    .any(|p| matches!(p, Protocol::P2pCircuit));
339                if is_circuit {
340                    let _ = self
341                        .event_tx
342                        .send(NetworkEvent::RelayReservationEstablished {
343                            address: address.clone(),
344                        })
345                        .await;
346                }
347                let _ = self
348                    .event_tx
349                    .send(NetworkEvent::ListeningOn { address })
350                    .await;
351            }
352            libp2p::swarm::SwarmEvent::ConnectionEstablished {
353                peer_id,
354                connection_id,
355                endpoint,
356                ..
357            } => {
358                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
359                    // Phase D: a connection that was for a configured
360                    // relay shouldn't pollute the lobby with a normal
361                    // DialSucceeded — relays aren't chat peers. We just
362                    // remember the peer_id and wait for Identify, then
363                    // register a /p2p-circuit reservation.
364                    let is_relay = self.configured_relays.iter().any(|r| r == &addr);
365                    if is_relay {
366                        info!(%peer_id, %addr, "connected to configured relay");
367                        self.relay_peer_ids.insert(peer_id);
368                    } else {
369                        info!(%peer_id, %addr, "user-dialed peer connected");
370                        // Treat dialed peers like mDNS-discovered: add
371                        // to gossipsub explicit peers so room
372                        // announcements flow.
373                        self.swarm
374                            .behaviour_mut()
375                            .gossipsub
376                            .add_explicit_peer(&peer_id);
377                        self.discovered_peers.insert(peer_id);
378                        let _ = self
379                            .event_tx
380                            .send(NetworkEvent::DialSucceeded {
381                                peer_id,
382                                address: addr,
383                            })
384                            .await;
385                    }
386                } else if let ConnectedPoint::Dialer { .. } = endpoint {
387                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
388                    // — still add to mesh; no user-visible event needed.
389                    self.swarm
390                        .behaviour_mut()
391                        .gossipsub
392                        .add_explicit_peer(&peer_id);
393                } else {
394                    // Inbound dial from an unknown peer (Phase A). We
395                    // hold the connection but do NOT add them to the
396                    // explicit-peer set yet — wait for Identify so we
397                    // can show the user the peer's fingerprint, then
398                    // either AcceptInbound (promote to mesh) or
399                    // RejectInbound (disconnect + persist blocklist).
400                    //
401                    // Known limitation: gossipsub's score-based mesh
402                    // formation may still forward topic messages to
403                    // this peer via other peers we have in common.
404                    // True hard-quarantine would need a custom
405                    // ConnectionHandler — out of scope for v1.
406                    if self.session_blocklist.contains(&peer_id) {
407                        info!(%peer_id, "rejecting inbound from session-blocked peer");
408                        let _ = self.swarm.disconnect_peer_id(peer_id);
409                    } else {
410                        let address = match &endpoint {
411                            ConnectedPoint::Listener { send_back_addr, .. } => {
412                                send_back_addr.clone()
413                            }
414                            _ => Multiaddr::empty(),
415                        };
416                        debug!(%peer_id, %address, "inbound peer pending decision");
417                        self.pending_inbound
418                            .insert(peer_id, PendingPeer::InboundUnknown { address });
419                    }
420                }
421            }
422            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
423                connection_id,
424                error,
425                ..
426            } => {
427                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
428                    warn!(%addr, %error, "user-dialed peer failed");
429                    let _ = self
430                        .event_tx
431                        .send(NetworkEvent::DialFailed {
432                            address: addr,
433                            error: error.to_string(),
434                        })
435                        .await;
436                }
437            }
438            libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, .. } => {
439                // Drop any pending-inbound entry for this peer — they
440                // disconnected before we could prompt the user (or
441                // before the user accepted). Lets a re-connect start
442                // fresh rather than reusing stale state.
443                self.pending_inbound.remove(&peer_id);
444            }
445            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
446            _ => {}
447        }
448    }
449
450    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
451        match event {
452            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
453                for (peer_id, addr) in peers {
454                    if self.discovered_peers.insert(peer_id) {
455                        info!(%peer_id, %addr, "mDNS discovered");
456                        self.swarm.add_peer_address(peer_id, addr);
457                        // Explicitly add to gossipsub mesh.
458                        self.swarm
459                            .behaviour_mut()
460                            .gossipsub
461                            .add_explicit_peer(&peer_id);
462                        let _ = self
463                            .event_tx
464                            .send(NetworkEvent::PeerDiscovered { peer_id })
465                            .await;
466                    }
467                }
468            }
469            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
470                for (peer_id, _) in peers {
471                    if self.discovered_peers.remove(&peer_id) {
472                        info!(%peer_id, "mDNS peer expired");
473                        self.swarm
474                            .behaviour_mut()
475                            .gossipsub
476                            .remove_explicit_peer(&peer_id);
477                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
478                    }
479                }
480            }
481            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
482                propagation_source,
483                message,
484                ..
485            }) => {
486                self.handle_gossipsub_message(propagation_source, message).await;
487            }
488            HuddleBehaviorEvent::Identify(identify::Event::Received {
489                peer_id, info, ..
490            }) => {
491                debug!(%peer_id, agent = %info.agent_version, "identify received");
492                // Phase D: if this peer is a configured relay, register
493                // a `/p2p-circuit` reservation on first identify. Idem-
494                // potent — only fire if we haven't listened on this
495                // relay already (identify fires periodically).
496                if self.relay_peer_ids.contains(&peer_id) {
497                    use libp2p::multiaddr::Protocol;
498                    if let Some(relay_addr) = self
499                        .configured_relays
500                        .iter()
501                        .find(|a| {
502                            // Match by /p2p/<peer-id> suffix when
503                            // present, else by the addr we dialed.
504                            a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
505                                || self.dial_attempts.values().any(|d| d == *a)
506                        })
507                        .cloned()
508                    {
509                        let circuit = relay_addr.with(Protocol::P2pCircuit);
510                        match self.swarm.listen_on(circuit.clone()) {
511                            Ok(_) => info!(%circuit, "listening on relay circuit"),
512                            Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
513                        }
514                    }
515                }
516                // Decode the remote's Ed25519 pubkey and derive our
517                // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
518                // Rsa, Ecdsa) shouldn't appear in practice — huddle
519                // only generates Ed25519 identities — so we just log
520                // and skip if the cast fails.
521                let fingerprint = match info.public_key.clone().try_into_ed25519() {
522                    Ok(ed_pk) => {
523                        let bytes = ed_pk.to_bytes();
524                        compute_fingerprint(&bytes)
525                    }
526                    Err(_) => {
527                        warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
528                        return;
529                    }
530                };
531                // Always notify the app layer so it can populate
532                // `known_peers.fingerprint` and detect that an
533                // outbound peer we dialed has fully identified.
534                let _ = self
535                    .event_tx
536                    .send(NetworkEvent::PeerIdentified {
537                        peer_id,
538                        fingerprint: fingerprint.clone(),
539                    })
540                    .await;
541                // If the peer is in pending_inbound, Identify completing
542                // is the cue to surface the user prompt. Keep them in
543                // pending_inbound until Accept or Reject — we don't
544                // know yet which way the user will decide.
545                if let Some(PendingPeer::InboundUnknown { address }) =
546                    self.pending_inbound.get(&peer_id)
547                {
548                    let address = address.clone();
549                    let _ = self
550                        .event_tx
551                        .send(NetworkEvent::InboundDial {
552                            peer_id,
553                            fingerprint,
554                            address,
555                        })
556                        .await;
557                }
558            }
559            _ => {}
560        }
561    }
562
563    async fn handle_gossipsub_message(
564        &mut self,
565        from_peer: PeerId,
566        message: gossipsub::Message,
567    ) {
568        let topic = message.topic.to_string();
569        if topic == ROOMS_TOPIC {
570            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
571                Ok(ann) => {
572                    let _ = self
573                        .event_tx
574                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
575                        .await;
576                }
577                Err(e) => {
578                    warn!(%e, "bad room announcement");
579                }
580            }
581        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
582            let _ = self
583                .event_tx
584                .send(NetworkEvent::RoomMessageReceived {
585                    room_id: room_id.to_string(),
586                    payload: message.data,
587                    from_peer,
588                })
589                .await;
590        }
591    }
592
593    fn handle_command(&mut self, cmd: NetworkCommand) {
594        match cmd {
595            NetworkCommand::SubscribeRoom { room_id } => {
596                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
597                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
598                    warn!(%e, %room_id, "subscribe room failed");
599                }
600            }
601            NetworkCommand::UnsubscribeRoom { room_id } => {
602                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
603                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
604            }
605            NetworkCommand::PublishRoomMessage { room_id, payload } => {
606                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
607                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
608                    // No subscribed peers is expected before the mesh
609                    // forms; anything else (MessageTooLarge, full queues)
610                    // is a real bug worth surfacing.
611                    match e {
612                        gossipsub::PublishError::NoPeersSubscribedToTopic => {
613                            debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
614                        }
615                        e => warn!(%e, %room_id, "publish room message failed"),
616                    }
617                }
618            }
619            NetworkCommand::AnnounceRoom(ann) => {
620                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
621                match serde_json::to_vec(&ann) {
622                    Ok(payload) => {
623                        if let Err(e) =
624                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
625                        {
626                            debug!(%e, "publish room announcement failed");
627                        }
628                    }
629                    Err(e) => warn!(%e, "encode room announcement"),
630                }
631            }
632            NetworkCommand::AcceptInbound { peer_id } => {
633                if self.pending_inbound.remove(&peer_id).is_some() {
634                    info!(%peer_id, "inbound dial accepted — promoting to mesh");
635                    self.swarm
636                        .behaviour_mut()
637                        .gossipsub
638                        .add_explicit_peer(&peer_id);
639                    self.discovered_peers.insert(peer_id);
640                } else {
641                    debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
642                }
643            }
644            NetworkCommand::RejectInbound { peer_id } => {
645                self.pending_inbound.remove(&peer_id);
646                self.session_blocklist.insert(peer_id);
647                info!(%peer_id, "inbound dial rejected — disconnecting");
648                let _ = self.swarm.disconnect_peer_id(peer_id);
649            }
650            NetworkCommand::Dial { address } => {
651                let opts: DialOpts = address.clone().into();
652                let conn_id = opts.connection_id();
653                match self.swarm.dial(opts) {
654                    Ok(()) => {
655                        self.dial_attempts.insert(conn_id, address);
656                    }
657                    Err(e) => {
658                        // Synchronous dial error (bad multiaddr, transport refused).
659                        let tx = self.event_tx.clone();
660                        let err = e.to_string();
661                        tokio::spawn(async move {
662                            let _ = tx
663                                .send(NetworkEvent::DialFailed {
664                                    address,
665                                    error: err,
666                                })
667                                .await;
668                        });
669                    }
670                }
671            }
672            NetworkCommand::Shutdown => unreachable!(),
673        }
674    }
675}