huddle_core/network/mod.rs
1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13 autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
14 SwarmBuilder,
15};
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18
19/// How the network discovers peers.
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum NetworkMode {
22 /// mDNS on: announce ourselves on the LAN and pick up announcements.
23 Mdns,
24 /// mDNS off: invisible to LAN discovery; the only way to connect is
25 /// for someone to dial our address (or for us to dial theirs).
26 Direct,
27}
28
29impl NetworkMode {
30 pub fn as_str(&self) -> &'static str {
31 match self {
32 NetworkMode::Mdns => "mdns",
33 NetworkMode::Direct => "direct",
34 }
35 }
36
37 pub fn from_str(s: &str) -> Option<Self> {
38 match s.trim().to_ascii_lowercase().as_str() {
39 "mdns" | "lan" | "open" => Some(NetworkMode::Mdns),
40 "direct" | "dial" | "private" => Some(NetworkMode::Direct),
41 _ => None,
42 }
43 }
44}
45
46use crate::identity::{compute_fingerprint, Identity};
47use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
48use crate::network::events::NetworkEvent;
49use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
50
51#[derive(Debug)]
52pub enum NetworkCommand {
53 /// Subscribe to a room's per-room gossipsub topic.
54 SubscribeRoom { room_id: String },
55 /// Unsubscribe from a room's topic.
56 UnsubscribeRoom { room_id: String },
57 /// Publish a JSON-encoded `RoomMessage` to a room's topic.
58 PublishRoomMessage { room_id: String, payload: Vec<u8> },
59 /// Publish a room announcement on the global rooms topic.
60 AnnounceRoom(RoomAnnouncement),
61 /// User-initiated dial of an explicit address. Used for cross-network
62 /// reach when mDNS isn't enough.
63 Dial { address: Multiaddr },
64 /// huddle 0.5.2: dial a peer using multiple candidate addresses,
65 /// letting libp2p race them in parallel. Used by the "add by HD
66 /// ID / username" flow, which resolves a fingerprint to every
67 /// address we know for that peer (room announcement `host_addrs`
68 /// + persisted `known_peers`). libp2p's parallel dialer picks
69 /// the cheapest path that completes — LAN beats public IP beats
70 /// relay-hopped without us having to probe transports manually.
71 DialAddresses { addresses: Vec<Multiaddr> },
72 /// Phase A: user accepted an inbound dial — promote the peer to
73 /// explicit-peer status so room announcements flow.
74 AcceptInbound { peer_id: PeerId },
75 /// Phase A: user rejected an inbound dial — disconnect them and
76 /// add the peer_id to the in-memory blocklist for this session
77 /// (caller is responsible for the persistent blocked_peers row).
78 RejectInbound { peer_id: PeerId },
79 /// Phase C follow-up: drop a connection that failed an
80 /// application-level identity check (e.g. invite-fingerprint
81 /// mismatch). Differs from `RejectInbound` in that it doesn't
82 /// touch the inbound-pending map (the connection is already
83 /// past Identify when we discover the mismatch) and doesn't
84 /// persist a block — the caller may want to retry with a
85 /// corrected invite.
86 DisconnectPeer { peer_id: PeerId },
87 Shutdown,
88}
89
90#[derive(Clone)]
91pub struct NetworkHandle {
92 cmd_tx: mpsc::Sender<NetworkCommand>,
93}
94
95impl NetworkHandle {
96 pub async fn subscribe_room(&self, room_id: String) {
97 let _ = self
98 .cmd_tx
99 .send(NetworkCommand::SubscribeRoom { room_id })
100 .await;
101 }
102
103 pub async fn unsubscribe_room(&self, room_id: String) {
104 let _ = self
105 .cmd_tx
106 .send(NetworkCommand::UnsubscribeRoom { room_id })
107 .await;
108 }
109
110 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
111 let _ = self
112 .cmd_tx
113 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
114 .await;
115 }
116
117 pub async fn announce_room(&self, ann: RoomAnnouncement) {
118 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
119 }
120
121 pub async fn dial(&self, address: Multiaddr) {
122 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
123 }
124
125 /// huddle 0.5.2: dial a peer with multiple candidate addresses
126 /// at once. libp2p's swarm races them; the first to complete a
127 /// handshake wins, the rest are dropped. Use when you've resolved
128 /// a peer fingerprint to several known transports — typically
129 /// LAN ip4 + public ip4 + relay circuit.
130 pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
131 let _ = self
132 .cmd_tx
133 .send(NetworkCommand::DialAddresses { addresses })
134 .await;
135 }
136
137 pub async fn accept_inbound(&self, peer_id: PeerId) {
138 let _ = self
139 .cmd_tx
140 .send(NetworkCommand::AcceptInbound { peer_id })
141 .await;
142 }
143
144 pub async fn reject_inbound(&self, peer_id: PeerId) {
145 let _ = self
146 .cmd_tx
147 .send(NetworkCommand::RejectInbound { peer_id })
148 .await;
149 }
150
151 pub async fn disconnect_peer(&self, peer_id: PeerId) {
152 let _ = self
153 .cmd_tx
154 .send(NetworkCommand::DisconnectPeer { peer_id })
155 .await;
156 }
157
158 pub async fn shutdown(&self) {
159 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
160 }
161}
162
163/// What kind of connection we're holding open until Identify gives us
164/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
165/// added to the gossipsub mesh until the user accepts; outbound user-
166/// dials add to the mesh on connection but still wait on Identify so
167/// the eventual `DialSucceeded` can carry the fingerprint.
168#[derive(Debug)]
169enum PendingPeer {
170 /// Inbound dial from an unknown peer — modal-pending in the app.
171 InboundUnknown { address: Multiaddr },
172}
173
174struct NetworkTask {
175 swarm: Swarm<HuddleBehavior>,
176 cmd_rx: mpsc::Receiver<NetworkCommand>,
177 event_tx: mpsc::Sender<NetworkEvent>,
178 discovered_peers: HashSet<PeerId>,
179 /// Tracks user-initiated dials so we can correlate the eventual
180 /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
181 /// specific address the user asked us to dial.
182 dial_attempts: HashMap<ConnectionId, Multiaddr>,
183 /// Phase A: peers connected but not yet promoted to the gossipsub
184 /// mesh (inbound) — waiting either for `Identify` to land so we
185 /// know their fingerprint, or for the user's accept/reject decision.
186 /// On `ConnectionClosed` we drop entries here so a peer disconnecting
187 /// mid-prompt doesn't leak state.
188 pending_inbound: HashMap<PeerId, PendingPeer>,
189 /// Phase A: peers the user has explicitly rejected this session —
190 /// auto-disconnect every reconnect attempt without re-prompting.
191 /// Persistent across runs via `blocked_peers`; this in-memory copy
192 /// is loaded at startup (TODO: not yet wired; falls back to the DB
193 /// check in the app layer for now) and updated on `RejectInbound`.
194 session_blocklist: HashSet<PeerId>,
195 /// Phase D: configured relay multiaddrs. When Identify lands for a
196 /// peer whose multiaddr matches one of these, we call
197 /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
198 /// Tracked as multiaddr strings (no PeerId yet — we don't know it
199 /// until Identify) plus a set of peer_ids of confirmed relays so
200 /// we only register once per relay.
201 configured_relays: Vec<Multiaddr>,
202 relay_peer_ids: HashSet<PeerId>,
203}
204
205pub fn start_network(
206 identity: &Identity,
207 event_tx: mpsc::Sender<NetworkEvent>,
208) -> crate::error::Result<NetworkHandle> {
209 start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
210}
211
212/// Start the network task with explicit mode, TCP listen port, and any
213/// pre-configured relay multiaddrs. `listen_port = 0` requests a
214/// random port. Relays are dialed on startup; once `Identify` lands
215/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
216/// register a reservation so peers behind NAT can dial us through it.
217pub fn start_network_with(
218 identity: &Identity,
219 event_tx: mpsc::Sender<NetworkEvent>,
220 mode: NetworkMode,
221 listen_port: u16,
222 relays: Vec<Multiaddr>,
223) -> crate::error::Result<NetworkHandle> {
224 let keypair = identity.keypair().clone();
225 let local_peer_id = identity.peer_id();
226
227 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
228 .with_tokio()
229 .with_tcp(
230 tcp::Config::default(),
231 noise::Config::new,
232 yamux::Config::default,
233 )
234 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
235 // Phase D: wrap the transport with relay-client. This both
236 // composes the transport (so we can dial `/p2p-circuit/`
237 // addresses) and surfaces a `relay::client::Behaviour` into
238 // the `with_behaviour` closure as the second argument.
239 .with_relay_client(noise::Config::new, yamux::Config::default)
240 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
241 .with_behaviour(|key, relay_client| {
242 let mdns_opt = match mode {
243 NetworkMode::Mdns => Some(
244 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
245 .expect("mDNS init failed"),
246 ),
247 NetworkMode::Direct => None,
248 };
249 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
250
251 let identify = identify::Behaviour::new(
252 identify::Config::new("/huddle/1.0.0".into(), key.public())
253 .with_agent_version("huddle/0.5".into()),
254 );
255
256 let ping = ping::Behaviour::default();
257
258 let gossipsub_config = gossipsub::ConfigBuilder::default()
259 .heartbeat_interval(Duration::from_secs(1))
260 .validation_mode(gossipsub::ValidationMode::Strict)
261 // Default is 64 KiB. Raise it so base64-encoded file
262 // chunks (see files::CHUNK_SIZE) keep ample headroom.
263 .max_transmit_size(256 * 1024)
264 .build()
265 .expect("valid gossipsub config");
266
267 let mut gossipsub = gossipsub::Behaviour::new(
268 gossipsub::MessageAuthenticity::Signed(key.clone()),
269 gossipsub_config,
270 )
271 .expect("valid gossipsub init");
272
273 // Every node subscribes to the global rooms topic so the lobby
274 // shows discovered rooms even before joining anything.
275 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
276 gossipsub
277 .subscribe(&rooms_topic)
278 .expect("subscribe rooms topic");
279
280 // AutoNAT v2: client probes external addresses by asking
281 // remote servers to dial us back; server answers other
282 // peers' probes. Both halves are needed for symmetric
283 // P2P reachability detection (v1 combined them; v2 split).
284 let autonat_client = autonat::v2::client::Behaviour::new(
285 rand::rngs::OsRng,
286 autonat::v2::client::Config::default(),
287 );
288 let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
289 let dcutr = dcutr::Behaviour::new(local_peer_id);
290
291 HuddleBehavior {
292 mdns,
293 identify,
294 ping,
295 gossipsub,
296 relay_client,
297 autonat_client,
298 autonat_server,
299 dcutr,
300 }
301 })
302 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
303 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
304 .build();
305
306 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
307 .parse()
308 .expect("valid listen addr");
309 swarm
310 .listen_on(listen_addr)
311 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
312 // Also bind IPv6 on all interfaces so users can dial via IPv6.
313 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
314 .parse()
315 .expect("valid ipv6 listen addr");
316 if let Err(e) = swarm.listen_on(listen_addr6) {
317 debug!(%e, "ipv6 listen skipped");
318 }
319
320 let (cmd_tx, cmd_rx) = mpsc::channel(256);
321 let mut task = NetworkTask {
322 swarm,
323 cmd_rx,
324 event_tx,
325 discovered_peers: HashSet::new(),
326 dial_attempts: HashMap::new(),
327 pending_inbound: HashMap::new(),
328 session_blocklist: HashSet::new(),
329 configured_relays: relays.clone(),
330 relay_peer_ids: HashSet::new(),
331 };
332 // Phase D: dial each configured relay so Identify can complete and
333 // we can register a `/p2p-circuit` reservation. Failures here are
334 // non-fatal — the user can still chat on LAN.
335 for relay_addr in relays {
336 info!(addr = %relay_addr, "dialing configured relay");
337 let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
338 let conn_id = opts.connection_id();
339 match task.swarm.dial(opts) {
340 Ok(()) => {
341 task.dial_attempts.insert(conn_id, relay_addr);
342 }
343 Err(e) => warn!(%e, "dial relay failed"),
344 }
345 }
346 tokio::spawn(task.run());
347
348 Ok(NetworkHandle { cmd_tx })
349}
350
351impl NetworkTask {
352 async fn run(mut self) {
353 loop {
354 tokio::select! {
355 event = self.swarm.select_next_some() => {
356 self.handle_swarm_event(event).await;
357 }
358 Some(cmd) = self.cmd_rx.recv() => {
359 if matches!(cmd, NetworkCommand::Shutdown) {
360 info!("network task shutting down");
361 break;
362 }
363 self.handle_command(cmd);
364 }
365 }
366 }
367 }
368
369 async fn handle_swarm_event(
370 &mut self,
371 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
372 ) {
373 match event {
374 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
375 info!(%address, "listening");
376 // Phase D: a relay-circuit address is a reachability
377 // milestone — surface it as its own event so the lobby
378 // can show "reachable via N relays" status.
379 use libp2p::multiaddr::Protocol;
380 let is_circuit = address
381 .iter()
382 .any(|p| matches!(p, Protocol::P2pCircuit));
383 if is_circuit {
384 let _ = self
385 .event_tx
386 .send(NetworkEvent::RelayReservationEstablished {
387 address: address.clone(),
388 })
389 .await;
390 }
391 let _ = self
392 .event_tx
393 .send(NetworkEvent::ListeningOn { address })
394 .await;
395 }
396 libp2p::swarm::SwarmEvent::ConnectionEstablished {
397 peer_id,
398 connection_id,
399 endpoint,
400 ..
401 } => {
402 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
403 // Phase D: a connection that was for a configured
404 // relay shouldn't pollute the lobby with a normal
405 // DialSucceeded — relays aren't chat peers. We just
406 // remember the peer_id and wait for Identify, then
407 // register a /p2p-circuit reservation.
408 let is_relay = self.configured_relays.iter().any(|r| r == &addr);
409 if is_relay {
410 info!(%peer_id, %addr, "connected to configured relay");
411 self.relay_peer_ids.insert(peer_id);
412 } else {
413 info!(%peer_id, %addr, "user-dialed peer connected");
414 // Treat dialed peers like mDNS-discovered: add
415 // to gossipsub explicit peers so room
416 // announcements flow.
417 self.swarm
418 .behaviour_mut()
419 .gossipsub
420 .add_explicit_peer(&peer_id);
421 self.discovered_peers.insert(peer_id);
422 let _ = self
423 .event_tx
424 .send(NetworkEvent::DialSucceeded {
425 peer_id,
426 address: addr,
427 })
428 .await;
429 }
430 } else if let ConnectedPoint::Dialer { .. } = endpoint {
431 // Outgoing connection we didn't track (e.g. mDNS auto-dial)
432 // — still add to mesh; no user-visible event needed.
433 self.swarm
434 .behaviour_mut()
435 .gossipsub
436 .add_explicit_peer(&peer_id);
437 } else {
438 // Inbound dial from an unknown peer (Phase A). We
439 // hold the connection but do NOT add them to the
440 // explicit-peer set yet — wait for Identify so we
441 // can show the user the peer's fingerprint, then
442 // either AcceptInbound (promote to mesh) or
443 // RejectInbound (disconnect + persist blocklist).
444 //
445 // Known limitation: gossipsub's score-based mesh
446 // formation may still forward topic messages to
447 // this peer via other peers we have in common.
448 // True hard-quarantine would need a custom
449 // ConnectionHandler — out of scope for v1.
450 if self.session_blocklist.contains(&peer_id) {
451 info!(%peer_id, "rejecting inbound from session-blocked peer");
452 let _ = self.swarm.disconnect_peer_id(peer_id);
453 } else {
454 let address = match &endpoint {
455 ConnectedPoint::Listener { send_back_addr, .. } => {
456 send_back_addr.clone()
457 }
458 _ => Multiaddr::empty(),
459 };
460 debug!(%peer_id, %address, "inbound peer pending decision");
461 self.pending_inbound
462 .insert(peer_id, PendingPeer::InboundUnknown { address });
463 }
464 }
465 }
466 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
467 connection_id,
468 error,
469 ..
470 } => {
471 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
472 warn!(%addr, %error, "user-dialed peer failed");
473 let _ = self
474 .event_tx
475 .send(NetworkEvent::DialFailed {
476 address: addr,
477 error: error.to_string(),
478 })
479 .await;
480 }
481 }
482 libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, .. } => {
483 // Drop any pending-inbound entry for this peer — they
484 // disconnected before we could prompt the user (or
485 // before the user accepted). Lets a re-connect start
486 // fresh rather than reusing stale state.
487 self.pending_inbound.remove(&peer_id);
488 }
489 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
490 _ => {}
491 }
492 }
493
494 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
495 match event {
496 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
497 for (peer_id, addr) in peers {
498 if self.discovered_peers.insert(peer_id) {
499 info!(%peer_id, %addr, "mDNS discovered");
500 self.swarm.add_peer_address(peer_id, addr);
501 // Explicitly add to gossipsub mesh.
502 self.swarm
503 .behaviour_mut()
504 .gossipsub
505 .add_explicit_peer(&peer_id);
506 let _ = self
507 .event_tx
508 .send(NetworkEvent::PeerDiscovered { peer_id })
509 .await;
510 }
511 }
512 }
513 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
514 for (peer_id, _) in peers {
515 if self.discovered_peers.remove(&peer_id) {
516 info!(%peer_id, "mDNS peer expired");
517 self.swarm
518 .behaviour_mut()
519 .gossipsub
520 .remove_explicit_peer(&peer_id);
521 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
522 }
523 }
524 }
525 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
526 propagation_source,
527 message,
528 ..
529 }) => {
530 self.handle_gossipsub_message(propagation_source, message).await;
531 }
532 HuddleBehaviorEvent::Identify(identify::Event::Received {
533 peer_id, info, ..
534 }) => {
535 debug!(%peer_id, agent = %info.agent_version, "identify received");
536 // Phase D: if this peer is a configured relay, register
537 // a `/p2p-circuit` reservation on first identify. Idem-
538 // potent — only fire if we haven't listened on this
539 // relay already (identify fires periodically).
540 if self.relay_peer_ids.contains(&peer_id) {
541 use libp2p::multiaddr::Protocol;
542 if let Some(relay_addr) = self
543 .configured_relays
544 .iter()
545 .find(|a| {
546 // Match by /p2p/<peer-id> suffix when
547 // present, else by the addr we dialed.
548 a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
549 || self.dial_attempts.values().any(|d| d == *a)
550 })
551 .cloned()
552 {
553 let circuit = relay_addr.with(Protocol::P2pCircuit);
554 match self.swarm.listen_on(circuit.clone()) {
555 Ok(_) => info!(%circuit, "listening on relay circuit"),
556 Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
557 }
558 }
559 }
560 // Decode the remote's Ed25519 pubkey and derive our
561 // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
562 // Rsa, Ecdsa) shouldn't appear in practice — huddle
563 // only generates Ed25519 identities — so we just log
564 // and skip if the cast fails.
565 let fingerprint = match info.public_key.clone().try_into_ed25519() {
566 Ok(ed_pk) => {
567 let bytes = ed_pk.to_bytes();
568 compute_fingerprint(&bytes)
569 }
570 Err(_) => {
571 warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
572 return;
573 }
574 };
575 // Always notify the app layer so it can populate
576 // `known_peers.fingerprint` and detect that an
577 // outbound peer we dialed has fully identified.
578 let _ = self
579 .event_tx
580 .send(NetworkEvent::PeerIdentified {
581 peer_id,
582 fingerprint: fingerprint.clone(),
583 })
584 .await;
585 // If the peer is in pending_inbound, Identify completing
586 // is the cue to surface the user prompt. Keep them in
587 // pending_inbound until Accept or Reject — we don't
588 // know yet which way the user will decide.
589 if let Some(PendingPeer::InboundUnknown { address }) =
590 self.pending_inbound.get(&peer_id)
591 {
592 let address = address.clone();
593 let _ = self
594 .event_tx
595 .send(NetworkEvent::InboundDial {
596 peer_id,
597 fingerprint,
598 address,
599 })
600 .await;
601 }
602 }
603 HuddleBehaviorEvent::AutonatClient(ev) => {
604 // One probe per address candidate. `result.is_ok()`
605 // means a remote AutoNAT server dialed us back
606 // successfully on `tested_addr` ⇒ this address is
607 // reachable from the outside. The app layer aggregates
608 // these into the lobby reachability badge.
609 let reachable = ev.result.is_ok();
610 if reachable {
611 info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
612 } else {
613 debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
614 }
615 let _ = self
616 .event_tx
617 .send(NetworkEvent::NatProbeResult {
618 tested_addr: ev.tested_addr,
619 reachable,
620 })
621 .await;
622 }
623 HuddleBehaviorEvent::AutonatServer(_) => {
624 // We answered another peer's reachability probe.
625 // No app-visible action.
626 }
627 HuddleBehaviorEvent::Dcutr(ev) => {
628 let success = ev.result.is_ok();
629 if success {
630 info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
631 } else {
632 debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
633 }
634 let _ = self
635 .event_tx
636 .send(NetworkEvent::DcutrUpgrade {
637 remote_peer: ev.remote_peer_id,
638 success,
639 })
640 .await;
641 }
642 _ => {}
643 }
644 }
645
646 async fn handle_gossipsub_message(
647 &mut self,
648 from_peer: PeerId,
649 message: gossipsub::Message,
650 ) {
651 let topic = message.topic.to_string();
652 if topic == ROOMS_TOPIC {
653 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
654 Ok(ann) => {
655 let _ = self
656 .event_tx
657 .send(NetworkEvent::RoomAnnouncementReceived(ann))
658 .await;
659 }
660 Err(e) => {
661 warn!(%e, "bad room announcement");
662 }
663 }
664 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
665 let _ = self
666 .event_tx
667 .send(NetworkEvent::RoomMessageReceived {
668 room_id: room_id.to_string(),
669 payload: message.data,
670 from_peer,
671 })
672 .await;
673 }
674 }
675
676 fn handle_command(&mut self, cmd: NetworkCommand) {
677 match cmd {
678 NetworkCommand::SubscribeRoom { room_id } => {
679 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
680 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
681 warn!(%e, %room_id, "subscribe room failed");
682 }
683 }
684 NetworkCommand::UnsubscribeRoom { room_id } => {
685 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
686 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
687 }
688 NetworkCommand::PublishRoomMessage { room_id, payload } => {
689 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
690 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
691 // No subscribed peers is expected before the mesh
692 // forms; anything else (MessageTooLarge, full queues)
693 // is a real bug worth surfacing.
694 match e {
695 gossipsub::PublishError::NoPeersSubscribedToTopic => {
696 debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
697 }
698 e => warn!(%e, %room_id, "publish room message failed"),
699 }
700 }
701 }
702 NetworkCommand::AnnounceRoom(ann) => {
703 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
704 match serde_json::to_vec(&ann) {
705 Ok(payload) => {
706 if let Err(e) =
707 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
708 {
709 debug!(%e, "publish room announcement failed");
710 }
711 }
712 Err(e) => warn!(%e, "encode room announcement"),
713 }
714 }
715 NetworkCommand::AcceptInbound { peer_id } => {
716 if self.pending_inbound.remove(&peer_id).is_some() {
717 info!(%peer_id, "inbound dial accepted — promoting to mesh");
718 self.swarm
719 .behaviour_mut()
720 .gossipsub
721 .add_explicit_peer(&peer_id);
722 self.discovered_peers.insert(peer_id);
723 } else {
724 debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
725 }
726 }
727 NetworkCommand::RejectInbound { peer_id } => {
728 self.pending_inbound.remove(&peer_id);
729 self.session_blocklist.insert(peer_id);
730 info!(%peer_id, "inbound dial rejected — disconnecting");
731 let _ = self.swarm.disconnect_peer_id(peer_id);
732 }
733 NetworkCommand::DisconnectPeer { peer_id } => {
734 info!(%peer_id, "app-level identity check failed — disconnecting");
735 let _ = self.swarm.disconnect_peer_id(peer_id);
736 }
737 NetworkCommand::Dial { address } => {
738 let opts: DialOpts = address.clone().into();
739 let conn_id = opts.connection_id();
740 match self.swarm.dial(opts) {
741 Ok(()) => {
742 self.dial_attempts.insert(conn_id, address);
743 }
744 Err(e) => {
745 // Synchronous dial error (bad multiaddr, transport refused).
746 let tx = self.event_tx.clone();
747 let err = e.to_string();
748 tokio::spawn(async move {
749 let _ = tx
750 .send(NetworkEvent::DialFailed {
751 address,
752 error: err,
753 })
754 .await;
755 });
756 }
757 }
758 }
759 NetworkCommand::DialAddresses { addresses } => {
760 use libp2p::multiaddr::Protocol;
761 if addresses.is_empty() {
762 return;
763 }
764 // Extract the shared peer-id from any address with a
765 // `/p2p/<peer-id>` suffix. All host_addrs originating
766 // from the same `RoomAnnouncement` share the
767 // announcer's peer-id, so the first match is enough.
768 let peer_id = addresses
769 .iter()
770 .flat_map(|a| a.iter())
771 .find_map(|p| match p {
772 Protocol::P2p(pid) => Some(pid),
773 _ => None,
774 });
775 let opts = match peer_id {
776 Some(pid) => DialOpts::peer_id(pid)
777 .addresses(addresses.clone())
778 .build(),
779 // No /p2p/ segment anywhere — fall back to single-
780 // address dial of the first candidate, matching the
781 // legacy `Dial` semantics for unanchored multiaddrs.
782 None => addresses[0].clone().into(),
783 };
784 let conn_id = opts.connection_id();
785 // Use the first address as the representative for
786 // dial_attempts. On synchronous error we report it;
787 // on async success the post-identify handler upserts
788 // with the actually-connected endpoint.
789 let primary = addresses[0].clone();
790 match self.swarm.dial(opts) {
791 Ok(()) => {
792 self.dial_attempts.insert(conn_id, primary);
793 }
794 Err(e) => {
795 let tx = self.event_tx.clone();
796 let err = e.to_string();
797 tokio::spawn(async move {
798 let _ = tx
799 .send(NetworkEvent::DialFailed {
800 address: primary,
801 error: err,
802 })
803 .await;
804 });
805 }
806 }
807 }
808 NetworkCommand::Shutdown => unreachable!(),
809 }
810 }
811}