huddle_core/network/mod.rs
1pub mod behavior;
2pub mod events;
3pub mod protocol;
4pub mod server;
5pub mod transport;
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::Duration;
10
11use futures::StreamExt;
12use sha2::{Digest, Sha256};
13use libp2p::core::ConnectedPoint;
14use libp2p::swarm::dial_opts::DialOpts;
15use libp2p::swarm::ConnectionId;
16use libp2p::{
17 autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
18 SwarmBuilder,
19};
20use tokio::sync::mpsc;
21use tracing::{debug, info, warn};
22
/// Strategy the node uses to find and reach other peers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// The huddle 0.8 default: libp2p is not started at all. Every message
    /// travels through the centralized Tor-onion relay (`network::server`):
    /// no LAN discovery, no direct dialing, no NAT traversal. The binary runs
    /// in this mode unless the user opts back into libp2p with
    /// `--mode mdns|direct`.
    Server,
    /// libp2p with mDNS enabled: advertise on the LAN and listen for other
    /// announcements. Opt-in via `--mode mdns`. Runs *alongside* the onion
    /// relay, so LAN peers connect directly while internet peers go through
    /// the server.
    Mdns,
    /// libp2p without mDNS: the node is invisible to LAN discovery and can
    /// only be reached over libp2p by an explicit dial. Opt-in via
    /// `--mode direct`. Also runs alongside the onion relay.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of the mode, as accepted by `from_str`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Server => "server",
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Whether this mode spins up a libp2p swarm (LAN / direct dial).
    /// Only the onion-relay-only `Server` mode does not.
    pub fn uses_libp2p(&self) -> bool {
        *self != Self::Server
    }

    /// Parse a user-supplied mode name, case-insensitively and ignoring
    /// surrounding whitespace. Accepts each canonical name plus its aliases
    /// (`tor`/`onion`, `lan`/`open`, `dial`/`private`); returns `None` for
    /// anything else.
    pub fn from_str(s: &str) -> Option<Self> {
        let normalized = s.trim().to_ascii_lowercase();
        let mode = match normalized.as_str() {
            "server" | "tor" | "onion" => Self::Server,
            "mdns" | "lan" | "open" => Self::Mdns,
            "direct" | "dial" | "private" => Self::Direct,
            _ => return None,
        };
        Some(mode)
    }
}
67
68use crate::identity::{compute_fingerprint, Identity};
69use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
70use crate::network::events::NetworkEvent;
71use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
72
73#[derive(Debug)]
74pub enum NetworkCommand {
75 /// Subscribe to a room's per-room gossipsub topic.
76 SubscribeRoom { room_id: String },
77 /// Unsubscribe from a room's topic.
78 UnsubscribeRoom { room_id: String },
79 /// Publish a JSON-encoded `RoomMessage` to a room's topic.
80 PublishRoomMessage { room_id: String, payload: Vec<u8> },
81 /// Publish a room announcement on the global rooms topic.
82 AnnounceRoom(RoomAnnouncement),
83 /// User-initiated dial of an explicit address. Used for cross-network
84 /// reach when mDNS isn't enough.
85 Dial { address: Multiaddr },
86 /// huddle 0.5.2: dial a peer using multiple candidate addresses,
87 /// letting libp2p race them in parallel. Used by the "add by HD
88 /// ID / username" flow, which resolves a fingerprint to every
89 /// address we know for that peer (room announcement `host_addrs`
90 /// + persisted `known_peers`). libp2p's parallel dialer picks
91 /// the cheapest path that completes — LAN beats public IP beats
92 /// relay-hopped without us having to probe transports manually.
93 DialAddresses { addresses: Vec<Multiaddr> },
94 /// Phase A: user accepted an inbound dial — promote the peer to
95 /// explicit-peer status so room announcements flow.
96 AcceptInbound { peer_id: PeerId },
97 /// Phase A: user rejected an inbound dial — disconnect them and
98 /// add the peer_id to the in-memory blocklist for this session
99 /// (caller is responsible for the persistent blocked_peers row).
100 RejectInbound { peer_id: PeerId },
101 /// Phase C follow-up: drop a connection that failed an
102 /// application-level identity check (e.g. invite-fingerprint
103 /// mismatch). Differs from `RejectInbound` in that it doesn't
104 /// touch the inbound-pending map (the connection is already
105 /// past Identify when we discover the mismatch) and doesn't
106 /// persist a block — the caller may want to retry with a
107 /// corrected invite.
108 DisconnectPeer { peer_id: PeerId },
109 Shutdown,
110}
111
112#[derive(Clone)]
113pub struct NetworkHandle {
114 cmd_tx: mpsc::Sender<NetworkCommand>,
115 /// huddle 0.8: optional connection to the centralized server (a Tor
116 /// onion relay). When attached, every room subscribe/unsubscribe and
117 /// every published wire message is mirrored to it, so peers we can't
118 /// reach over libp2p still receive our traffic (and we theirs, via the
119 /// server's offline mailbox). `None` until `attach_server` runs.
120 server: Arc<std::sync::Mutex<Option<server::ServerClient>>>,
121}
122
123/// Stable per-message id for the server mailbox/receipts: a short hash of
124/// the wire bytes. Opaque to the server; lets it dedup/queue by id.
125fn server_msg_id(payload: &[u8]) -> String {
126 let mut h = Sha256::new();
127 h.update(payload);
128 hex::encode(&h.finalize()[..8])
129}
130
131impl NetworkHandle {
132 /// Attach a live server connection so subsequent room traffic mirrors
133 /// to it. Replaces any previous one. Called by the app's server
134 /// connection task after each (re)connect.
135 pub fn attach_server(&self, client: server::ServerClient) {
136 *self.server.lock().unwrap() = Some(client);
137 }
138
139 /// Drop the server connection (e.g. after a disconnect) so we stop
140 /// mirroring until it reconnects.
141 pub fn detach_server(&self) {
142 *self.server.lock().unwrap() = None;
143 }
144
145 /// Snapshot the attached server client, if any. Cloned out so we never
146 /// hold the lock across the (sync, non-blocking) mirror call.
147 fn server_client(&self) -> Option<server::ServerClient> {
148 self.server.lock().unwrap().clone()
149 }
150
151 pub async fn subscribe_room(&self, room_id: String) {
152 if let Some(s) = self.server_client() {
153 let _ = s.subscribe(&room_id);
154 }
155 let _ = self
156 .cmd_tx
157 .send(NetworkCommand::SubscribeRoom { room_id })
158 .await;
159 }
160
161 pub async fn unsubscribe_room(&self, room_id: String) {
162 if let Some(s) = self.server_client() {
163 let _ = s.unsubscribe(&room_id);
164 }
165 let _ = self
166 .cmd_tx
167 .send(NetworkCommand::UnsubscribeRoom { room_id })
168 .await;
169 }
170
171 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
172 if let Some(s) = self.server_client() {
173 let _ = s.publish(&room_id, &server_msg_id(&payload), &payload);
174 }
175 let _ = self
176 .cmd_tx
177 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
178 .await;
179 }
180
181 pub async fn announce_room(&self, ann: RoomAnnouncement) {
182 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
183 }
184
185 /// True when a server connection is currently attached.
186 pub fn has_server(&self) -> bool {
187 self.server.lock().unwrap().is_some()
188 }
189
190 pub async fn dial(&self, address: Multiaddr) {
191 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
192 }
193
194 /// huddle 0.5.2: dial a peer with multiple candidate addresses
195 /// at once. libp2p's swarm races them; the first to complete a
196 /// handshake wins, the rest are dropped. Use when you've resolved
197 /// a peer fingerprint to several known transports — typically
198 /// LAN ip4 + public ip4 + relay circuit.
199 pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
200 let _ = self
201 .cmd_tx
202 .send(NetworkCommand::DialAddresses { addresses })
203 .await;
204 }
205
206 pub async fn accept_inbound(&self, peer_id: PeerId) {
207 let _ = self
208 .cmd_tx
209 .send(NetworkCommand::AcceptInbound { peer_id })
210 .await;
211 }
212
213 pub async fn reject_inbound(&self, peer_id: PeerId) {
214 let _ = self
215 .cmd_tx
216 .send(NetworkCommand::RejectInbound { peer_id })
217 .await;
218 }
219
220 pub async fn disconnect_peer(&self, peer_id: PeerId) {
221 let _ = self
222 .cmd_tx
223 .send(NetworkCommand::DisconnectPeer { peer_id })
224 .await;
225 }
226
227 pub async fn shutdown(&self) {
228 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
229 }
230}
231
232/// What kind of connection we're holding open until Identify gives us
233/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
234/// added to the gossipsub mesh until the user accepts; outbound user-
235/// dials add to the mesh on connection but still wait on Identify so
236/// the eventual `DialSucceeded` can carry the fingerprint.
237#[derive(Debug)]
238enum PendingPeer {
239 /// Inbound dial from an unknown peer — modal-pending in the app.
240 InboundUnknown { address: Multiaddr },
241}
242
243struct NetworkTask {
244 swarm: Swarm<HuddleBehavior>,
245 cmd_rx: mpsc::Receiver<NetworkCommand>,
246 event_tx: mpsc::Sender<NetworkEvent>,
247 discovered_peers: HashSet<PeerId>,
248 /// Tracks user-initiated dials so we can correlate the eventual
249 /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
250 /// specific address the user asked us to dial.
251 dial_attempts: HashMap<ConnectionId, Multiaddr>,
252 /// Phase A: peers connected but not yet promoted to the gossipsub
253 /// mesh (inbound) — waiting either for `Identify` to land so we
254 /// know their fingerprint, or for the user's accept/reject decision.
255 /// On `ConnectionClosed` we drop entries here so a peer disconnecting
256 /// mid-prompt doesn't leak state.
257 pending_inbound: HashMap<PeerId, PendingPeer>,
258 /// Phase A: peers the user has explicitly rejected this session —
259 /// auto-disconnect every reconnect attempt without re-prompting.
260 /// Persistent across runs via `blocked_peers`; this in-memory copy
261 /// is loaded at startup (TODO: not yet wired; falls back to the DB
262 /// check in the app layer for now) and updated on `RejectInbound`.
263 session_blocklist: HashSet<PeerId>,
264 /// Phase D: configured relay multiaddrs. When Identify lands for a
265 /// peer whose multiaddr matches one of these, we call
266 /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
267 /// Tracked as multiaddr strings (no PeerId yet — we don't know it
268 /// until Identify) plus a set of peer_ids of confirmed relays so
269 /// we only register once per relay.
270 configured_relays: Vec<Multiaddr>,
271 relay_peer_ids: HashSet<PeerId>,
272 /// huddle 0.7.11: per-peer DCUtR failure counter. Logged at warn
273 /// once the count crosses `DCUTR_FAIL_BUDGET` so symmetric-NAT
274 /// pairs don't generate runaway hole-punch attempts in the logs.
275 /// libp2p's dcutr behavior schedules its own retries internally;
276 /// this is purely an observability signal for the app.
277 dcutr_failures: HashMap<PeerId, u32>,
278}
279
/// huddle 0.7.11: number of failed DCUtR attempts to a single peer at which
/// a warning is logged. dcutr keeps retrying on its own; an audit flagged
/// the resulting log spam, and capping that noise is the practical fix.
const DCUTR_FAIL_BUDGET: u32 = 6;
284
285pub fn start_network(
286 identity: &Identity,
287 event_tx: mpsc::Sender<NetworkEvent>,
288) -> crate::error::Result<NetworkHandle> {
289 start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
290}
291
292/// huddle 0.8: build a `NetworkHandle` with **no libp2p swarm** — the
293/// onion-relay-only (`NetworkMode::Server`) path. We still hand back a
294/// fully-functional handle so the rest of the app is oblivious: room
295/// subscribe/unsubscribe/publish mirror to the attached `ServerClient`
296/// exactly as before, while the libp2p-specific commands (dial, announce,
297/// accept/reject, …) are simply drained and dropped. A tiny task consumes
298/// `cmd_rx` so the bounded channel never fills and `send().await` never
299/// blocks; it exits on `Shutdown`.
300pub fn start_network_disabled() -> NetworkHandle {
301 let (cmd_tx, mut cmd_rx) = mpsc::channel::<NetworkCommand>(256);
302 tokio::spawn(async move {
303 while let Some(cmd) = cmd_rx.recv().await {
304 if matches!(cmd, NetworkCommand::Shutdown) {
305 info!("network task (libp2p-disabled) shutting down");
306 break;
307 }
308 // Everything else is a libp2p op with no swarm to run it; the
309 // server mirror (in NetworkHandle) already handled the room
310 // traffic before the command reached us. Drop it.
311 }
312 });
313 NetworkHandle {
314 cmd_tx,
315 server: Arc::new(std::sync::Mutex::new(None)),
316 }
317}
318
319/// Start the network task with explicit mode, TCP listen port, and any
320/// pre-configured relay multiaddrs. `listen_port = 0` requests a
321/// random port. Relays are dialed on startup; once `Identify` lands
322/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
323/// register a reservation so peers behind NAT can dial us through it.
324pub fn start_network_with(
325 identity: &Identity,
326 event_tx: mpsc::Sender<NetworkEvent>,
327 mode: NetworkMode,
328 listen_port: u16,
329 relays: Vec<Multiaddr>,
330) -> crate::error::Result<NetworkHandle> {
331 let keypair = identity.keypair().clone();
332 let local_peer_id = identity.peer_id();
333
334 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
335 .with_tokio()
336 .with_tcp(
337 tcp::Config::default(),
338 noise::Config::new,
339 yamux::Config::default,
340 )
341 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
342 // Phase D: wrap the transport with relay-client. This both
343 // composes the transport (so we can dial `/p2p-circuit/`
344 // addresses) and surfaces a `relay::client::Behaviour` into
345 // the `with_behaviour` closure as the second argument.
346 .with_relay_client(noise::Config::new, yamux::Config::default)
347 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
348 .with_behaviour(|key, relay_client| {
349 let mdns_opt = match mode {
350 NetworkMode::Mdns => Some(
351 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
352 .expect("mDNS init failed"),
353 ),
354 // `Direct` runs libp2p without mDNS. `Server` never reaches
355 // here (it takes the `start_network_disabled` path and runs
356 // no swarm), but the match must be exhaustive.
357 NetworkMode::Direct | NetworkMode::Server => None,
358 };
359 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
360
361 let identify = identify::Behaviour::new(
362 identify::Config::new("/huddle/1.0.0".into(), key.public())
363 .with_agent_version("huddle/0.5".into()),
364 );
365
366 let ping = ping::Behaviour::default();
367
368 let gossipsub_config = gossipsub::ConfigBuilder::default()
369 .heartbeat_interval(Duration::from_secs(1))
370 .validation_mode(gossipsub::ValidationMode::Strict)
371 // Default is 64 KiB. Raise it so base64-encoded file
372 // chunks (see files::CHUNK_SIZE) keep ample headroom.
373 .max_transmit_size(256 * 1024)
374 .build()
375 .expect("valid gossipsub config");
376
377 let mut gossipsub = gossipsub::Behaviour::new(
378 gossipsub::MessageAuthenticity::Signed(key.clone()),
379 gossipsub_config,
380 )
381 .expect("valid gossipsub init");
382
383 // Every node subscribes to the global rooms topic so the lobby
384 // shows discovered rooms even before joining anything.
385 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
386 gossipsub
387 .subscribe(&rooms_topic)
388 .expect("subscribe rooms topic");
389
390 // AutoNAT v2: client probes external addresses by asking
391 // remote servers to dial us back; server answers other
392 // peers' probes. Both halves are needed for symmetric
393 // P2P reachability detection (v1 combined them; v2 split).
394 let autonat_client = autonat::v2::client::Behaviour::new(
395 rand::rngs::OsRng,
396 autonat::v2::client::Config::default(),
397 );
398 let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
399 let dcutr = dcutr::Behaviour::new(local_peer_id);
400
401 HuddleBehavior {
402 mdns,
403 identify,
404 ping,
405 gossipsub,
406 relay_client,
407 autonat_client,
408 autonat_server,
409 dcutr,
410 }
411 })
412 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
413 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
414 .build();
415
416 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
417 .parse()
418 .expect("valid listen addr");
419 swarm
420 .listen_on(listen_addr)
421 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
422 // Also bind IPv6 on all interfaces so users can dial via IPv6.
423 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
424 .parse()
425 .expect("valid ipv6 listen addr");
426 if let Err(e) = swarm.listen_on(listen_addr6) {
427 debug!(%e, "ipv6 listen skipped");
428 }
429
430 let (cmd_tx, cmd_rx) = mpsc::channel(256);
431 let mut task = NetworkTask {
432 swarm,
433 cmd_rx,
434 event_tx,
435 discovered_peers: HashSet::new(),
436 dial_attempts: HashMap::new(),
437 pending_inbound: HashMap::new(),
438 session_blocklist: HashSet::new(),
439 configured_relays: relays.clone(),
440 relay_peer_ids: HashSet::new(),
441 dcutr_failures: HashMap::new(),
442 };
443 // Phase D: dial each configured relay so Identify can complete and
444 // we can register a `/p2p-circuit` reservation. Failures here are
445 // non-fatal — the user can still chat on LAN.
446 for relay_addr in relays {
447 info!(addr = %relay_addr, "dialing configured relay");
448 let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
449 let conn_id = opts.connection_id();
450 match task.swarm.dial(opts) {
451 Ok(()) => {
452 task.dial_attempts.insert(conn_id, relay_addr);
453 }
454 Err(e) => warn!(%e, "dial relay failed"),
455 }
456 }
457 tokio::spawn(task.run());
458
459 Ok(NetworkHandle {
460 cmd_tx,
461 server: Arc::new(std::sync::Mutex::new(None)),
462 })
463}
464
465impl NetworkTask {
466 async fn run(mut self) {
467 loop {
468 tokio::select! {
469 event = self.swarm.select_next_some() => {
470 self.handle_swarm_event(event).await;
471 }
472 Some(cmd) = self.cmd_rx.recv() => {
473 if matches!(cmd, NetworkCommand::Shutdown) {
474 info!("network task shutting down");
475 break;
476 }
477 self.handle_command(cmd);
478 }
479 }
480 }
481 }
482
483 async fn handle_swarm_event(
484 &mut self,
485 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
486 ) {
487 match event {
488 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
489 info!(%address, "listening");
490 // Phase D: a relay-circuit address is a reachability
491 // milestone — surface it as its own event so the lobby
492 // can show "reachable via N relays" status.
493 use libp2p::multiaddr::Protocol;
494 let is_circuit = address
495 .iter()
496 .any(|p| matches!(p, Protocol::P2pCircuit));
497 if is_circuit {
498 let _ = self
499 .event_tx
500 .send(NetworkEvent::RelayReservationEstablished {
501 address: address.clone(),
502 })
503 .await;
504 }
505 let _ = self
506 .event_tx
507 .send(NetworkEvent::ListeningOn { address })
508 .await;
509 }
510 libp2p::swarm::SwarmEvent::ConnectionEstablished {
511 peer_id,
512 connection_id,
513 endpoint,
514 ..
515 } => {
516 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
517 // Phase D: a connection that was for a configured
518 // relay shouldn't pollute the lobby with a normal
519 // DialSucceeded — relays aren't chat peers. We just
520 // remember the peer_id and wait for Identify, then
521 // register a /p2p-circuit reservation.
522 let is_relay = self.configured_relays.iter().any(|r| r == &addr);
523 if is_relay {
524 info!(%peer_id, %addr, "connected to configured relay");
525 self.relay_peer_ids.insert(peer_id);
526 } else {
527 info!(%peer_id, %addr, "user-dialed peer connected");
528 // Treat dialed peers like mDNS-discovered: add
529 // to gossipsub explicit peers so room
530 // announcements flow.
531 self.swarm
532 .behaviour_mut()
533 .gossipsub
534 .add_explicit_peer(&peer_id);
535 self.discovered_peers.insert(peer_id);
536 let _ = self
537 .event_tx
538 .send(NetworkEvent::DialSucceeded {
539 peer_id,
540 address: addr,
541 })
542 .await;
543 }
544 } else if let ConnectedPoint::Dialer { .. } = endpoint {
545 // Outgoing connection we didn't track (e.g. mDNS auto-dial)
546 // — still add to mesh; no user-visible event needed.
547 self.swarm
548 .behaviour_mut()
549 .gossipsub
550 .add_explicit_peer(&peer_id);
551 } else {
552 // Inbound dial from an unknown peer (Phase A). We
553 // hold the connection but do NOT add them to the
554 // explicit-peer set yet — wait for Identify so we
555 // can show the user the peer's fingerprint, then
556 // either AcceptInbound (promote to mesh) or
557 // RejectInbound (disconnect + persist blocklist).
558 //
559 // Known limitation: gossipsub's score-based mesh
560 // formation may still forward topic messages to
561 // this peer via other peers we have in common.
562 // True hard-quarantine would need a custom
563 // ConnectionHandler — out of scope for v1.
564 if self.session_blocklist.contains(&peer_id) {
565 info!(%peer_id, "rejecting inbound from session-blocked peer");
566 let _ = self.swarm.disconnect_peer_id(peer_id);
567 } else {
568 let address = match &endpoint {
569 ConnectedPoint::Listener { send_back_addr, .. } => {
570 send_back_addr.clone()
571 }
572 _ => Multiaddr::empty(),
573 };
574 debug!(%peer_id, %address, "inbound peer pending decision");
575 self.pending_inbound
576 .insert(peer_id, PendingPeer::InboundUnknown { address });
577 }
578 }
579 }
580 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
581 connection_id,
582 error,
583 ..
584 } => {
585 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
586 warn!(%addr, %error, "user-dialed peer failed");
587 let _ = self
588 .event_tx
589 .send(NetworkEvent::DialFailed {
590 address: addr,
591 error: error.to_string(),
592 })
593 .await;
594 }
595 }
596 libp2p::swarm::SwarmEvent::ConnectionClosed {
597 peer_id,
598 num_established,
599 ..
600 } => {
601 // Drop any pending-inbound entry for this peer — they
602 // disconnected before we could prompt the user (or
603 // before the user accepted). Lets a re-connect start
604 // fresh rather than reusing stale state.
605 self.pending_inbound.remove(&peer_id);
606 // huddle 0.7.11: emit PeerDisconnected so the app can
607 // clean its `connected_dial_addrs` map. Only fire when
608 // the LAST connection to that peer closed — multiple
609 // simultaneous connections (e.g. direct + relay) would
610 // otherwise emit spurious disconnects when one of the
611 // two drops.
612 if num_established == 0 {
613 let _ = self
614 .event_tx
615 .send(NetworkEvent::PeerDisconnected { peer_id })
616 .await;
617 // Also clean up gossipsub's explicit-peers set so
618 // it doesn't accumulate dead PeerIds across the
619 // session.
620 self.swarm
621 .behaviour_mut()
622 .gossipsub
623 .remove_explicit_peer(&peer_id);
624 }
625 }
626 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
627 _ => {}
628 }
629 }
630
631 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
632 match event {
633 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
634 for (peer_id, addr) in peers {
635 if self.discovered_peers.insert(peer_id) {
636 info!(%peer_id, %addr, "mDNS discovered");
637 self.swarm.add_peer_address(peer_id, addr);
638 // Explicitly add to gossipsub mesh.
639 self.swarm
640 .behaviour_mut()
641 .gossipsub
642 .add_explicit_peer(&peer_id);
643 let _ = self
644 .event_tx
645 .send(NetworkEvent::PeerDiscovered { peer_id })
646 .await;
647 }
648 }
649 }
650 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
651 for (peer_id, _) in peers {
652 if self.discovered_peers.remove(&peer_id) {
653 info!(%peer_id, "mDNS peer expired");
654 self.swarm
655 .behaviour_mut()
656 .gossipsub
657 .remove_explicit_peer(&peer_id);
658 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
659 }
660 }
661 }
662 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
663 propagation_source,
664 message,
665 ..
666 }) => {
667 self.handle_gossipsub_message(propagation_source, message).await;
668 }
669 HuddleBehaviorEvent::Identify(identify::Event::Received {
670 peer_id, info, ..
671 }) => {
672 debug!(%peer_id, agent = %info.agent_version, "identify received");
673 // Phase D: if this peer is a configured relay, register
674 // a `/p2p-circuit` reservation on first identify. Idem-
675 // potent — only fire if we haven't listened on this
676 // relay already (identify fires periodically).
677 if self.relay_peer_ids.contains(&peer_id) {
678 use libp2p::multiaddr::Protocol;
679 if let Some(relay_addr) = self
680 .configured_relays
681 .iter()
682 .find(|a| {
683 // Match by /p2p/<peer-id> suffix when
684 // present, else by the addr we dialed.
685 a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
686 || self.dial_attempts.values().any(|d| d == *a)
687 })
688 .cloned()
689 {
690 let circuit = relay_addr.with(Protocol::P2pCircuit);
691 match self.swarm.listen_on(circuit.clone()) {
692 Ok(_) => info!(%circuit, "listening on relay circuit"),
693 Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
694 }
695 }
696 }
697 // Decode the remote's Ed25519 pubkey and derive our
698 // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
699 // Rsa, Ecdsa) shouldn't appear in practice — huddle
700 // only generates Ed25519 identities — so we just log
701 // and skip if the cast fails.
702 let fingerprint = match info.public_key.clone().try_into_ed25519() {
703 Ok(ed_pk) => {
704 let bytes = ed_pk.to_bytes();
705 compute_fingerprint(&bytes)
706 }
707 Err(_) => {
708 warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
709 return;
710 }
711 };
712 // Always notify the app layer so it can populate
713 // `known_peers.fingerprint` and detect that an
714 // outbound peer we dialed has fully identified.
715 let _ = self
716 .event_tx
717 .send(NetworkEvent::PeerIdentified {
718 peer_id,
719 fingerprint: fingerprint.clone(),
720 })
721 .await;
722 // If the peer is in pending_inbound, Identify completing
723 // is the cue to surface the user prompt. Keep them in
724 // pending_inbound until Accept or Reject — we don't
725 // know yet which way the user will decide.
726 if let Some(PendingPeer::InboundUnknown { address }) =
727 self.pending_inbound.get(&peer_id)
728 {
729 let address = address.clone();
730 let _ = self
731 .event_tx
732 .send(NetworkEvent::InboundDial {
733 peer_id,
734 fingerprint,
735 address,
736 })
737 .await;
738 }
739 }
740 HuddleBehaviorEvent::AutonatClient(ev) => {
741 // One probe per address candidate. `result.is_ok()`
742 // means a remote AutoNAT server dialed us back
743 // successfully on `tested_addr` ⇒ this address is
744 // reachable from the outside. The app layer aggregates
745 // these into the lobby reachability badge.
746 let reachable = ev.result.is_ok();
747 if reachable {
748 info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
749 } else {
750 debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
751 }
752 let _ = self
753 .event_tx
754 .send(NetworkEvent::NatProbeResult {
755 tested_addr: ev.tested_addr,
756 reachable,
757 })
758 .await;
759 }
760 HuddleBehaviorEvent::AutonatServer(_) => {
761 // We answered another peer's reachability probe.
762 // No app-visible action.
763 }
764 HuddleBehaviorEvent::Dcutr(ev) => {
765 let success = ev.result.is_ok();
766 if success {
767 info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
768 // huddle 0.7.11: a successful hole-punch resets the
769 // per-peer DCUtR retry counter so future failures
770 // don't immediately bail out.
771 self.dcutr_failures.remove(&ev.remote_peer_id);
772 } else {
773 debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
774 let count = self
775 .dcutr_failures
776 .entry(ev.remote_peer_id)
777 .and_modify(|n| *n += 1)
778 .or_insert(1);
779 if *count >= DCUTR_FAIL_BUDGET {
780 warn!(
781 remote = %ev.remote_peer_id,
782 attempts = *count,
783 "DCUtR: giving up after repeated failures (symmetric NAT likely)"
784 );
785 }
786 }
787 let _ = self
788 .event_tx
789 .send(NetworkEvent::DcutrUpgrade {
790 remote_peer: ev.remote_peer_id,
791 success,
792 })
793 .await;
794 }
795 HuddleBehaviorEvent::RelayClient(event) => {
796 // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
797 // only exposes the success-path variants
798 // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
799 // `InboundCircuitEstablished`). There's no
800 // `ReservationReqFailed` arm we can match on, so we
801 // can't reliably surface reservation loss without a
802 // separate health-check timer (future work). For now
803 // we log every event so operators can see the
804 // lifecycle in `huddle.log` — pre-0.7.11 the whole
805 // arm was swallowed by `_ => {}` which hid even that.
806 use libp2p::relay::client::Event as Rc;
807 match event {
808 Rc::ReservationReqAccepted { relay_peer_id, .. } => {
809 info!(%relay_peer_id, "relay: reservation accepted");
810 }
811 Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
812 debug!(%relay_peer_id, "relay: outbound circuit established");
813 }
814 other => {
815 debug!(?other, "relay client event");
816 }
817 }
818 }
819 _ => {}
820 }
821 }
822
823 async fn handle_gossipsub_message(
824 &mut self,
825 from_peer: PeerId,
826 message: gossipsub::Message,
827 ) {
828 let topic = message.topic.to_string();
829 if topic == ROOMS_TOPIC {
830 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
831 Ok(ann) => {
832 let _ = self
833 .event_tx
834 .send(NetworkEvent::RoomAnnouncementReceived(ann))
835 .await;
836 }
837 Err(e) => {
838 warn!(%e, "bad room announcement");
839 }
840 }
841 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
842 let _ = self
843 .event_tx
844 .send(NetworkEvent::RoomMessageReceived {
845 room_id: room_id.to_string(),
846 payload: message.data,
847 from_peer,
848 })
849 .await;
850 }
851 }
852
853 fn handle_command(&mut self, cmd: NetworkCommand) {
854 match cmd {
855 NetworkCommand::SubscribeRoom { room_id } => {
856 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
857 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
858 warn!(%e, %room_id, "subscribe room failed");
859 }
860 }
861 NetworkCommand::UnsubscribeRoom { room_id } => {
862 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
863 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
864 }
865 NetworkCommand::PublishRoomMessage { room_id, payload } => {
866 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
867 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
868 // No subscribed peers is expected before the mesh
869 // forms; anything else (MessageTooLarge, full queues)
870 // is a real bug worth surfacing.
871 match e {
872 gossipsub::PublishError::NoPeersSubscribedToTopic => {
873 debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
874 }
875 e => warn!(%e, %room_id, "publish room message failed"),
876 }
877 }
878 }
879 NetworkCommand::AnnounceRoom(ann) => {
880 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
881 match serde_json::to_vec(&ann) {
882 Ok(payload) => {
883 if let Err(e) =
884 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
885 {
886 debug!(%e, "publish room announcement failed");
887 }
888 }
889 Err(e) => warn!(%e, "encode room announcement"),
890 }
891 }
892 NetworkCommand::AcceptInbound { peer_id } => {
893 if self.pending_inbound.remove(&peer_id).is_some() {
894 info!(%peer_id, "inbound dial accepted — promoting to mesh");
895 self.swarm
896 .behaviour_mut()
897 .gossipsub
898 .add_explicit_peer(&peer_id);
899 self.discovered_peers.insert(peer_id);
900 } else {
901 debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
902 }
903 }
904 NetworkCommand::RejectInbound { peer_id } => {
905 self.pending_inbound.remove(&peer_id);
906 self.session_blocklist.insert(peer_id);
907 info!(%peer_id, "inbound dial rejected — disconnecting");
908 let _ = self.swarm.disconnect_peer_id(peer_id);
909 }
910 NetworkCommand::DisconnectPeer { peer_id } => {
911 info!(%peer_id, "app-level identity check failed — disconnecting");
912 let _ = self.swarm.disconnect_peer_id(peer_id);
913 }
914 NetworkCommand::Dial { address } => {
915 let opts: DialOpts = address.clone().into();
916 let conn_id = opts.connection_id();
917 match self.swarm.dial(opts) {
918 Ok(()) => {
919 self.dial_attempts.insert(conn_id, address);
920 }
921 Err(e) => {
922 // Synchronous dial error (bad multiaddr, transport refused).
923 let tx = self.event_tx.clone();
924 let err = e.to_string();
925 tokio::spawn(async move {
926 let _ = tx
927 .send(NetworkEvent::DialFailed {
928 address,
929 error: err,
930 })
931 .await;
932 });
933 }
934 }
935 }
936 NetworkCommand::DialAddresses { addresses } => {
937 use libp2p::multiaddr::Protocol;
938 if addresses.is_empty() {
939 return;
940 }
941 // Extract the shared peer-id from any address with a
942 // `/p2p/<peer-id>` suffix. All host_addrs originating
943 // from the same `RoomAnnouncement` share the
944 // announcer's peer-id, so the first match is enough.
945 let peer_id = addresses
946 .iter()
947 .flat_map(|a| a.iter())
948 .find_map(|p| match p {
949 Protocol::P2p(pid) => Some(pid),
950 _ => None,
951 });
952 let opts = match peer_id {
953 Some(pid) => DialOpts::peer_id(pid)
954 .addresses(addresses.clone())
955 .build(),
956 // No /p2p/ segment anywhere — fall back to single-
957 // address dial of the first candidate, matching the
958 // legacy `Dial` semantics for unanchored multiaddrs.
959 None => addresses[0].clone().into(),
960 };
961 let conn_id = opts.connection_id();
962 // Use the first address as the representative for
963 // dial_attempts. On synchronous error we report it;
964 // on async success the post-identify handler upserts
965 // with the actually-connected endpoint.
966 let primary = addresses[0].clone();
967 match self.swarm.dial(opts) {
968 Ok(()) => {
969 self.dial_attempts.insert(conn_id, primary);
970 }
971 Err(e) => {
972 let tx = self.event_tx.clone();
973 let err = e.to_string();
974 tokio::spawn(async move {
975 let _ = tx
976 .send(NetworkEvent::DialFailed {
977 address: primary,
978 error: err,
979 })
980 .await;
981 });
982 }
983 }
984 }
985 NetworkCommand::Shutdown => unreachable!(),
986 }
987 }
988}