huddle_core/network/mod.rs
1pub mod behavior;
2pub mod events;
3pub mod protocol;
4pub mod server;
5pub mod transport;
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::Duration;
10
11use futures::StreamExt;
12use sha2::{Digest, Sha256};
13use libp2p::core::ConnectedPoint;
14use libp2p::swarm::dial_opts::DialOpts;
15use libp2p::swarm::ConnectionId;
16use libp2p::{
17 autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
18 SwarmBuilder,
19};
20use tokio::sync::mpsc;
21use tracing::{debug, info, warn};
22
/// Transport / peer-discovery strategy the node runs with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// Onion-relay only (the huddle 0.8 default): no libp2p swarm is
    /// started, so there is no LAN discovery, no direct dial and no NAT
    /// machinery — all traffic goes through the centralized Tor-onion relay
    /// (see `network::server`). Used unless the user opts into libp2p with
    /// `--mode mdns|direct`.
    Server,
    /// libp2p with mDNS enabled: the node announces itself on the LAN and
    /// listens for other announcements. Opt-in (`--mode mdns`); runs
    /// alongside the onion relay.
    Mdns,
    /// libp2p with mDNS disabled: the node is invisible to LAN discovery and
    /// libp2p peers are reached only by explicit dial. Opt-in
    /// (`--mode direct`); also runs alongside the onion relay.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of the mode (`"server"`, `"mdns"` or
    /// `"direct"`). Each name is accepted back by [`NetworkMode::from_str`].
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Server => "server",
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Whether this mode starts a libp2p swarm. Only `Server` does not.
    pub fn uses_libp2p(&self) -> bool {
        match *self {
            Self::Server => false,
            Self::Mdns | Self::Direct => true,
        }
    }

    /// Parse a mode name or one of its aliases.
    ///
    /// Surrounding whitespace is trimmed and ASCII case is ignored. Returns
    /// `None` for anything that is not a recognised name.
    pub fn from_str(s: &str) -> Option<Self> {
        // Accepted spellings, all lowercase, paired with the mode they select.
        const ALIASES: [(&str, NetworkMode); 9] = [
            ("server", NetworkMode::Server),
            ("tor", NetworkMode::Server),
            ("onion", NetworkMode::Server),
            ("mdns", NetworkMode::Mdns),
            ("lan", NetworkMode::Mdns),
            ("open", NetworkMode::Mdns),
            ("direct", NetworkMode::Direct),
            ("dial", NetworkMode::Direct),
            ("private", NetworkMode::Direct),
        ];

        let wanted = s.trim();
        ALIASES
            .iter()
            .find(|(alias, _)| wanted.eq_ignore_ascii_case(alias))
            .map(|&(_, mode)| mode)
    }
}
67
68use crate::identity::{compute_fingerprint, Identity};
69use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
70use crate::network::events::NetworkEvent;
71use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
72
/// Commands sent from a [`NetworkHandle`] to the background network task
/// over the bounded command channel. Each `NetworkHandle` method wraps
/// exactly one of these variants.
#[derive(Debug)]
pub enum NetworkCommand {
    /// Subscribe to a room's per-room gossipsub topic.
    SubscribeRoom { room_id: String },
    /// Unsubscribe from a room's topic.
    UnsubscribeRoom { room_id: String },
    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
    PublishRoomMessage { room_id: String, payload: Vec<u8> },
    /// Publish a room announcement on the global rooms topic.
    AnnounceRoom(RoomAnnouncement),
    /// User-initiated dial of an explicit address. Used for cross-network
    /// reach when mDNS isn't enough.
    Dial { address: Multiaddr },
    /// huddle 0.5.2: dial a peer using multiple candidate addresses,
    /// letting libp2p race them in parallel. Used by the "add by HD
    /// ID / username" flow, which resolves a fingerprint to every
    /// address we know for that peer (room announcement `host_addrs`
    /// + persisted `known_peers`). libp2p's parallel dialer picks
    /// the cheapest path that completes — LAN beats public IP beats
    /// relay-hopped without us having to probe transports manually.
    DialAddresses { addresses: Vec<Multiaddr> },
    /// Phase A: user accepted an inbound dial — promote the peer to
    /// explicit-peer status so room announcements flow.
    AcceptInbound { peer_id: PeerId },
    /// Phase A: user rejected an inbound dial — disconnect them and
    /// add the peer_id to the in-memory blocklist for this session
    /// (caller is responsible for the persistent blocked_peers row).
    RejectInbound { peer_id: PeerId },
    /// Phase C follow-up: drop a connection that failed an
    /// application-level identity check (e.g. invite-fingerprint
    /// mismatch). Differs from `RejectInbound` in that it doesn't
    /// touch the inbound-pending map (the connection is already
    /// past Identify when we discover the mismatch) and doesn't
    /// persist a block — the caller may want to retry with a
    /// corrected invite.
    DisconnectPeer { peer_id: PeerId },
    /// Stop the network task: on receipt the task logs a shutdown line
    /// and leaves its command loop without handling further commands.
    Shutdown,
}
111
/// Cloneable front-end to the network task. Clones share the same command
/// channel, server slot and DM-partner map (the latter two live behind
/// `Arc<Mutex<…>>`), so attaching a server or registering a DM through one
/// clone is visible through all of them.
#[derive(Clone)]
pub struct NetworkHandle {
    /// Sending half of the bounded command channel drained by the network
    /// task (or by the drain-only task in libp2p-disabled mode).
    cmd_tx: mpsc::Sender<NetworkCommand>,
    /// huddle 0.8: optional connection to the centralized server (a Tor
    /// onion relay). When attached, every room subscribe/unsubscribe and
    /// every published wire message is mirrored to it, so peers we can't
    /// reach over libp2p still receive our traffic (and we theirs, via the
    /// server's offline mailbox). `None` until `attach_server` runs.
    server: Arc<std::sync::Mutex<Option<server::ServerClient>>>,
    /// huddle 1.2: map of `room_id -> partner fingerprint` for 1:1 DM rooms.
    /// When a room is registered here, relay traffic for it is delivered by
    /// recipient fingerprint (`ServerClient::send_direct`) instead of by room
    /// membership fan-out — so DMs reach the partner reliably (live or via
    /// their offline mailbox) without both sides having subscribed the same
    /// room on the relay. Group rooms are absent from this map and keep using
    /// membership fan-out (`publish`). Populated by the app from
    /// `start_direct` / `bootstrap_direct_room` / `add_contact`.
    dm_partners: Arc<std::sync::Mutex<HashMap<String, String>>>,
}
131
132/// Stable per-message id for the server mailbox/receipts: a short hash of
133/// the wire bytes. Opaque to the server; lets it dedup/queue by id.
134fn server_msg_id(payload: &[u8]) -> String {
135 let mut h = Sha256::new();
136 h.update(payload);
137 hex::encode(&h.finalize()[..8])
138}
139
140impl NetworkHandle {
141 /// Attach a live server connection so subsequent room traffic mirrors
142 /// to it. Replaces any previous one. Called by the app's server
143 /// connection task after each (re)connect.
144 pub fn attach_server(&self, client: server::ServerClient) {
145 *self.server.lock().unwrap() = Some(client);
146 }
147
148 /// Drop the server connection (e.g. after a disconnect) so we stop
149 /// mirroring until it reconnects.
150 pub fn detach_server(&self) {
151 *self.server.lock().unwrap() = None;
152 }
153
154 /// Snapshot the attached server client, if any. Cloned out so we never
155 /// hold the lock across the (sync, non-blocking) mirror call.
156 fn server_client(&self) -> Option<server::ServerClient> {
157 self.server.lock().unwrap().clone()
158 }
159
160 pub async fn subscribe_room(&self, room_id: String) {
161 if let Some(s) = self.server_client() {
162 let _ = s.subscribe(&room_id);
163 }
164 let _ = self
165 .cmd_tx
166 .send(NetworkCommand::SubscribeRoom { room_id })
167 .await;
168 }
169
170 pub async fn unsubscribe_room(&self, room_id: String) {
171 if let Some(s) = self.server_client() {
172 let _ = s.unsubscribe(&room_id);
173 }
174 let _ = self
175 .cmd_tx
176 .send(NetworkCommand::UnsubscribeRoom { room_id })
177 .await;
178 }
179
180 /// huddle 1.2: register a 1:1 DM room so its relay traffic is delivered by
181 /// the partner's fingerprint instead of by room-membership fan-out. Safe
182 /// to call repeatedly (idempotent). Never registers an empty partner.
183 pub fn register_dm(&self, room_id: String, partner_fingerprint: String) {
184 if partner_fingerprint.is_empty() {
185 return;
186 }
187 self.dm_partners
188 .lock()
189 .unwrap()
190 .insert(room_id, partner_fingerprint);
191 }
192
193 /// huddle 1.2: forget a DM room's direct-delivery mapping (e.g. on leave).
194 pub fn forget_dm(&self, room_id: &str) {
195 self.dm_partners.lock().unwrap().remove(room_id);
196 }
197
198 fn dm_partner(&self, room_id: &str) -> Option<String> {
199 self.dm_partners.lock().unwrap().get(room_id).cloned()
200 }
201
202 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
203 if let Some(s) = self.server_client() {
204 // huddle 1.2: a registered DM room delivers straight to the
205 // partner's fingerprint (reliable even if they never subscribed
206 // this room on the relay); a group room fans out by membership.
207 match self.dm_partner(&room_id) {
208 Some(to) => {
209 let _ = s.send_direct(&to, &room_id, &server_msg_id(&payload), &payload);
210 }
211 None => {
212 let _ = s.publish(&room_id, &server_msg_id(&payload), &payload);
213 }
214 }
215 }
216 let _ = self
217 .cmd_tx
218 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
219 .await;
220 }
221
222 /// huddle 1.2: deliver `payload` straight to `to`'s fingerprint over the
223 /// relay (independent of room membership), and also ride libp2p gossipsub
224 /// on the `room_id` topic for LAN reachability. Used for friend requests
225 /// (delivered to the target's inbox tag) and any other point-to-point
226 /// control message where the recipient is known.
227 pub async fn publish_direct(&self, to: String, room_id: String, payload: Vec<u8>) {
228 if let Some(s) = self.server_client() {
229 let _ = s.send_direct(&to, &room_id, &server_msg_id(&payload), &payload);
230 }
231 let _ = self
232 .cmd_tx
233 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
234 .await;
235 }
236
237 pub async fn announce_room(&self, ann: RoomAnnouncement) {
238 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
239 }
240
241 /// True when a server connection is currently attached.
242 pub fn has_server(&self) -> bool {
243 self.server.lock().unwrap().is_some()
244 }
245
246 pub async fn dial(&self, address: Multiaddr) {
247 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
248 }
249
250 /// huddle 0.5.2: dial a peer with multiple candidate addresses
251 /// at once. libp2p's swarm races them; the first to complete a
252 /// handshake wins, the rest are dropped. Use when you've resolved
253 /// a peer fingerprint to several known transports — typically
254 /// LAN ip4 + public ip4 + relay circuit.
255 pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
256 let _ = self
257 .cmd_tx
258 .send(NetworkCommand::DialAddresses { addresses })
259 .await;
260 }
261
262 pub async fn accept_inbound(&self, peer_id: PeerId) {
263 let _ = self
264 .cmd_tx
265 .send(NetworkCommand::AcceptInbound { peer_id })
266 .await;
267 }
268
269 pub async fn reject_inbound(&self, peer_id: PeerId) {
270 let _ = self
271 .cmd_tx
272 .send(NetworkCommand::RejectInbound { peer_id })
273 .await;
274 }
275
276 pub async fn disconnect_peer(&self, peer_id: PeerId) {
277 let _ = self
278 .cmd_tx
279 .send(NetworkCommand::DisconnectPeer { peer_id })
280 .await;
281 }
282
283 pub async fn shutdown(&self) {
284 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
285 }
286}
287
/// State kept for a connected peer that has not been promoted to the
/// gossipsub explicit-peer set yet. Quarantined inbound dials are NOT
/// added to the mesh until the user accepts; the entry is looked up when
/// Identify delivers the remote's Ed25519 fingerprint so the app can be
/// prompted. Outbound user dials are not tracked here — they are added to
/// the mesh and reported via `DialSucceeded` as soon as the connection is
/// established.
#[derive(Debug)]
enum PendingPeer {
    /// Inbound dial from an unknown peer — modal-pending in the app.
    /// `address` is the remote's send-back address recorded when the
    /// connection was established.
    InboundUnknown { address: Multiaddr },
}
298
/// State owned by the background libp2p task: the swarm itself, both
/// channel ends it talks to the app through, and the per-peer bookkeeping
/// the event handlers need.
struct NetworkTask {
    /// The libp2p swarm driving all behaviours in `HuddleBehavior`.
    swarm: Swarm<HuddleBehavior>,
    /// Receiving half of the command channel fed by `NetworkHandle`.
    cmd_rx: mpsc::Receiver<NetworkCommand>,
    /// Channel on which `NetworkEvent`s are reported to the app.
    event_tx: mpsc::Sender<NetworkEvent>,
    /// Peers currently treated as discovered: inserted on mDNS discovery
    /// and for successfully user-dialed peers, removed on mDNS expiry.
    discovered_peers: HashSet<PeerId>,
    /// Tracks dials we issued ourselves (user-initiated dials and the
    /// startup dials to configured relays) so we can correlate the eventual
    /// `ConnectionEstablished` / `OutgoingConnectionError` back to the
    /// specific address that was dialed.
    dial_attempts: HashMap<ConnectionId, Multiaddr>,
    /// Phase A: peers connected but not yet promoted to the gossipsub
    /// mesh (inbound) — waiting either for `Identify` to land so we
    /// know their fingerprint, or for the user's accept/reject decision.
    /// On `ConnectionClosed` we drop entries here so a peer disconnecting
    /// mid-prompt doesn't leak state.
    pending_inbound: HashMap<PeerId, PendingPeer>,
    /// Phase A: peers the user has explicitly rejected this session —
    /// auto-disconnect every reconnect attempt without re-prompting.
    /// Persistent across runs via `blocked_peers`; this in-memory copy
    /// is loaded at startup (TODO: not yet wired; falls back to the DB
    /// check in the app layer for now) and updated on `RejectInbound`.
    session_blocklist: HashSet<PeerId>,
    /// Phase D: configured relay multiaddrs. When Identify lands for a
    /// peer whose multiaddr matches one of these, we call
    /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
    /// Tracked as multiaddr strings (no PeerId yet — we don't know it
    /// until Identify) plus a set of peer_ids of confirmed relays so
    /// we only register once per relay.
    configured_relays: Vec<Multiaddr>,
    /// Peer ids of configured relays, recorded when the connection to a
    /// configured relay address is established and consulted when Identify
    /// arrives for that peer.
    relay_peer_ids: HashSet<PeerId>,
    /// huddle 0.7.11: per-peer DCUtR failure counter. Logged at warn
    /// once the count crosses `DCUTR_FAIL_BUDGET` so symmetric-NAT
    /// pairs don't generate runaway hole-punch attempts in the logs.
    /// libp2p's dcutr behavior schedules its own retries internally;
    /// this is purely an observability signal for the app.
    /// The entry for a peer is cleared on a successful hole-punch.
    dcutr_failures: HashMap<PeerId, u32>,
}
335
/// huddle 0.7.11: failure-count threshold at which the DCUtR event handler
/// escalates from a debug line to a warning for a given peer. dcutr keeps
/// trying internally, but the audit flagged the spam as a real issue —
/// capping the log noise is the realistic fix. The counter this is compared
/// against is reset when a hole-punch to that peer succeeds.
const DCUTR_FAIL_BUDGET: u32 = 6;
340
341pub fn start_network(
342 identity: &Identity,
343 event_tx: mpsc::Sender<NetworkEvent>,
344) -> crate::error::Result<NetworkHandle> {
345 start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
346}
347
348/// huddle 0.8: build a `NetworkHandle` with **no libp2p swarm** — the
349/// onion-relay-only (`NetworkMode::Server`) path. We still hand back a
350/// fully-functional handle so the rest of the app is oblivious: room
351/// subscribe/unsubscribe/publish mirror to the attached `ServerClient`
352/// exactly as before, while the libp2p-specific commands (dial, announce,
353/// accept/reject, …) are simply drained and dropped. A tiny task consumes
354/// `cmd_rx` so the bounded channel never fills and `send().await` never
355/// blocks; it exits on `Shutdown`.
356pub fn start_network_disabled() -> NetworkHandle {
357 let (cmd_tx, mut cmd_rx) = mpsc::channel::<NetworkCommand>(256);
358 tokio::spawn(async move {
359 while let Some(cmd) = cmd_rx.recv().await {
360 if matches!(cmd, NetworkCommand::Shutdown) {
361 info!("network task (libp2p-disabled) shutting down");
362 break;
363 }
364 // Everything else is a libp2p op with no swarm to run it; the
365 // server mirror (in NetworkHandle) already handled the room
366 // traffic before the command reached us. Drop it.
367 }
368 });
369 NetworkHandle {
370 cmd_tx,
371 server: Arc::new(std::sync::Mutex::new(None)),
372 dm_partners: Arc::new(std::sync::Mutex::new(HashMap::new())),
373 }
374}
375
376/// Start the network task with explicit mode, TCP listen port, and any
377/// pre-configured relay multiaddrs. `listen_port = 0` requests a
378/// random port. Relays are dialed on startup; once `Identify` lands
379/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
380/// register a reservation so peers behind NAT can dial us through it.
381pub fn start_network_with(
382 identity: &Identity,
383 event_tx: mpsc::Sender<NetworkEvent>,
384 mode: NetworkMode,
385 listen_port: u16,
386 relays: Vec<Multiaddr>,
387) -> crate::error::Result<NetworkHandle> {
388 let keypair = identity.keypair().clone();
389 let local_peer_id = identity.peer_id();
390
391 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
392 .with_tokio()
393 .with_tcp(
394 tcp::Config::default(),
395 noise::Config::new,
396 yamux::Config::default,
397 )
398 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
399 // Phase D: wrap the transport with relay-client. This both
400 // composes the transport (so we can dial `/p2p-circuit/`
401 // addresses) and surfaces a `relay::client::Behaviour` into
402 // the `with_behaviour` closure as the second argument.
403 .with_relay_client(noise::Config::new, yamux::Config::default)
404 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
405 .with_behaviour(|key, relay_client| {
406 let mdns_opt = match mode {
407 NetworkMode::Mdns => Some(
408 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
409 .expect("mDNS init failed"),
410 ),
411 // `Direct` runs libp2p without mDNS. `Server` never reaches
412 // here (it takes the `start_network_disabled` path and runs
413 // no swarm), but the match must be exhaustive.
414 NetworkMode::Direct | NetworkMode::Server => None,
415 };
416 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
417
418 let identify = identify::Behaviour::new(
419 identify::Config::new("/huddle/1.0.0".into(), key.public())
420 .with_agent_version("huddle/0.5".into()),
421 );
422
423 let ping = ping::Behaviour::default();
424
425 let gossipsub_config = gossipsub::ConfigBuilder::default()
426 .heartbeat_interval(Duration::from_secs(1))
427 .validation_mode(gossipsub::ValidationMode::Strict)
428 // Default is 64 KiB. Raise it so base64-encoded file
429 // chunks (see files::CHUNK_SIZE) keep ample headroom.
430 .max_transmit_size(256 * 1024)
431 .build()
432 .expect("valid gossipsub config");
433
434 let mut gossipsub = gossipsub::Behaviour::new(
435 gossipsub::MessageAuthenticity::Signed(key.clone()),
436 gossipsub_config,
437 )
438 .expect("valid gossipsub init");
439
440 // Every node subscribes to the global rooms topic so the lobby
441 // shows discovered rooms even before joining anything.
442 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
443 gossipsub
444 .subscribe(&rooms_topic)
445 .expect("subscribe rooms topic");
446
447 // AutoNAT v2: client probes external addresses by asking
448 // remote servers to dial us back; server answers other
449 // peers' probes. Both halves are needed for symmetric
450 // P2P reachability detection (v1 combined them; v2 split).
451 let autonat_client = autonat::v2::client::Behaviour::new(
452 rand::rngs::OsRng,
453 autonat::v2::client::Config::default(),
454 );
455 let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
456 let dcutr = dcutr::Behaviour::new(local_peer_id);
457
458 HuddleBehavior {
459 mdns,
460 identify,
461 ping,
462 gossipsub,
463 relay_client,
464 autonat_client,
465 autonat_server,
466 dcutr,
467 }
468 })
469 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
470 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
471 .build();
472
473 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
474 .parse()
475 .expect("valid listen addr");
476 swarm
477 .listen_on(listen_addr)
478 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
479 // Also bind IPv6 on all interfaces so users can dial via IPv6.
480 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
481 .parse()
482 .expect("valid ipv6 listen addr");
483 if let Err(e) = swarm.listen_on(listen_addr6) {
484 debug!(%e, "ipv6 listen skipped");
485 }
486
487 let (cmd_tx, cmd_rx) = mpsc::channel(256);
488 let mut task = NetworkTask {
489 swarm,
490 cmd_rx,
491 event_tx,
492 discovered_peers: HashSet::new(),
493 dial_attempts: HashMap::new(),
494 pending_inbound: HashMap::new(),
495 session_blocklist: HashSet::new(),
496 configured_relays: relays.clone(),
497 relay_peer_ids: HashSet::new(),
498 dcutr_failures: HashMap::new(),
499 };
500 // Phase D: dial each configured relay so Identify can complete and
501 // we can register a `/p2p-circuit` reservation. Failures here are
502 // non-fatal — the user can still chat on LAN.
503 for relay_addr in relays {
504 info!(addr = %relay_addr, "dialing configured relay");
505 let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
506 let conn_id = opts.connection_id();
507 match task.swarm.dial(opts) {
508 Ok(()) => {
509 task.dial_attempts.insert(conn_id, relay_addr);
510 }
511 Err(e) => warn!(%e, "dial relay failed"),
512 }
513 }
514 tokio::spawn(task.run());
515
516 Ok(NetworkHandle {
517 cmd_tx,
518 server: Arc::new(std::sync::Mutex::new(None)),
519 dm_partners: Arc::new(std::sync::Mutex::new(HashMap::new())),
520 })
521}
522
523impl NetworkTask {
524 async fn run(mut self) {
525 loop {
526 tokio::select! {
527 event = self.swarm.select_next_some() => {
528 self.handle_swarm_event(event).await;
529 }
530 Some(cmd) = self.cmd_rx.recv() => {
531 if matches!(cmd, NetworkCommand::Shutdown) {
532 info!("network task shutting down");
533 break;
534 }
535 self.handle_command(cmd);
536 }
537 }
538 }
539 }
540
541 async fn handle_swarm_event(
542 &mut self,
543 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
544 ) {
545 match event {
546 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
547 info!(%address, "listening");
548 // Phase D: a relay-circuit address is a reachability
549 // milestone — surface it as its own event so the lobby
550 // can show "reachable via N relays" status.
551 use libp2p::multiaddr::Protocol;
552 let is_circuit = address
553 .iter()
554 .any(|p| matches!(p, Protocol::P2pCircuit));
555 if is_circuit {
556 let _ = self
557 .event_tx
558 .send(NetworkEvent::RelayReservationEstablished {
559 address: address.clone(),
560 })
561 .await;
562 }
563 let _ = self
564 .event_tx
565 .send(NetworkEvent::ListeningOn { address })
566 .await;
567 }
568 libp2p::swarm::SwarmEvent::ConnectionEstablished {
569 peer_id,
570 connection_id,
571 endpoint,
572 ..
573 } => {
574 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
575 // Phase D: a connection that was for a configured
576 // relay shouldn't pollute the lobby with a normal
577 // DialSucceeded — relays aren't chat peers. We just
578 // remember the peer_id and wait for Identify, then
579 // register a /p2p-circuit reservation.
580 let is_relay = self.configured_relays.iter().any(|r| r == &addr);
581 if is_relay {
582 info!(%peer_id, %addr, "connected to configured relay");
583 self.relay_peer_ids.insert(peer_id);
584 } else {
585 info!(%peer_id, %addr, "user-dialed peer connected");
586 // Treat dialed peers like mDNS-discovered: add
587 // to gossipsub explicit peers so room
588 // announcements flow.
589 self.swarm
590 .behaviour_mut()
591 .gossipsub
592 .add_explicit_peer(&peer_id);
593 self.discovered_peers.insert(peer_id);
594 let _ = self
595 .event_tx
596 .send(NetworkEvent::DialSucceeded {
597 peer_id,
598 address: addr,
599 })
600 .await;
601 }
602 } else if let ConnectedPoint::Dialer { .. } = endpoint {
603 // Outgoing connection we didn't track (e.g. mDNS auto-dial)
604 // — still add to mesh; no user-visible event needed.
605 self.swarm
606 .behaviour_mut()
607 .gossipsub
608 .add_explicit_peer(&peer_id);
609 } else {
610 // Inbound dial from an unknown peer (Phase A). We
611 // hold the connection but do NOT add them to the
612 // explicit-peer set yet — wait for Identify so we
613 // can show the user the peer's fingerprint, then
614 // either AcceptInbound (promote to mesh) or
615 // RejectInbound (disconnect + persist blocklist).
616 //
617 // Known limitation: gossipsub's score-based mesh
618 // formation may still forward topic messages to
619 // this peer via other peers we have in common.
620 // True hard-quarantine would need a custom
621 // ConnectionHandler — out of scope for v1.
622 if self.session_blocklist.contains(&peer_id) {
623 info!(%peer_id, "rejecting inbound from session-blocked peer");
624 let _ = self.swarm.disconnect_peer_id(peer_id);
625 } else {
626 let address = match &endpoint {
627 ConnectedPoint::Listener { send_back_addr, .. } => {
628 send_back_addr.clone()
629 }
630 _ => Multiaddr::empty(),
631 };
632 debug!(%peer_id, %address, "inbound peer pending decision");
633 self.pending_inbound
634 .insert(peer_id, PendingPeer::InboundUnknown { address });
635 }
636 }
637 }
638 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
639 connection_id,
640 error,
641 ..
642 } => {
643 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
644 warn!(%addr, %error, "user-dialed peer failed");
645 let _ = self
646 .event_tx
647 .send(NetworkEvent::DialFailed {
648 address: addr,
649 error: error.to_string(),
650 })
651 .await;
652 }
653 }
654 libp2p::swarm::SwarmEvent::ConnectionClosed {
655 peer_id,
656 num_established,
657 ..
658 } => {
659 // Drop any pending-inbound entry for this peer — they
660 // disconnected before we could prompt the user (or
661 // before the user accepted). Lets a re-connect start
662 // fresh rather than reusing stale state.
663 self.pending_inbound.remove(&peer_id);
664 // huddle 0.7.11: emit PeerDisconnected so the app can
665 // clean its `connected_dial_addrs` map. Only fire when
666 // the LAST connection to that peer closed — multiple
667 // simultaneous connections (e.g. direct + relay) would
668 // otherwise emit spurious disconnects when one of the
669 // two drops.
670 if num_established == 0 {
671 let _ = self
672 .event_tx
673 .send(NetworkEvent::PeerDisconnected { peer_id })
674 .await;
675 // Also clean up gossipsub's explicit-peers set so
676 // it doesn't accumulate dead PeerIds across the
677 // session.
678 self.swarm
679 .behaviour_mut()
680 .gossipsub
681 .remove_explicit_peer(&peer_id);
682 }
683 }
684 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
685 _ => {}
686 }
687 }
688
689 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
690 match event {
691 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
692 for (peer_id, addr) in peers {
693 if self.discovered_peers.insert(peer_id) {
694 info!(%peer_id, %addr, "mDNS discovered");
695 self.swarm.add_peer_address(peer_id, addr);
696 // Explicitly add to gossipsub mesh.
697 self.swarm
698 .behaviour_mut()
699 .gossipsub
700 .add_explicit_peer(&peer_id);
701 let _ = self
702 .event_tx
703 .send(NetworkEvent::PeerDiscovered { peer_id })
704 .await;
705 }
706 }
707 }
708 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
709 for (peer_id, _) in peers {
710 if self.discovered_peers.remove(&peer_id) {
711 info!(%peer_id, "mDNS peer expired");
712 self.swarm
713 .behaviour_mut()
714 .gossipsub
715 .remove_explicit_peer(&peer_id);
716 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
717 }
718 }
719 }
720 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
721 propagation_source,
722 message,
723 ..
724 }) => {
725 self.handle_gossipsub_message(propagation_source, message).await;
726 }
727 HuddleBehaviorEvent::Identify(identify::Event::Received {
728 peer_id, info, ..
729 }) => {
730 debug!(%peer_id, agent = %info.agent_version, "identify received");
731 // Phase D: if this peer is a configured relay, register
732 // a `/p2p-circuit` reservation on first identify. Idem-
733 // potent — only fire if we haven't listened on this
734 // relay already (identify fires periodically).
735 if self.relay_peer_ids.contains(&peer_id) {
736 use libp2p::multiaddr::Protocol;
737 if let Some(relay_addr) = self
738 .configured_relays
739 .iter()
740 .find(|a| {
741 // Match by /p2p/<peer-id> suffix when
742 // present, else by the addr we dialed.
743 a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
744 || self.dial_attempts.values().any(|d| d == *a)
745 })
746 .cloned()
747 {
748 let circuit = relay_addr.with(Protocol::P2pCircuit);
749 match self.swarm.listen_on(circuit.clone()) {
750 Ok(_) => info!(%circuit, "listening on relay circuit"),
751 Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
752 }
753 }
754 }
755 // Decode the remote's Ed25519 pubkey and derive our
756 // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
757 // Rsa, Ecdsa) shouldn't appear in practice — huddle
758 // only generates Ed25519 identities — so we just log
759 // and skip if the cast fails.
760 let fingerprint = match info.public_key.clone().try_into_ed25519() {
761 Ok(ed_pk) => {
762 let bytes = ed_pk.to_bytes();
763 compute_fingerprint(&bytes)
764 }
765 Err(_) => {
766 warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
767 return;
768 }
769 };
770 // Always notify the app layer so it can populate
771 // `known_peers.fingerprint` and detect that an
772 // outbound peer we dialed has fully identified.
773 let _ = self
774 .event_tx
775 .send(NetworkEvent::PeerIdentified {
776 peer_id,
777 fingerprint: fingerprint.clone(),
778 })
779 .await;
780 // If the peer is in pending_inbound, Identify completing
781 // is the cue to surface the user prompt. Keep them in
782 // pending_inbound until Accept or Reject — we don't
783 // know yet which way the user will decide.
784 if let Some(PendingPeer::InboundUnknown { address }) =
785 self.pending_inbound.get(&peer_id)
786 {
787 let address = address.clone();
788 let _ = self
789 .event_tx
790 .send(NetworkEvent::InboundDial {
791 peer_id,
792 fingerprint,
793 address,
794 })
795 .await;
796 }
797 }
798 HuddleBehaviorEvent::AutonatClient(ev) => {
799 // One probe per address candidate. `result.is_ok()`
800 // means a remote AutoNAT server dialed us back
801 // successfully on `tested_addr` ⇒ this address is
802 // reachable from the outside. The app layer aggregates
803 // these into the lobby reachability badge.
804 let reachable = ev.result.is_ok();
805 if reachable {
806 info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
807 } else {
808 debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
809 }
810 let _ = self
811 .event_tx
812 .send(NetworkEvent::NatProbeResult {
813 tested_addr: ev.tested_addr,
814 reachable,
815 })
816 .await;
817 }
818 HuddleBehaviorEvent::AutonatServer(_) => {
819 // We answered another peer's reachability probe.
820 // No app-visible action.
821 }
822 HuddleBehaviorEvent::Dcutr(ev) => {
823 let success = ev.result.is_ok();
824 if success {
825 info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
826 // huddle 0.7.11: a successful hole-punch resets the
827 // per-peer DCUtR retry counter so future failures
828 // don't immediately bail out.
829 self.dcutr_failures.remove(&ev.remote_peer_id);
830 } else {
831 debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
832 let count = self
833 .dcutr_failures
834 .entry(ev.remote_peer_id)
835 .and_modify(|n| *n += 1)
836 .or_insert(1);
837 if *count >= DCUTR_FAIL_BUDGET {
838 warn!(
839 remote = %ev.remote_peer_id,
840 attempts = *count,
841 "DCUtR: giving up after repeated failures (symmetric NAT likely)"
842 );
843 }
844 }
845 let _ = self
846 .event_tx
847 .send(NetworkEvent::DcutrUpgrade {
848 remote_peer: ev.remote_peer_id,
849 success,
850 })
851 .await;
852 }
853 HuddleBehaviorEvent::RelayClient(event) => {
854 // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
855 // only exposes the success-path variants
856 // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
857 // `InboundCircuitEstablished`). There's no
858 // `ReservationReqFailed` arm we can match on, so we
859 // can't reliably surface reservation loss without a
860 // separate health-check timer (future work). For now
861 // we log every event so operators can see the
862 // lifecycle in `huddle.log` — pre-0.7.11 the whole
863 // arm was swallowed by `_ => {}` which hid even that.
864 use libp2p::relay::client::Event as Rc;
865 match event {
866 Rc::ReservationReqAccepted { relay_peer_id, .. } => {
867 info!(%relay_peer_id, "relay: reservation accepted");
868 }
869 Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
870 debug!(%relay_peer_id, "relay: outbound circuit established");
871 }
872 other => {
873 debug!(?other, "relay client event");
874 }
875 }
876 }
877 _ => {}
878 }
879 }
880
881 async fn handle_gossipsub_message(
882 &mut self,
883 from_peer: PeerId,
884 message: gossipsub::Message,
885 ) {
886 let topic = message.topic.to_string();
887 if topic == ROOMS_TOPIC {
888 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
889 Ok(ann) => {
890 let _ = self
891 .event_tx
892 .send(NetworkEvent::RoomAnnouncementReceived(ann))
893 .await;
894 }
895 Err(e) => {
896 warn!(%e, "bad room announcement");
897 }
898 }
899 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
900 let _ = self
901 .event_tx
902 .send(NetworkEvent::RoomMessageReceived {
903 room_id: room_id.to_string(),
904 payload: message.data,
905 from_peer,
906 })
907 .await;
908 }
909 }
910
911 fn handle_command(&mut self, cmd: NetworkCommand) {
912 match cmd {
913 NetworkCommand::SubscribeRoom { room_id } => {
914 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
915 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
916 warn!(%e, %room_id, "subscribe room failed");
917 }
918 }
919 NetworkCommand::UnsubscribeRoom { room_id } => {
920 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
921 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
922 }
923 NetworkCommand::PublishRoomMessage { room_id, payload } => {
924 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
925 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
926 // No subscribed peers is expected before the mesh
927 // forms; anything else (MessageTooLarge, full queues)
928 // is a real bug worth surfacing.
929 match e {
930 gossipsub::PublishError::NoPeersSubscribedToTopic => {
931 debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
932 }
933 e => warn!(%e, %room_id, "publish room message failed"),
934 }
935 }
936 }
937 NetworkCommand::AnnounceRoom(ann) => {
938 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
939 match serde_json::to_vec(&ann) {
940 Ok(payload) => {
941 if let Err(e) =
942 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
943 {
944 debug!(%e, "publish room announcement failed");
945 }
946 }
947 Err(e) => warn!(%e, "encode room announcement"),
948 }
949 }
950 NetworkCommand::AcceptInbound { peer_id } => {
951 if self.pending_inbound.remove(&peer_id).is_some() {
952 info!(%peer_id, "inbound dial accepted — promoting to mesh");
953 self.swarm
954 .behaviour_mut()
955 .gossipsub
956 .add_explicit_peer(&peer_id);
957 self.discovered_peers.insert(peer_id);
958 } else {
959 debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
960 }
961 }
962 NetworkCommand::RejectInbound { peer_id } => {
963 self.pending_inbound.remove(&peer_id);
964 self.session_blocklist.insert(peer_id);
965 info!(%peer_id, "inbound dial rejected — disconnecting");
966 let _ = self.swarm.disconnect_peer_id(peer_id);
967 }
968 NetworkCommand::DisconnectPeer { peer_id } => {
969 info!(%peer_id, "app-level identity check failed — disconnecting");
970 let _ = self.swarm.disconnect_peer_id(peer_id);
971 }
972 NetworkCommand::Dial { address } => {
973 let opts: DialOpts = address.clone().into();
974 let conn_id = opts.connection_id();
975 match self.swarm.dial(opts) {
976 Ok(()) => {
977 self.dial_attempts.insert(conn_id, address);
978 }
979 Err(e) => {
980 // Synchronous dial error (bad multiaddr, transport refused).
981 let tx = self.event_tx.clone();
982 let err = e.to_string();
983 tokio::spawn(async move {
984 let _ = tx
985 .send(NetworkEvent::DialFailed {
986 address,
987 error: err,
988 })
989 .await;
990 });
991 }
992 }
993 }
994 NetworkCommand::DialAddresses { addresses } => {
995 use libp2p::multiaddr::Protocol;
996 if addresses.is_empty() {
997 return;
998 }
999 // Extract the shared peer-id from any address with a
1000 // `/p2p/<peer-id>` suffix. All host_addrs originating
1001 // from the same `RoomAnnouncement` share the
1002 // announcer's peer-id, so the first match is enough.
1003 let peer_id = addresses
1004 .iter()
1005 .flat_map(|a| a.iter())
1006 .find_map(|p| match p {
1007 Protocol::P2p(pid) => Some(pid),
1008 _ => None,
1009 });
1010 let opts = match peer_id {
1011 Some(pid) => DialOpts::peer_id(pid)
1012 .addresses(addresses.clone())
1013 .build(),
1014 // No /p2p/ segment anywhere — fall back to single-
1015 // address dial of the first candidate, matching the
1016 // legacy `Dial` semantics for unanchored multiaddrs.
1017 None => addresses[0].clone().into(),
1018 };
1019 let conn_id = opts.connection_id();
1020 // Use the first address as the representative for
1021 // dial_attempts. On synchronous error we report it;
1022 // on async success the post-identify handler upserts
1023 // with the actually-connected endpoint.
1024 let primary = addresses[0].clone();
1025 match self.swarm.dial(opts) {
1026 Ok(()) => {
1027 self.dial_attempts.insert(conn_id, primary);
1028 }
1029 Err(e) => {
1030 let tx = self.event_tx.clone();
1031 let err = e.to_string();
1032 tokio::spawn(async move {
1033 let _ = tx
1034 .send(NetworkEvent::DialFailed {
1035 address: primary,
1036 error: err,
1037 })
1038 .await;
1039 });
1040 }
1041 }
1042 }
1043 NetworkCommand::Shutdown => unreachable!(),
1044 }
1045 }
1046}