huddle_core/network/mod.rs
1pub mod behavior;
2pub mod events;
3pub mod protocol;
4pub mod server;
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::StreamExt;
11use sha2::{Digest, Sha256};
12use libp2p::core::ConnectedPoint;
13use libp2p::swarm::dial_opts::DialOpts;
14use libp2p::swarm::ConnectionId;
15use libp2p::{
16 autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
17 SwarmBuilder,
18};
19use tokio::sync::mpsc;
20use tracing::{debug, info, warn};
21
/// Strategy the node uses to find and reach other peers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// Default since huddle 0.8: libp2p is not started at all. The
    /// centralized Tor-onion relay (see `network::server`) is the sole
    /// transport, so there is no LAN discovery, no direct dial and no
    /// NAT machinery. The binary runs in this mode unless the user opts
    /// back into libp2p with `--mode mdns|direct`.
    Server,
    /// libp2p with mDNS enabled (`--mode mdns`): the node announces
    /// itself on the LAN and listens for other announcements. Runs
    /// *alongside* the onion relay, so LAN peers connect directly while
    /// internet peers go through the server.
    Mdns,
    /// libp2p with mDNS disabled (`--mode direct`): the node is invisible
    /// to LAN discovery and libp2p connections only come from explicit
    /// dials. Also runs alongside the onion relay.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of the mode (`"server"`, `"mdns"` or
    /// `"direct"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Direct => "direct",
            Self::Mdns => "mdns",
            Self::Server => "server",
        }
    }

    /// Whether this mode runs a libp2p swarm (LAN / direct dial). Only
    /// the onion-relay-only `Server` mode answers `false`.
    pub fn uses_libp2p(&self) -> bool {
        match self {
            Self::Server => false,
            Self::Mdns | Self::Direct => true,
        }
    }

    /// Parse a mode name or one of its aliases.
    ///
    /// Surrounding whitespace is ignored and matching is ASCII
    /// case-insensitive. Returns `None` for anything unrecognised.
    pub fn from_str(s: &str) -> Option<Self> {
        // Every accepted spelling, paired with the mode it selects.
        const ALIASES: [(&str, NetworkMode); 9] = [
            ("server", NetworkMode::Server),
            ("tor", NetworkMode::Server),
            ("onion", NetworkMode::Server),
            ("mdns", NetworkMode::Mdns),
            ("lan", NetworkMode::Mdns),
            ("open", NetworkMode::Mdns),
            ("direct", NetworkMode::Direct),
            ("dial", NetworkMode::Direct),
            ("private", NetworkMode::Direct),
        ];

        let wanted = s.trim();
        ALIASES
            .iter()
            .find(|(alias, _)| alias.eq_ignore_ascii_case(wanted))
            .map(|&(_, mode)| mode)
    }
}
66
67use crate::identity::{compute_fingerprint, Identity};
68use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
69use crate::network::events::NetworkEvent;
70use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
71
/// Commands sent from a `NetworkHandle` to the background network task
/// over the bounded `mpsc` command channel.
#[derive(Debug)]
pub enum NetworkCommand {
    /// Subscribe to a room's per-room gossipsub topic.
    SubscribeRoom { room_id: String },
    /// Unsubscribe from a room's topic.
    UnsubscribeRoom { room_id: String },
    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
    PublishRoomMessage { room_id: String, payload: Vec<u8> },
    /// Publish a room announcement on the global rooms topic.
    AnnounceRoom(RoomAnnouncement),
    /// User-initiated dial of an explicit address. Used for cross-network
    /// reach when mDNS isn't enough.
    Dial { address: Multiaddr },
    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
    /// letting libp2p race them in parallel. Used by the "add by HD
    /// ID / username" flow, which resolves a fingerprint to every
    /// address we know for that peer (room announcement `host_addrs`
    /// + persisted `known_peers`). libp2p's parallel dialer picks
    /// the cheapest path that completes — LAN beats public IP beats
    /// relay-hopped without us having to probe transports manually.
    DialAddresses { addresses: Vec<Multiaddr> },
    /// Phase A: user accepted an inbound dial — promote the peer to
    /// explicit-peer status so room announcements flow.
    AcceptInbound { peer_id: PeerId },
    /// Phase A: user rejected an inbound dial — disconnect them and
    /// add the peer_id to the in-memory blocklist for this session
    /// (caller is responsible for the persistent blocked_peers row).
    RejectInbound { peer_id: PeerId },
    /// Phase C follow-up: drop a connection that failed an
    /// application-level identity check (e.g. invite-fingerprint
    /// mismatch). Differs from `RejectInbound` in that it doesn't
    /// touch the inbound-pending map (the connection is already
    /// past Identify when we discover the mismatch) and doesn't
    /// persist a block — the caller may want to retry with a
    /// corrected invite.
    DisconnectPeer { peer_id: PeerId },
    /// Stop the network task. Both the libp2p task loop and the
    /// libp2p-disabled drain loop break out as soon as they receive
    /// this command; it is never forwarded to the command handler.
    Shutdown,
}
110
/// Cloneable handle the application uses to talk to the network layer.
///
/// Clones share the same command channel and the same server slot (it
/// lives behind an `Arc`), so a server attached through one clone is
/// seen by every other clone.
#[derive(Clone)]
pub struct NetworkHandle {
    /// Sending half of the bounded command channel drained by the
    /// network task.
    cmd_tx: mpsc::Sender<NetworkCommand>,
    /// huddle 0.8: optional connection to the centralized server (a Tor
    /// onion relay). When attached, every room subscribe/unsubscribe and
    /// every published wire message is mirrored to it, so peers we can't
    /// reach over libp2p still receive our traffic (and we theirs, via the
    /// server's offline mailbox). `None` until `attach_server` runs, and
    /// again after `detach_server`.
    server: Arc<std::sync::Mutex<Option<server::ServerClient>>>,
}
121
122/// Stable per-message id for the server mailbox/receipts: a short hash of
123/// the wire bytes. Opaque to the server; lets it dedup/queue by id.
124fn server_msg_id(payload: &[u8]) -> String {
125 let mut h = Sha256::new();
126 h.update(payload);
127 hex::encode(&h.finalize()[..8])
128}
129
130impl NetworkHandle {
131 /// Attach a live server connection so subsequent room traffic mirrors
132 /// to it. Replaces any previous one. Called by the app's server
133 /// connection task after each (re)connect.
134 pub fn attach_server(&self, client: server::ServerClient) {
135 *self.server.lock().unwrap() = Some(client);
136 }
137
138 /// Drop the server connection (e.g. after a disconnect) so we stop
139 /// mirroring until it reconnects.
140 pub fn detach_server(&self) {
141 *self.server.lock().unwrap() = None;
142 }
143
144 /// Snapshot the attached server client, if any. Cloned out so we never
145 /// hold the lock across the (sync, non-blocking) mirror call.
146 fn server_client(&self) -> Option<server::ServerClient> {
147 self.server.lock().unwrap().clone()
148 }
149
150 pub async fn subscribe_room(&self, room_id: String) {
151 if let Some(s) = self.server_client() {
152 let _ = s.subscribe(&room_id);
153 }
154 let _ = self
155 .cmd_tx
156 .send(NetworkCommand::SubscribeRoom { room_id })
157 .await;
158 }
159
160 pub async fn unsubscribe_room(&self, room_id: String) {
161 if let Some(s) = self.server_client() {
162 let _ = s.unsubscribe(&room_id);
163 }
164 let _ = self
165 .cmd_tx
166 .send(NetworkCommand::UnsubscribeRoom { room_id })
167 .await;
168 }
169
170 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
171 if let Some(s) = self.server_client() {
172 let _ = s.publish(&room_id, &server_msg_id(&payload), &payload);
173 }
174 let _ = self
175 .cmd_tx
176 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
177 .await;
178 }
179
180 pub async fn announce_room(&self, ann: RoomAnnouncement) {
181 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
182 }
183
184 /// True when a server connection is currently attached.
185 pub fn has_server(&self) -> bool {
186 self.server.lock().unwrap().is_some()
187 }
188
189 pub async fn dial(&self, address: Multiaddr) {
190 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
191 }
192
193 /// huddle 0.5.2: dial a peer with multiple candidate addresses
194 /// at once. libp2p's swarm races them; the first to complete a
195 /// handshake wins, the rest are dropped. Use when you've resolved
196 /// a peer fingerprint to several known transports — typically
197 /// LAN ip4 + public ip4 + relay circuit.
198 pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
199 let _ = self
200 .cmd_tx
201 .send(NetworkCommand::DialAddresses { addresses })
202 .await;
203 }
204
205 pub async fn accept_inbound(&self, peer_id: PeerId) {
206 let _ = self
207 .cmd_tx
208 .send(NetworkCommand::AcceptInbound { peer_id })
209 .await;
210 }
211
212 pub async fn reject_inbound(&self, peer_id: PeerId) {
213 let _ = self
214 .cmd_tx
215 .send(NetworkCommand::RejectInbound { peer_id })
216 .await;
217 }
218
219 pub async fn disconnect_peer(&self, peer_id: PeerId) {
220 let _ = self
221 .cmd_tx
222 .send(NetworkCommand::DisconnectPeer { peer_id })
223 .await;
224 }
225
226 pub async fn shutdown(&self) {
227 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
228 }
229}
230
/// What kind of connection we're holding open until Identify gives us
/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
/// added to the gossipsub mesh until the user accepts; outbound user-
/// dials add to the mesh on connection but still wait on Identify so
/// the eventual `DialSucceeded` can carry the fingerprint.
///
/// Only the inbound case has a variant at present; outbound dials are
/// not represented in this enum.
#[derive(Debug)]
enum PendingPeer {
    /// Inbound dial from an unknown peer — modal-pending in the app.
    /// `address` is the address recorded for the inbound connection; it
    /// is reported to the app in the `InboundDial` event.
    InboundUnknown { address: Multiaddr },
}
241
/// State owned by the background libp2p task: the swarm itself, the two
/// channel ends that connect it to the app, and the per-peer bookkeeping
/// the event handlers need.
struct NetworkTask {
    /// The libp2p swarm driven by this task.
    swarm: Swarm<HuddleBehavior>,
    /// Receiving half of the command channel fed by `NetworkHandle`.
    cmd_rx: mpsc::Receiver<NetworkCommand>,
    /// Channel on which `NetworkEvent`s are reported to the app.
    event_tx: mpsc::Sender<NetworkEvent>,
    /// Peers added to the gossipsub explicit-peer set: filled on mDNS
    /// discovery, on a successful user dial and on `AcceptInbound`;
    /// entries are removed when mDNS reports the peer expired.
    discovered_peers: HashSet<PeerId>,
    /// Tracks user-initiated dials so we can correlate the eventual
    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
    /// specific address the user asked us to dial. Dials of configured
    /// relays are tracked here as well.
    dial_attempts: HashMap<ConnectionId, Multiaddr>,
    /// Phase A: peers connected but not yet promoted to the gossipsub
    /// mesh (inbound) — waiting either for `Identify` to land so we
    /// know their fingerprint, or for the user's accept/reject decision.
    /// The `ConnectionClosed` handler drops entries here so a peer
    /// disconnecting mid-prompt doesn't leak state.
    pending_inbound: HashMap<PeerId, PendingPeer>,
    /// Phase A: peers the user has explicitly rejected this session —
    /// auto-disconnect every reconnect attempt without re-prompting.
    /// Persistent across runs via `blocked_peers`; this in-memory copy
    /// is meant to be loaded at startup (TODO: not yet wired; falls back
    /// to the DB check in the app layer for now) and is updated on
    /// `RejectInbound`.
    session_blocklist: HashSet<PeerId>,
    /// Phase D: configured relay multiaddrs. When Identify lands for a
    /// peer recorded in `relay_peer_ids`, we call
    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
    /// Stored as `Multiaddr` values; the relay's PeerId is not known
    /// until we have connected to it.
    configured_relays: Vec<Multiaddr>,
    /// Peer ids of relays we have connected to, recorded when the
    /// connection for a configured relay address is established.
    relay_peer_ids: HashSet<PeerId>,
    /// huddle 0.7.11: per-peer DCUtR failure counter. Each failed
    /// hole-punch increments it and a success removes the entry; once
    /// the count reaches `DCUTR_FAIL_BUDGET` the failure is escalated
    /// from a debug log to a warning.
    /// libp2p's dcutr behavior schedules its own retries internally;
    /// this is purely an observability signal for the app.
    dcutr_failures: HashMap<PeerId, u32>,
}
278
/// huddle 0.7.11: number of failed DCUtR attempts to the same peer at
/// which the failure is escalated from a debug log to a warning. dcutr
/// keeps trying internally; this threshold only affects what we log.
const DCUTR_FAIL_BUDGET: u32 = 6;
283
284pub fn start_network(
285 identity: &Identity,
286 event_tx: mpsc::Sender<NetworkEvent>,
287) -> crate::error::Result<NetworkHandle> {
288 start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
289}
290
291/// huddle 0.8: build a `NetworkHandle` with **no libp2p swarm** — the
292/// onion-relay-only (`NetworkMode::Server`) path. We still hand back a
293/// fully-functional handle so the rest of the app is oblivious: room
294/// subscribe/unsubscribe/publish mirror to the attached `ServerClient`
295/// exactly as before, while the libp2p-specific commands (dial, announce,
296/// accept/reject, …) are simply drained and dropped. A tiny task consumes
297/// `cmd_rx` so the bounded channel never fills and `send().await` never
298/// blocks; it exits on `Shutdown`.
299pub fn start_network_disabled() -> NetworkHandle {
300 let (cmd_tx, mut cmd_rx) = mpsc::channel::<NetworkCommand>(256);
301 tokio::spawn(async move {
302 while let Some(cmd) = cmd_rx.recv().await {
303 if matches!(cmd, NetworkCommand::Shutdown) {
304 info!("network task (libp2p-disabled) shutting down");
305 break;
306 }
307 // Everything else is a libp2p op with no swarm to run it; the
308 // server mirror (in NetworkHandle) already handled the room
309 // traffic before the command reached us. Drop it.
310 }
311 });
312 NetworkHandle {
313 cmd_tx,
314 server: Arc::new(std::sync::Mutex::new(None)),
315 }
316}
317
318/// Start the network task with explicit mode, TCP listen port, and any
319/// pre-configured relay multiaddrs. `listen_port = 0` requests a
320/// random port. Relays are dialed on startup; once `Identify` lands
321/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
322/// register a reservation so peers behind NAT can dial us through it.
323pub fn start_network_with(
324 identity: &Identity,
325 event_tx: mpsc::Sender<NetworkEvent>,
326 mode: NetworkMode,
327 listen_port: u16,
328 relays: Vec<Multiaddr>,
329) -> crate::error::Result<NetworkHandle> {
330 let keypair = identity.keypair().clone();
331 let local_peer_id = identity.peer_id();
332
333 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
334 .with_tokio()
335 .with_tcp(
336 tcp::Config::default(),
337 noise::Config::new,
338 yamux::Config::default,
339 )
340 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
341 // Phase D: wrap the transport with relay-client. This both
342 // composes the transport (so we can dial `/p2p-circuit/`
343 // addresses) and surfaces a `relay::client::Behaviour` into
344 // the `with_behaviour` closure as the second argument.
345 .with_relay_client(noise::Config::new, yamux::Config::default)
346 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
347 .with_behaviour(|key, relay_client| {
348 let mdns_opt = match mode {
349 NetworkMode::Mdns => Some(
350 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
351 .expect("mDNS init failed"),
352 ),
353 // `Direct` runs libp2p without mDNS. `Server` never reaches
354 // here (it takes the `start_network_disabled` path and runs
355 // no swarm), but the match must be exhaustive.
356 NetworkMode::Direct | NetworkMode::Server => None,
357 };
358 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
359
360 let identify = identify::Behaviour::new(
361 identify::Config::new("/huddle/1.0.0".into(), key.public())
362 .with_agent_version("huddle/0.5".into()),
363 );
364
365 let ping = ping::Behaviour::default();
366
367 let gossipsub_config = gossipsub::ConfigBuilder::default()
368 .heartbeat_interval(Duration::from_secs(1))
369 .validation_mode(gossipsub::ValidationMode::Strict)
370 // Default is 64 KiB. Raise it so base64-encoded file
371 // chunks (see files::CHUNK_SIZE) keep ample headroom.
372 .max_transmit_size(256 * 1024)
373 .build()
374 .expect("valid gossipsub config");
375
376 let mut gossipsub = gossipsub::Behaviour::new(
377 gossipsub::MessageAuthenticity::Signed(key.clone()),
378 gossipsub_config,
379 )
380 .expect("valid gossipsub init");
381
382 // Every node subscribes to the global rooms topic so the lobby
383 // shows discovered rooms even before joining anything.
384 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
385 gossipsub
386 .subscribe(&rooms_topic)
387 .expect("subscribe rooms topic");
388
389 // AutoNAT v2: client probes external addresses by asking
390 // remote servers to dial us back; server answers other
391 // peers' probes. Both halves are needed for symmetric
392 // P2P reachability detection (v1 combined them; v2 split).
393 let autonat_client = autonat::v2::client::Behaviour::new(
394 rand::rngs::OsRng,
395 autonat::v2::client::Config::default(),
396 );
397 let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
398 let dcutr = dcutr::Behaviour::new(local_peer_id);
399
400 HuddleBehavior {
401 mdns,
402 identify,
403 ping,
404 gossipsub,
405 relay_client,
406 autonat_client,
407 autonat_server,
408 dcutr,
409 }
410 })
411 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
412 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
413 .build();
414
415 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
416 .parse()
417 .expect("valid listen addr");
418 swarm
419 .listen_on(listen_addr)
420 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
421 // Also bind IPv6 on all interfaces so users can dial via IPv6.
422 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
423 .parse()
424 .expect("valid ipv6 listen addr");
425 if let Err(e) = swarm.listen_on(listen_addr6) {
426 debug!(%e, "ipv6 listen skipped");
427 }
428
429 let (cmd_tx, cmd_rx) = mpsc::channel(256);
430 let mut task = NetworkTask {
431 swarm,
432 cmd_rx,
433 event_tx,
434 discovered_peers: HashSet::new(),
435 dial_attempts: HashMap::new(),
436 pending_inbound: HashMap::new(),
437 session_blocklist: HashSet::new(),
438 configured_relays: relays.clone(),
439 relay_peer_ids: HashSet::new(),
440 dcutr_failures: HashMap::new(),
441 };
442 // Phase D: dial each configured relay so Identify can complete and
443 // we can register a `/p2p-circuit` reservation. Failures here are
444 // non-fatal — the user can still chat on LAN.
445 for relay_addr in relays {
446 info!(addr = %relay_addr, "dialing configured relay");
447 let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
448 let conn_id = opts.connection_id();
449 match task.swarm.dial(opts) {
450 Ok(()) => {
451 task.dial_attempts.insert(conn_id, relay_addr);
452 }
453 Err(e) => warn!(%e, "dial relay failed"),
454 }
455 }
456 tokio::spawn(task.run());
457
458 Ok(NetworkHandle {
459 cmd_tx,
460 server: Arc::new(std::sync::Mutex::new(None)),
461 })
462}
463
464impl NetworkTask {
465 async fn run(mut self) {
466 loop {
467 tokio::select! {
468 event = self.swarm.select_next_some() => {
469 self.handle_swarm_event(event).await;
470 }
471 Some(cmd) = self.cmd_rx.recv() => {
472 if matches!(cmd, NetworkCommand::Shutdown) {
473 info!("network task shutting down");
474 break;
475 }
476 self.handle_command(cmd);
477 }
478 }
479 }
480 }
481
482 async fn handle_swarm_event(
483 &mut self,
484 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
485 ) {
486 match event {
487 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
488 info!(%address, "listening");
489 // Phase D: a relay-circuit address is a reachability
490 // milestone — surface it as its own event so the lobby
491 // can show "reachable via N relays" status.
492 use libp2p::multiaddr::Protocol;
493 let is_circuit = address
494 .iter()
495 .any(|p| matches!(p, Protocol::P2pCircuit));
496 if is_circuit {
497 let _ = self
498 .event_tx
499 .send(NetworkEvent::RelayReservationEstablished {
500 address: address.clone(),
501 })
502 .await;
503 }
504 let _ = self
505 .event_tx
506 .send(NetworkEvent::ListeningOn { address })
507 .await;
508 }
509 libp2p::swarm::SwarmEvent::ConnectionEstablished {
510 peer_id,
511 connection_id,
512 endpoint,
513 ..
514 } => {
515 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
516 // Phase D: a connection that was for a configured
517 // relay shouldn't pollute the lobby with a normal
518 // DialSucceeded — relays aren't chat peers. We just
519 // remember the peer_id and wait for Identify, then
520 // register a /p2p-circuit reservation.
521 let is_relay = self.configured_relays.iter().any(|r| r == &addr);
522 if is_relay {
523 info!(%peer_id, %addr, "connected to configured relay");
524 self.relay_peer_ids.insert(peer_id);
525 } else {
526 info!(%peer_id, %addr, "user-dialed peer connected");
527 // Treat dialed peers like mDNS-discovered: add
528 // to gossipsub explicit peers so room
529 // announcements flow.
530 self.swarm
531 .behaviour_mut()
532 .gossipsub
533 .add_explicit_peer(&peer_id);
534 self.discovered_peers.insert(peer_id);
535 let _ = self
536 .event_tx
537 .send(NetworkEvent::DialSucceeded {
538 peer_id,
539 address: addr,
540 })
541 .await;
542 }
543 } else if let ConnectedPoint::Dialer { .. } = endpoint {
544 // Outgoing connection we didn't track (e.g. mDNS auto-dial)
545 // — still add to mesh; no user-visible event needed.
546 self.swarm
547 .behaviour_mut()
548 .gossipsub
549 .add_explicit_peer(&peer_id);
550 } else {
551 // Inbound dial from an unknown peer (Phase A). We
552 // hold the connection but do NOT add them to the
553 // explicit-peer set yet — wait for Identify so we
554 // can show the user the peer's fingerprint, then
555 // either AcceptInbound (promote to mesh) or
556 // RejectInbound (disconnect + persist blocklist).
557 //
558 // Known limitation: gossipsub's score-based mesh
559 // formation may still forward topic messages to
560 // this peer via other peers we have in common.
561 // True hard-quarantine would need a custom
562 // ConnectionHandler — out of scope for v1.
563 if self.session_blocklist.contains(&peer_id) {
564 info!(%peer_id, "rejecting inbound from session-blocked peer");
565 let _ = self.swarm.disconnect_peer_id(peer_id);
566 } else {
567 let address = match &endpoint {
568 ConnectedPoint::Listener { send_back_addr, .. } => {
569 send_back_addr.clone()
570 }
571 _ => Multiaddr::empty(),
572 };
573 debug!(%peer_id, %address, "inbound peer pending decision");
574 self.pending_inbound
575 .insert(peer_id, PendingPeer::InboundUnknown { address });
576 }
577 }
578 }
579 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
580 connection_id,
581 error,
582 ..
583 } => {
584 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
585 warn!(%addr, %error, "user-dialed peer failed");
586 let _ = self
587 .event_tx
588 .send(NetworkEvent::DialFailed {
589 address: addr,
590 error: error.to_string(),
591 })
592 .await;
593 }
594 }
595 libp2p::swarm::SwarmEvent::ConnectionClosed {
596 peer_id,
597 num_established,
598 ..
599 } => {
600 // Drop any pending-inbound entry for this peer — they
601 // disconnected before we could prompt the user (or
602 // before the user accepted). Lets a re-connect start
603 // fresh rather than reusing stale state.
604 self.pending_inbound.remove(&peer_id);
605 // huddle 0.7.11: emit PeerDisconnected so the app can
606 // clean its `connected_dial_addrs` map. Only fire when
607 // the LAST connection to that peer closed — multiple
608 // simultaneous connections (e.g. direct + relay) would
609 // otherwise emit spurious disconnects when one of the
610 // two drops.
611 if num_established == 0 {
612 let _ = self
613 .event_tx
614 .send(NetworkEvent::PeerDisconnected { peer_id })
615 .await;
616 // Also clean up gossipsub's explicit-peers set so
617 // it doesn't accumulate dead PeerIds across the
618 // session.
619 self.swarm
620 .behaviour_mut()
621 .gossipsub
622 .remove_explicit_peer(&peer_id);
623 }
624 }
625 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
626 _ => {}
627 }
628 }
629
630 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
631 match event {
632 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
633 for (peer_id, addr) in peers {
634 if self.discovered_peers.insert(peer_id) {
635 info!(%peer_id, %addr, "mDNS discovered");
636 self.swarm.add_peer_address(peer_id, addr);
637 // Explicitly add to gossipsub mesh.
638 self.swarm
639 .behaviour_mut()
640 .gossipsub
641 .add_explicit_peer(&peer_id);
642 let _ = self
643 .event_tx
644 .send(NetworkEvent::PeerDiscovered { peer_id })
645 .await;
646 }
647 }
648 }
649 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
650 for (peer_id, _) in peers {
651 if self.discovered_peers.remove(&peer_id) {
652 info!(%peer_id, "mDNS peer expired");
653 self.swarm
654 .behaviour_mut()
655 .gossipsub
656 .remove_explicit_peer(&peer_id);
657 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
658 }
659 }
660 }
661 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
662 propagation_source,
663 message,
664 ..
665 }) => {
666 self.handle_gossipsub_message(propagation_source, message).await;
667 }
668 HuddleBehaviorEvent::Identify(identify::Event::Received {
669 peer_id, info, ..
670 }) => {
671 debug!(%peer_id, agent = %info.agent_version, "identify received");
672 // Phase D: if this peer is a configured relay, register
673 // a `/p2p-circuit` reservation on first identify. Idem-
674 // potent — only fire if we haven't listened on this
675 // relay already (identify fires periodically).
676 if self.relay_peer_ids.contains(&peer_id) {
677 use libp2p::multiaddr::Protocol;
678 if let Some(relay_addr) = self
679 .configured_relays
680 .iter()
681 .find(|a| {
682 // Match by /p2p/<peer-id> suffix when
683 // present, else by the addr we dialed.
684 a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
685 || self.dial_attempts.values().any(|d| d == *a)
686 })
687 .cloned()
688 {
689 let circuit = relay_addr.with(Protocol::P2pCircuit);
690 match self.swarm.listen_on(circuit.clone()) {
691 Ok(_) => info!(%circuit, "listening on relay circuit"),
692 Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
693 }
694 }
695 }
696 // Decode the remote's Ed25519 pubkey and derive our
697 // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
698 // Rsa, Ecdsa) shouldn't appear in practice — huddle
699 // only generates Ed25519 identities — so we just log
700 // and skip if the cast fails.
701 let fingerprint = match info.public_key.clone().try_into_ed25519() {
702 Ok(ed_pk) => {
703 let bytes = ed_pk.to_bytes();
704 compute_fingerprint(&bytes)
705 }
706 Err(_) => {
707 warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
708 return;
709 }
710 };
711 // Always notify the app layer so it can populate
712 // `known_peers.fingerprint` and detect that an
713 // outbound peer we dialed has fully identified.
714 let _ = self
715 .event_tx
716 .send(NetworkEvent::PeerIdentified {
717 peer_id,
718 fingerprint: fingerprint.clone(),
719 })
720 .await;
721 // If the peer is in pending_inbound, Identify completing
722 // is the cue to surface the user prompt. Keep them in
723 // pending_inbound until Accept or Reject — we don't
724 // know yet which way the user will decide.
725 if let Some(PendingPeer::InboundUnknown { address }) =
726 self.pending_inbound.get(&peer_id)
727 {
728 let address = address.clone();
729 let _ = self
730 .event_tx
731 .send(NetworkEvent::InboundDial {
732 peer_id,
733 fingerprint,
734 address,
735 })
736 .await;
737 }
738 }
739 HuddleBehaviorEvent::AutonatClient(ev) => {
740 // One probe per address candidate. `result.is_ok()`
741 // means a remote AutoNAT server dialed us back
742 // successfully on `tested_addr` ⇒ this address is
743 // reachable from the outside. The app layer aggregates
744 // these into the lobby reachability badge.
745 let reachable = ev.result.is_ok();
746 if reachable {
747 info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
748 } else {
749 debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
750 }
751 let _ = self
752 .event_tx
753 .send(NetworkEvent::NatProbeResult {
754 tested_addr: ev.tested_addr,
755 reachable,
756 })
757 .await;
758 }
759 HuddleBehaviorEvent::AutonatServer(_) => {
760 // We answered another peer's reachability probe.
761 // No app-visible action.
762 }
763 HuddleBehaviorEvent::Dcutr(ev) => {
764 let success = ev.result.is_ok();
765 if success {
766 info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
767 // huddle 0.7.11: a successful hole-punch resets the
768 // per-peer DCUtR retry counter so future failures
769 // don't immediately bail out.
770 self.dcutr_failures.remove(&ev.remote_peer_id);
771 } else {
772 debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
773 let count = self
774 .dcutr_failures
775 .entry(ev.remote_peer_id)
776 .and_modify(|n| *n += 1)
777 .or_insert(1);
778 if *count >= DCUTR_FAIL_BUDGET {
779 warn!(
780 remote = %ev.remote_peer_id,
781 attempts = *count,
782 "DCUtR: giving up after repeated failures (symmetric NAT likely)"
783 );
784 }
785 }
786 let _ = self
787 .event_tx
788 .send(NetworkEvent::DcutrUpgrade {
789 remote_peer: ev.remote_peer_id,
790 success,
791 })
792 .await;
793 }
794 HuddleBehaviorEvent::RelayClient(event) => {
795 // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
796 // only exposes the success-path variants
797 // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
798 // `InboundCircuitEstablished`). There's no
799 // `ReservationReqFailed` arm we can match on, so we
800 // can't reliably surface reservation loss without a
801 // separate health-check timer (future work). For now
802 // we log every event so operators can see the
803 // lifecycle in `huddle.log` — pre-0.7.11 the whole
804 // arm was swallowed by `_ => {}` which hid even that.
805 use libp2p::relay::client::Event as Rc;
806 match event {
807 Rc::ReservationReqAccepted { relay_peer_id, .. } => {
808 info!(%relay_peer_id, "relay: reservation accepted");
809 }
810 Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
811 debug!(%relay_peer_id, "relay: outbound circuit established");
812 }
813 other => {
814 debug!(?other, "relay client event");
815 }
816 }
817 }
818 _ => {}
819 }
820 }
821
822 async fn handle_gossipsub_message(
823 &mut self,
824 from_peer: PeerId,
825 message: gossipsub::Message,
826 ) {
827 let topic = message.topic.to_string();
828 if topic == ROOMS_TOPIC {
829 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
830 Ok(ann) => {
831 let _ = self
832 .event_tx
833 .send(NetworkEvent::RoomAnnouncementReceived(ann))
834 .await;
835 }
836 Err(e) => {
837 warn!(%e, "bad room announcement");
838 }
839 }
840 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
841 let _ = self
842 .event_tx
843 .send(NetworkEvent::RoomMessageReceived {
844 room_id: room_id.to_string(),
845 payload: message.data,
846 from_peer,
847 })
848 .await;
849 }
850 }
851
852 fn handle_command(&mut self, cmd: NetworkCommand) {
853 match cmd {
854 NetworkCommand::SubscribeRoom { room_id } => {
855 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
856 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
857 warn!(%e, %room_id, "subscribe room failed");
858 }
859 }
860 NetworkCommand::UnsubscribeRoom { room_id } => {
861 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
862 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
863 }
864 NetworkCommand::PublishRoomMessage { room_id, payload } => {
865 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
866 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
867 // No subscribed peers is expected before the mesh
868 // forms; anything else (MessageTooLarge, full queues)
869 // is a real bug worth surfacing.
870 match e {
871 gossipsub::PublishError::NoPeersSubscribedToTopic => {
872 debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
873 }
874 e => warn!(%e, %room_id, "publish room message failed"),
875 }
876 }
877 }
878 NetworkCommand::AnnounceRoom(ann) => {
879 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
880 match serde_json::to_vec(&ann) {
881 Ok(payload) => {
882 if let Err(e) =
883 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
884 {
885 debug!(%e, "publish room announcement failed");
886 }
887 }
888 Err(e) => warn!(%e, "encode room announcement"),
889 }
890 }
891 NetworkCommand::AcceptInbound { peer_id } => {
892 if self.pending_inbound.remove(&peer_id).is_some() {
893 info!(%peer_id, "inbound dial accepted — promoting to mesh");
894 self.swarm
895 .behaviour_mut()
896 .gossipsub
897 .add_explicit_peer(&peer_id);
898 self.discovered_peers.insert(peer_id);
899 } else {
900 debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
901 }
902 }
903 NetworkCommand::RejectInbound { peer_id } => {
904 self.pending_inbound.remove(&peer_id);
905 self.session_blocklist.insert(peer_id);
906 info!(%peer_id, "inbound dial rejected — disconnecting");
907 let _ = self.swarm.disconnect_peer_id(peer_id);
908 }
909 NetworkCommand::DisconnectPeer { peer_id } => {
910 info!(%peer_id, "app-level identity check failed — disconnecting");
911 let _ = self.swarm.disconnect_peer_id(peer_id);
912 }
913 NetworkCommand::Dial { address } => {
914 let opts: DialOpts = address.clone().into();
915 let conn_id = opts.connection_id();
916 match self.swarm.dial(opts) {
917 Ok(()) => {
918 self.dial_attempts.insert(conn_id, address);
919 }
920 Err(e) => {
921 // Synchronous dial error (bad multiaddr, transport refused).
922 let tx = self.event_tx.clone();
923 let err = e.to_string();
924 tokio::spawn(async move {
925 let _ = tx
926 .send(NetworkEvent::DialFailed {
927 address,
928 error: err,
929 })
930 .await;
931 });
932 }
933 }
934 }
935 NetworkCommand::DialAddresses { addresses } => {
936 use libp2p::multiaddr::Protocol;
937 if addresses.is_empty() {
938 return;
939 }
940 // Extract the shared peer-id from any address with a
941 // `/p2p/<peer-id>` suffix. All host_addrs originating
942 // from the same `RoomAnnouncement` share the
943 // announcer's peer-id, so the first match is enough.
944 let peer_id = addresses
945 .iter()
946 .flat_map(|a| a.iter())
947 .find_map(|p| match p {
948 Protocol::P2p(pid) => Some(pid),
949 _ => None,
950 });
951 let opts = match peer_id {
952 Some(pid) => DialOpts::peer_id(pid)
953 .addresses(addresses.clone())
954 .build(),
955 // No /p2p/ segment anywhere — fall back to single-
956 // address dial of the first candidate, matching the
957 // legacy `Dial` semantics for unanchored multiaddrs.
958 None => addresses[0].clone().into(),
959 };
960 let conn_id = opts.connection_id();
961 // Use the first address as the representative for
962 // dial_attempts. On synchronous error we report it;
963 // on async success the post-identify handler upserts
964 // with the actually-connected endpoint.
965 let primary = addresses[0].clone();
966 match self.swarm.dial(opts) {
967 Ok(()) => {
968 self.dial_attempts.insert(conn_id, primary);
969 }
970 Err(e) => {
971 let tx = self.event_tx.clone();
972 let err = e.to_string();
973 tokio::spawn(async move {
974 let _ = tx
975 .send(NetworkEvent::DialFailed {
976 address: primary,
977 error: err,
978 })
979 .await;
980 });
981 }
982 }
983 }
984 NetworkCommand::Shutdown => unreachable!(),
985 }
986 }
987}