huddle_core/network/mod.rs
1pub mod behavior;
2pub mod events;
3pub mod protocol;
4pub mod server;
5pub mod transport;
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::Duration;
10
11use futures::StreamExt;
12use sha2::{Digest, Sha256};
13use libp2p::core::ConnectedPoint;
14use libp2p::swarm::dial_opts::DialOpts;
15use libp2p::swarm::ConnectionId;
16use libp2p::{
17 autonat, dcutr, gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm,
18 SwarmBuilder,
19};
20use tokio::sync::mpsc;
21use tracing::{debug, info, warn};
22
/// Peer-discovery strategy chosen at startup.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkMode {
    /// Default since huddle 0.8: libp2p is not started at all. Every message
    /// travels through the centralized Tor-onion relay (`network::server`);
    /// there is no LAN discovery, no direct dialing and no NAT traversal.
    /// The binary runs in this mode unless `--mode mdns|direct` is given.
    Server,
    /// libp2p with mDNS enabled (`--mode mdns`, opt-in): advertise ourselves
    /// on the LAN and pick up other advertisements. Runs alongside the onion
    /// relay, so LAN peers connect directly while internet peers are routed
    /// through the server.
    Mdns,
    /// libp2p with mDNS disabled (`--mode direct`, opt-in): invisible to LAN
    /// discovery, so libp2p connections only come from explicit dials. Also
    /// runs alongside the onion relay.
    Direct,
}

impl NetworkMode {
    /// Canonical lowercase name of the mode (its primary CLI spelling).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Server => "server",
            Self::Mdns => "mdns",
            Self::Direct => "direct",
        }
    }

    /// Whether this mode runs a libp2p swarm (LAN discovery / direct dial).
    /// Only the onion-relay-only `Server` mode does not.
    pub fn uses_libp2p(&self) -> bool {
        *self != NetworkMode::Server
    }

    /// Parse a mode name, ignoring ASCII case and surrounding whitespace.
    ///
    /// Besides the canonical names, these aliases are accepted:
    /// `tor`/`onion` → `Server`, `lan`/`open` → `Mdns`,
    /// `dial`/`private` → `Direct`. Any other input yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        const ALIASES: [(&str, NetworkMode); 9] = [
            ("server", NetworkMode::Server),
            ("tor", NetworkMode::Server),
            ("onion", NetworkMode::Server),
            ("mdns", NetworkMode::Mdns),
            ("lan", NetworkMode::Mdns),
            ("open", NetworkMode::Mdns),
            ("direct", NetworkMode::Direct),
            ("dial", NetworkMode::Direct),
            ("private", NetworkMode::Direct),
        ];
        let wanted = s.trim().to_ascii_lowercase();
        ALIASES
            .iter()
            .find(|(name, _)| *name == wanted)
            .map(|&(_, mode)| mode)
    }
}
67
68use crate::identity::{compute_fingerprint, Identity};
69use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
70use crate::network::events::NetworkEvent;
71use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
72
73#[derive(Debug)]
74pub enum NetworkCommand {
75 /// Subscribe to a room's per-room gossipsub topic.
76 SubscribeRoom { room_id: String },
77 /// Unsubscribe from a room's topic.
78 UnsubscribeRoom { room_id: String },
79 /// Publish a JSON-encoded `RoomMessage` to a room's topic.
80 PublishRoomMessage { room_id: String, payload: Vec<u8> },
81 /// Publish a room announcement on the global rooms topic.
82 AnnounceRoom(RoomAnnouncement),
83 /// User-initiated dial of an explicit address. Used for cross-network
84 /// reach when mDNS isn't enough.
85 Dial { address: Multiaddr },
86 /// huddle 0.5.2: dial a peer using multiple candidate addresses,
87 /// letting libp2p race them in parallel. Used by the "add by HD
88 /// ID / username" flow, which resolves a fingerprint to every
89 /// address we know for that peer (room announcement `host_addrs`
90 /// + persisted `known_peers`). libp2p's parallel dialer picks
91 /// the cheapest path that completes — LAN beats public IP beats
92 /// relay-hopped without us having to probe transports manually.
93 DialAddresses { addresses: Vec<Multiaddr> },
94 /// Phase A: user accepted an inbound dial — promote the peer to
95 /// explicit-peer status so room announcements flow.
96 AcceptInbound { peer_id: PeerId },
97 /// Phase A: user rejected an inbound dial — disconnect them and
98 /// add the peer_id to the in-memory blocklist for this session
99 /// (caller is responsible for the persistent blocked_peers row).
100 RejectInbound { peer_id: PeerId },
101 /// Phase C follow-up: drop a connection that failed an
102 /// application-level identity check (e.g. invite-fingerprint
103 /// mismatch). Differs from `RejectInbound` in that it doesn't
104 /// touch the inbound-pending map (the connection is already
105 /// past Identify when we discover the mismatch) and doesn't
106 /// persist a block — the caller may want to retry with a
107 /// corrected invite.
108 DisconnectPeer { peer_id: PeerId },
109 Shutdown,
110}
111
112#[derive(Clone)]
113pub struct NetworkHandle {
114 cmd_tx: mpsc::Sender<NetworkCommand>,
115 /// huddle 0.8: optional connection to the centralized server (a Tor
116 /// onion relay). When attached, every room subscribe/unsubscribe and
117 /// every published wire message is mirrored to it, so peers we can't
118 /// reach over libp2p still receive our traffic (and we theirs, via the
119 /// server's offline mailbox). `None` until `attach_server` runs.
120 server: Arc<std::sync::Mutex<Option<server::ServerClient>>>,
121 /// huddle 1.2: map of `room_id -> partner fingerprint` for 1:1 DM rooms.
122 /// When a room is registered here, relay traffic for it is delivered by
123 /// recipient fingerprint (`ServerClient::send_direct`) instead of by room
124 /// membership fan-out — so DMs reach the partner reliably (live or via
125 /// their offline mailbox) without both sides having subscribed the same
126 /// room on the relay. Group rooms are absent from this map and keep using
127 /// membership fan-out (`publish`). Populated by the app from
128 /// `start_direct` / `bootstrap_direct_room` / `add_contact`.
129 dm_partners: Arc<std::sync::Mutex<HashMap<String, String>>>,
130}
131
132/// Stable per-message id for the server mailbox/receipts: a short hash of
133/// the wire bytes. Opaque to the server; lets it dedup/queue by id.
134fn server_msg_id(payload: &[u8]) -> String {
135 let mut h = Sha256::new();
136 h.update(payload);
137 hex::encode(&h.finalize()[..8])
138}
139
140impl NetworkHandle {
141 /// Attach a live server connection so subsequent room traffic mirrors
142 /// to it. Replaces any previous one. Called by the app's server
143 /// connection task after each (re)connect.
144 pub fn attach_server(&self, client: server::ServerClient) {
145 *self.server.lock().unwrap() = Some(client);
146 }
147
148 /// Drop the server connection (e.g. after a disconnect) so we stop
149 /// mirroring until it reconnects.
150 pub fn detach_server(&self) {
151 *self.server.lock().unwrap() = None;
152 }
153
154 /// Snapshot the attached server client, if any. Cloned out so we never
155 /// hold the lock across the (sync, non-blocking) mirror call.
156 fn server_client(&self) -> Option<server::ServerClient> {
157 self.server.lock().unwrap().clone()
158 }
159
160 pub async fn subscribe_room(&self, room_id: String) {
161 if let Some(s) = self.server_client() {
162 let _ = s.subscribe(&room_id);
163 }
164 let _ = self
165 .cmd_tx
166 .send(NetworkCommand::SubscribeRoom { room_id })
167 .await;
168 }
169
170 pub async fn unsubscribe_room(&self, room_id: String) {
171 if let Some(s) = self.server_client() {
172 let _ = s.unsubscribe(&room_id);
173 }
174 let _ = self
175 .cmd_tx
176 .send(NetworkCommand::UnsubscribeRoom { room_id })
177 .await;
178 }
179
180 /// huddle 1.2: register a 1:1 DM room so its relay traffic is delivered by
181 /// the partner's fingerprint instead of by room-membership fan-out. Safe
182 /// to call repeatedly (idempotent). Never registers an empty partner.
183 pub fn register_dm(&self, room_id: String, partner_fingerprint: String) {
184 if partner_fingerprint.is_empty() {
185 return;
186 }
187 self.dm_partners
188 .lock()
189 .unwrap()
190 .insert(room_id, partner_fingerprint);
191 }
192
193 /// huddle 1.2: forget a DM room's direct-delivery mapping (e.g. on leave).
194 pub fn forget_dm(&self, room_id: &str) {
195 self.dm_partners.lock().unwrap().remove(room_id);
196 }
197
198 fn dm_partner(&self, room_id: &str) -> Option<String> {
199 self.dm_partners.lock().unwrap().get(room_id).cloned()
200 }
201
202 pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
203 if let Some(s) = self.server_client() {
204 // huddle 1.2: a registered DM room delivers straight to the
205 // partner's fingerprint (reliable even if they never subscribed
206 // this room on the relay); a group room fans out by membership.
207 match self.dm_partner(&room_id) {
208 Some(to) => {
209 let _ = s.send_direct(&to, &room_id, &server_msg_id(&payload), &payload);
210 }
211 None => {
212 let _ = s.publish(&room_id, &server_msg_id(&payload), &payload);
213 }
214 }
215 }
216 let _ = self
217 .cmd_tx
218 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
219 .await;
220 }
221
222 /// huddle 1.2: deliver `payload` straight to `to`'s fingerprint over the
223 /// relay (independent of room membership), and also ride libp2p gossipsub
224 /// on the `room_id` topic for LAN reachability. Used for friend requests
225 /// (delivered to the target's inbox tag) and any other point-to-point
226 /// control message where the recipient is known.
227 pub async fn publish_direct(&self, to: String, room_id: String, payload: Vec<u8>) {
228 if let Some(s) = self.server_client() {
229 let _ = s.send_direct(&to, &room_id, &server_msg_id(&payload), &payload);
230 }
231 let _ = self
232 .cmd_tx
233 .send(NetworkCommand::PublishRoomMessage { room_id, payload })
234 .await;
235 }
236
237 pub async fn announce_room(&self, ann: RoomAnnouncement) {
238 let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
239 }
240
241 /// huddle 1.2.1: ask the attached relay to mint a short-lived connect code.
242 /// Returns `false` (no request sent) when no relay is currently attached.
243 /// The reply surfaces as `ServerEvent::ConnectToken` → `AppEvent`.
244 pub fn create_connect_token(&self) -> bool {
245 match self.server_client() {
246 Some(s) => s.create_connect_token().is_ok(),
247 None => false,
248 }
249 }
250
251 /// huddle 1.2.1: ask the attached relay to resolve a connect code.
252 /// Returns `false` when no relay is attached.
253 pub fn redeem_connect_token(&self, token: &str) -> bool {
254 match self.server_client() {
255 Some(s) => s.redeem_connect_token(token).is_ok(),
256 None => false,
257 }
258 }
259
260 /// True when a server connection is currently attached.
261 pub fn has_server(&self) -> bool {
262 self.server.lock().unwrap().is_some()
263 }
264
265 pub async fn dial(&self, address: Multiaddr) {
266 let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
267 }
268
269 /// huddle 0.5.2: dial a peer with multiple candidate addresses
270 /// at once. libp2p's swarm races them; the first to complete a
271 /// handshake wins, the rest are dropped. Use when you've resolved
272 /// a peer fingerprint to several known transports — typically
273 /// LAN ip4 + public ip4 + relay circuit.
274 pub async fn dial_addresses(&self, addresses: Vec<Multiaddr>) {
275 let _ = self
276 .cmd_tx
277 .send(NetworkCommand::DialAddresses { addresses })
278 .await;
279 }
280
281 pub async fn accept_inbound(&self, peer_id: PeerId) {
282 let _ = self
283 .cmd_tx
284 .send(NetworkCommand::AcceptInbound { peer_id })
285 .await;
286 }
287
288 pub async fn reject_inbound(&self, peer_id: PeerId) {
289 let _ = self
290 .cmd_tx
291 .send(NetworkCommand::RejectInbound { peer_id })
292 .await;
293 }
294
295 pub async fn disconnect_peer(&self, peer_id: PeerId) {
296 let _ = self
297 .cmd_tx
298 .send(NetworkCommand::DisconnectPeer { peer_id })
299 .await;
300 }
301
302 pub async fn shutdown(&self) {
303 let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
304 }
305}
306
307/// What kind of connection we're holding open until Identify gives us
308/// the remote's Ed25519 fingerprint. Quarantined inbound dials are NOT
309/// added to the gossipsub mesh until the user accepts; outbound user-
310/// dials add to the mesh on connection but still wait on Identify so
311/// the eventual `DialSucceeded` can carry the fingerprint.
312#[derive(Debug)]
313enum PendingPeer {
314 /// Inbound dial from an unknown peer — modal-pending in the app.
315 InboundUnknown { address: Multiaddr },
316}
317
318struct NetworkTask {
319 swarm: Swarm<HuddleBehavior>,
320 cmd_rx: mpsc::Receiver<NetworkCommand>,
321 event_tx: mpsc::Sender<NetworkEvent>,
322 discovered_peers: HashSet<PeerId>,
323 /// Tracks user-initiated dials so we can correlate the eventual
324 /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
325 /// specific address the user asked us to dial.
326 dial_attempts: HashMap<ConnectionId, Multiaddr>,
327 /// Phase A: peers connected but not yet promoted to the gossipsub
328 /// mesh (inbound) — waiting either for `Identify` to land so we
329 /// know their fingerprint, or for the user's accept/reject decision.
330 /// On `ConnectionClosed` we drop entries here so a peer disconnecting
331 /// mid-prompt doesn't leak state.
332 pending_inbound: HashMap<PeerId, PendingPeer>,
333 /// Phase A: peers the user has explicitly rejected this session —
334 /// auto-disconnect every reconnect attempt without re-prompting.
335 /// Persistent across runs via `blocked_peers`; this in-memory copy
336 /// is loaded at startup (TODO: not yet wired; falls back to the DB
337 /// check in the app layer for now) and updated on `RejectInbound`.
338 session_blocklist: HashSet<PeerId>,
339 /// Phase D: configured relay multiaddrs. When Identify lands for a
340 /// peer whose multiaddr matches one of these, we call
341 /// `listen_on("<addr>/p2p-circuit")` to register a reservation.
342 /// Tracked as multiaddr strings (no PeerId yet — we don't know it
343 /// until Identify) plus a set of peer_ids of confirmed relays so
344 /// we only register once per relay.
345 configured_relays: Vec<Multiaddr>,
346 relay_peer_ids: HashSet<PeerId>,
347 /// huddle 0.7.11: per-peer DCUtR failure counter. Logged at warn
348 /// once the count crosses `DCUTR_FAIL_BUDGET` so symmetric-NAT
349 /// pairs don't generate runaway hole-punch attempts in the logs.
350 /// libp2p's dcutr behavior schedules its own retries internally;
351 /// this is purely an observability signal for the app.
352 dcutr_failures: HashMap<PeerId, u32>,
353}
354
/// huddle 0.7.11: failure count for a single peer at which DCUtR hole-punch
/// failures are reported at `warn` level. dcutr keeps retrying internally
/// either way; this threshold only limits log noise from symmetric-NAT
/// pairs.
const DCUTR_FAIL_BUDGET: u32 = 6;
359
360pub fn start_network(
361 identity: &Identity,
362 event_tx: mpsc::Sender<NetworkEvent>,
363) -> crate::error::Result<NetworkHandle> {
364 start_network_with(identity, event_tx, NetworkMode::Mdns, 0, Vec::new())
365}
366
367/// huddle 0.8: build a `NetworkHandle` with **no libp2p swarm** — the
368/// onion-relay-only (`NetworkMode::Server`) path. We still hand back a
369/// fully-functional handle so the rest of the app is oblivious: room
370/// subscribe/unsubscribe/publish mirror to the attached `ServerClient`
371/// exactly as before, while the libp2p-specific commands (dial, announce,
372/// accept/reject, …) are simply drained and dropped. A tiny task consumes
373/// `cmd_rx` so the bounded channel never fills and `send().await` never
374/// blocks; it exits on `Shutdown`.
375pub fn start_network_disabled() -> NetworkHandle {
376 let (cmd_tx, mut cmd_rx) = mpsc::channel::<NetworkCommand>(256);
377 tokio::spawn(async move {
378 while let Some(cmd) = cmd_rx.recv().await {
379 if matches!(cmd, NetworkCommand::Shutdown) {
380 info!("network task (libp2p-disabled) shutting down");
381 break;
382 }
383 // Everything else is a libp2p op with no swarm to run it; the
384 // server mirror (in NetworkHandle) already handled the room
385 // traffic before the command reached us. Drop it.
386 }
387 });
388 NetworkHandle {
389 cmd_tx,
390 server: Arc::new(std::sync::Mutex::new(None)),
391 dm_partners: Arc::new(std::sync::Mutex::new(HashMap::new())),
392 }
393}
394
395/// Start the network task with explicit mode, TCP listen port, and any
396/// pre-configured relay multiaddrs. `listen_port = 0` requests a
397/// random port. Relays are dialed on startup; once `Identify` lands
398/// from a relay peer, we call `listen_on("<relay>/p2p-circuit")` to
399/// register a reservation so peers behind NAT can dial us through it.
400pub fn start_network_with(
401 identity: &Identity,
402 event_tx: mpsc::Sender<NetworkEvent>,
403 mode: NetworkMode,
404 listen_port: u16,
405 relays: Vec<Multiaddr>,
406) -> crate::error::Result<NetworkHandle> {
407 let keypair = identity.keypair().clone();
408 let local_peer_id = identity.peer_id();
409
410 let mut swarm = SwarmBuilder::with_existing_identity(keypair)
411 .with_tokio()
412 .with_tcp(
413 tcp::Config::default(),
414 noise::Config::new,
415 yamux::Config::default,
416 )
417 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
418 // Phase D: wrap the transport with relay-client. This both
419 // composes the transport (so we can dial `/p2p-circuit/`
420 // addresses) and surfaces a `relay::client::Behaviour` into
421 // the `with_behaviour` closure as the second argument.
422 .with_relay_client(noise::Config::new, yamux::Config::default)
423 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
424 .with_behaviour(|key, relay_client| {
425 let mdns_opt = match mode {
426 NetworkMode::Mdns => Some(
427 mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
428 .expect("mDNS init failed"),
429 ),
430 // `Direct` runs libp2p without mDNS. `Server` never reaches
431 // here (it takes the `start_network_disabled` path and runs
432 // no swarm), but the match must be exhaustive.
433 NetworkMode::Direct | NetworkMode::Server => None,
434 };
435 let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
436
437 let identify = identify::Behaviour::new(
438 identify::Config::new("/huddle/1.0.0".into(), key.public())
439 .with_agent_version("huddle/0.5".into()),
440 );
441
442 let ping = ping::Behaviour::default();
443
444 let gossipsub_config = gossipsub::ConfigBuilder::default()
445 .heartbeat_interval(Duration::from_secs(1))
446 .validation_mode(gossipsub::ValidationMode::Strict)
447 // Default is 64 KiB. Raise it so base64-encoded file
448 // chunks (see files::CHUNK_SIZE) keep ample headroom.
449 .max_transmit_size(256 * 1024)
450 .build()
451 .expect("valid gossipsub config");
452
453 let mut gossipsub = gossipsub::Behaviour::new(
454 gossipsub::MessageAuthenticity::Signed(key.clone()),
455 gossipsub_config,
456 )
457 .expect("valid gossipsub init");
458
459 // Every node subscribes to the global rooms topic so the lobby
460 // shows discovered rooms even before joining anything.
461 let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
462 gossipsub
463 .subscribe(&rooms_topic)
464 .expect("subscribe rooms topic");
465
466 // AutoNAT v2: client probes external addresses by asking
467 // remote servers to dial us back; server answers other
468 // peers' probes. Both halves are needed for symmetric
469 // P2P reachability detection (v1 combined them; v2 split).
470 let autonat_client = autonat::v2::client::Behaviour::new(
471 rand::rngs::OsRng,
472 autonat::v2::client::Config::default(),
473 );
474 let autonat_server = autonat::v2::server::Behaviour::new(rand::rngs::OsRng);
475 let dcutr = dcutr::Behaviour::new(local_peer_id);
476
477 HuddleBehavior {
478 mdns,
479 identify,
480 ping,
481 gossipsub,
482 relay_client,
483 autonat_client,
484 autonat_server,
485 dcutr,
486 }
487 })
488 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
489 .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
490 .build();
491
492 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
493 .parse()
494 .expect("valid listen addr");
495 swarm
496 .listen_on(listen_addr)
497 .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
498 // Also bind IPv6 on all interfaces so users can dial via IPv6.
499 let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
500 .parse()
501 .expect("valid ipv6 listen addr");
502 if let Err(e) = swarm.listen_on(listen_addr6) {
503 debug!(%e, "ipv6 listen skipped");
504 }
505
506 let (cmd_tx, cmd_rx) = mpsc::channel(256);
507 let mut task = NetworkTask {
508 swarm,
509 cmd_rx,
510 event_tx,
511 discovered_peers: HashSet::new(),
512 dial_attempts: HashMap::new(),
513 pending_inbound: HashMap::new(),
514 session_blocklist: HashSet::new(),
515 configured_relays: relays.clone(),
516 relay_peer_ids: HashSet::new(),
517 dcutr_failures: HashMap::new(),
518 };
519 // Phase D: dial each configured relay so Identify can complete and
520 // we can register a `/p2p-circuit` reservation. Failures here are
521 // non-fatal — the user can still chat on LAN.
522 for relay_addr in relays {
523 info!(addr = %relay_addr, "dialing configured relay");
524 let opts: libp2p::swarm::dial_opts::DialOpts = relay_addr.clone().into();
525 let conn_id = opts.connection_id();
526 match task.swarm.dial(opts) {
527 Ok(()) => {
528 task.dial_attempts.insert(conn_id, relay_addr);
529 }
530 Err(e) => warn!(%e, "dial relay failed"),
531 }
532 }
533 tokio::spawn(task.run());
534
535 Ok(NetworkHandle {
536 cmd_tx,
537 server: Arc::new(std::sync::Mutex::new(None)),
538 dm_partners: Arc::new(std::sync::Mutex::new(HashMap::new())),
539 })
540}
541
542impl NetworkTask {
543 async fn run(mut self) {
544 loop {
545 tokio::select! {
546 event = self.swarm.select_next_some() => {
547 self.handle_swarm_event(event).await;
548 }
549 Some(cmd) = self.cmd_rx.recv() => {
550 if matches!(cmd, NetworkCommand::Shutdown) {
551 info!("network task shutting down");
552 break;
553 }
554 self.handle_command(cmd);
555 }
556 }
557 }
558 }
559
560 async fn handle_swarm_event(
561 &mut self,
562 event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
563 ) {
564 match event {
565 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
566 info!(%address, "listening");
567 // Phase D: a relay-circuit address is a reachability
568 // milestone — surface it as its own event so the lobby
569 // can show "reachable via N relays" status.
570 use libp2p::multiaddr::Protocol;
571 let is_circuit = address
572 .iter()
573 .any(|p| matches!(p, Protocol::P2pCircuit));
574 if is_circuit {
575 let _ = self
576 .event_tx
577 .send(NetworkEvent::RelayReservationEstablished {
578 address: address.clone(),
579 })
580 .await;
581 }
582 let _ = self
583 .event_tx
584 .send(NetworkEvent::ListeningOn { address })
585 .await;
586 }
587 libp2p::swarm::SwarmEvent::ConnectionEstablished {
588 peer_id,
589 connection_id,
590 endpoint,
591 ..
592 } => {
593 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
594 // Phase D: a connection that was for a configured
595 // relay shouldn't pollute the lobby with a normal
596 // DialSucceeded — relays aren't chat peers. We just
597 // remember the peer_id and wait for Identify, then
598 // register a /p2p-circuit reservation.
599 let is_relay = self.configured_relays.iter().any(|r| r == &addr);
600 if is_relay {
601 info!(%peer_id, %addr, "connected to configured relay");
602 self.relay_peer_ids.insert(peer_id);
603 } else {
604 info!(%peer_id, %addr, "user-dialed peer connected");
605 // Treat dialed peers like mDNS-discovered: add
606 // to gossipsub explicit peers so room
607 // announcements flow.
608 self.swarm
609 .behaviour_mut()
610 .gossipsub
611 .add_explicit_peer(&peer_id);
612 self.discovered_peers.insert(peer_id);
613 let _ = self
614 .event_tx
615 .send(NetworkEvent::DialSucceeded {
616 peer_id,
617 address: addr,
618 })
619 .await;
620 }
621 } else if let ConnectedPoint::Dialer { .. } = endpoint {
622 // Outgoing connection we didn't track (e.g. mDNS auto-dial)
623 // — still add to mesh; no user-visible event needed.
624 self.swarm
625 .behaviour_mut()
626 .gossipsub
627 .add_explicit_peer(&peer_id);
628 } else {
629 // Inbound dial from an unknown peer (Phase A). We
630 // hold the connection but do NOT add them to the
631 // explicit-peer set yet — wait for Identify so we
632 // can show the user the peer's fingerprint, then
633 // either AcceptInbound (promote to mesh) or
634 // RejectInbound (disconnect + persist blocklist).
635 //
636 // Known limitation: gossipsub's score-based mesh
637 // formation may still forward topic messages to
638 // this peer via other peers we have in common.
639 // True hard-quarantine would need a custom
640 // ConnectionHandler — out of scope for v1.
641 if self.session_blocklist.contains(&peer_id) {
642 info!(%peer_id, "rejecting inbound from session-blocked peer");
643 let _ = self.swarm.disconnect_peer_id(peer_id);
644 } else {
645 let address = match &endpoint {
646 ConnectedPoint::Listener { send_back_addr, .. } => {
647 send_back_addr.clone()
648 }
649 _ => Multiaddr::empty(),
650 };
651 debug!(%peer_id, %address, "inbound peer pending decision");
652 self.pending_inbound
653 .insert(peer_id, PendingPeer::InboundUnknown { address });
654 }
655 }
656 }
657 libp2p::swarm::SwarmEvent::OutgoingConnectionError {
658 connection_id,
659 error,
660 ..
661 } => {
662 if let Some(addr) = self.dial_attempts.remove(&connection_id) {
663 warn!(%addr, %error, "user-dialed peer failed");
664 let _ = self
665 .event_tx
666 .send(NetworkEvent::DialFailed {
667 address: addr,
668 error: error.to_string(),
669 })
670 .await;
671 }
672 }
673 libp2p::swarm::SwarmEvent::ConnectionClosed {
674 peer_id,
675 num_established,
676 ..
677 } => {
678 // Drop any pending-inbound entry for this peer — they
679 // disconnected before we could prompt the user (or
680 // before the user accepted). Lets a re-connect start
681 // fresh rather than reusing stale state.
682 self.pending_inbound.remove(&peer_id);
683 // huddle 0.7.11: emit PeerDisconnected so the app can
684 // clean its `connected_dial_addrs` map. Only fire when
685 // the LAST connection to that peer closed — multiple
686 // simultaneous connections (e.g. direct + relay) would
687 // otherwise emit spurious disconnects when one of the
688 // two drops.
689 if num_established == 0 {
690 let _ = self
691 .event_tx
692 .send(NetworkEvent::PeerDisconnected { peer_id })
693 .await;
694 // Also clean up gossipsub's explicit-peers set so
695 // it doesn't accumulate dead PeerIds across the
696 // session.
697 self.swarm
698 .behaviour_mut()
699 .gossipsub
700 .remove_explicit_peer(&peer_id);
701 }
702 }
703 libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
704 _ => {}
705 }
706 }
707
708 async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
709 match event {
710 HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
711 for (peer_id, addr) in peers {
712 if self.discovered_peers.insert(peer_id) {
713 info!(%peer_id, %addr, "mDNS discovered");
714 self.swarm.add_peer_address(peer_id, addr);
715 // Explicitly add to gossipsub mesh.
716 self.swarm
717 .behaviour_mut()
718 .gossipsub
719 .add_explicit_peer(&peer_id);
720 let _ = self
721 .event_tx
722 .send(NetworkEvent::PeerDiscovered { peer_id })
723 .await;
724 }
725 }
726 }
727 HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
728 for (peer_id, _) in peers {
729 if self.discovered_peers.remove(&peer_id) {
730 info!(%peer_id, "mDNS peer expired");
731 self.swarm
732 .behaviour_mut()
733 .gossipsub
734 .remove_explicit_peer(&peer_id);
735 let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
736 }
737 }
738 }
739 HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
740 propagation_source,
741 message,
742 ..
743 }) => {
744 self.handle_gossipsub_message(propagation_source, message).await;
745 }
746 HuddleBehaviorEvent::Identify(identify::Event::Received {
747 peer_id, info, ..
748 }) => {
749 debug!(%peer_id, agent = %info.agent_version, "identify received");
750 // Phase D: if this peer is a configured relay, register
751 // a `/p2p-circuit` reservation on first identify. Idem-
752 // potent — only fire if we haven't listened on this
753 // relay already (identify fires periodically).
754 if self.relay_peer_ids.contains(&peer_id) {
755 use libp2p::multiaddr::Protocol;
756 if let Some(relay_addr) = self
757 .configured_relays
758 .iter()
759 .find(|a| {
760 // Match by /p2p/<peer-id> suffix when
761 // present, else by the addr we dialed.
762 a.iter().any(|p| matches!(p, Protocol::P2p(pid) if pid == peer_id))
763 || self.dial_attempts.values().any(|d| d == *a)
764 })
765 .cloned()
766 {
767 let circuit = relay_addr.with(Protocol::P2pCircuit);
768 match self.swarm.listen_on(circuit.clone()) {
769 Ok(_) => info!(%circuit, "listening on relay circuit"),
770 Err(e) => warn!(%e, %circuit, "relay listen_on failed"),
771 }
772 }
773 }
774 // Decode the remote's Ed25519 pubkey and derive our
775 // 24-char fingerprint from it. Non-Ed25519 keys (Secp,
776 // Rsa, Ecdsa) shouldn't appear in practice — huddle
777 // only generates Ed25519 identities — so we just log
778 // and skip if the cast fails.
779 let fingerprint = match info.public_key.clone().try_into_ed25519() {
780 Ok(ed_pk) => {
781 let bytes = ed_pk.to_bytes();
782 compute_fingerprint(&bytes)
783 }
784 Err(_) => {
785 warn!(%peer_id, "identify pubkey isn't Ed25519; skipping fingerprint");
786 return;
787 }
788 };
789 // Always notify the app layer so it can populate
790 // `known_peers.fingerprint` and detect that an
791 // outbound peer we dialed has fully identified.
792 let _ = self
793 .event_tx
794 .send(NetworkEvent::PeerIdentified {
795 peer_id,
796 fingerprint: fingerprint.clone(),
797 })
798 .await;
799 // If the peer is in pending_inbound, Identify completing
800 // is the cue to surface the user prompt. Keep them in
801 // pending_inbound until Accept or Reject — we don't
802 // know yet which way the user will decide.
803 if let Some(PendingPeer::InboundUnknown { address }) =
804 self.pending_inbound.get(&peer_id)
805 {
806 let address = address.clone();
807 let _ = self
808 .event_tx
809 .send(NetworkEvent::InboundDial {
810 peer_id,
811 fingerprint,
812 address,
813 })
814 .await;
815 }
816 }
817 HuddleBehaviorEvent::AutonatClient(ev) => {
818 // One probe per address candidate. `result.is_ok()`
819 // means a remote AutoNAT server dialed us back
820 // successfully on `tested_addr` ⇒ this address is
821 // reachable from the outside. The app layer aggregates
822 // these into the lobby reachability badge.
823 let reachable = ev.result.is_ok();
824 if reachable {
825 info!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: reachable");
826 } else {
827 debug!(tested = %ev.tested_addr, server = %ev.server, "AutoNAT: probe failed");
828 }
829 let _ = self
830 .event_tx
831 .send(NetworkEvent::NatProbeResult {
832 tested_addr: ev.tested_addr,
833 reachable,
834 })
835 .await;
836 }
837 HuddleBehaviorEvent::AutonatServer(_) => {
838 // We answered another peer's reachability probe.
839 // No app-visible action.
840 }
841 HuddleBehaviorEvent::Dcutr(ev) => {
842 let success = ev.result.is_ok();
843 if success {
844 info!(remote = %ev.remote_peer_id, "DCUtR: direct connection established");
845 // huddle 0.7.11: a successful hole-punch resets the
846 // per-peer DCUtR retry counter so future failures
847 // don't immediately bail out.
848 self.dcutr_failures.remove(&ev.remote_peer_id);
849 } else {
850 debug!(remote = %ev.remote_peer_id, "DCUtR: hole-punch failed");
851 let count = self
852 .dcutr_failures
853 .entry(ev.remote_peer_id)
854 .and_modify(|n| *n += 1)
855 .or_insert(1);
856 if *count >= DCUTR_FAIL_BUDGET {
857 warn!(
858 remote = %ev.remote_peer_id,
859 attempts = *count,
860 "DCUtR: giving up after repeated failures (symmetric NAT likely)"
861 );
862 }
863 }
864 let _ = self
865 .event_tx
866 .send(NetworkEvent::DcutrUpgrade {
867 remote_peer: ev.remote_peer_id,
868 success,
869 })
870 .await;
871 }
872 HuddleBehaviorEvent::RelayClient(event) => {
873 // huddle 0.7.12: libp2p 0.56's `relay::client::Event`
874 // only exposes the success-path variants
875 // (`ReservationReqAccepted`, `OutboundCircuitEstablished`,
876 // `InboundCircuitEstablished`). There's no
877 // `ReservationReqFailed` arm we can match on, so we
878 // can't reliably surface reservation loss without a
879 // separate health-check timer (future work). For now
880 // we log every event so operators can see the
881 // lifecycle in `huddle.log` — pre-0.7.11 the whole
882 // arm was swallowed by `_ => {}` which hid even that.
883 use libp2p::relay::client::Event as Rc;
884 match event {
885 Rc::ReservationReqAccepted { relay_peer_id, .. } => {
886 info!(%relay_peer_id, "relay: reservation accepted");
887 }
888 Rc::OutboundCircuitEstablished { relay_peer_id, .. } => {
889 debug!(%relay_peer_id, "relay: outbound circuit established");
890 }
891 other => {
892 debug!(?other, "relay client event");
893 }
894 }
895 }
896 _ => {}
897 }
898 }
899
900 async fn handle_gossipsub_message(
901 &mut self,
902 from_peer: PeerId,
903 message: gossipsub::Message,
904 ) {
905 let topic = message.topic.to_string();
906 if topic == ROOMS_TOPIC {
907 match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
908 Ok(ann) => {
909 let _ = self
910 .event_tx
911 .send(NetworkEvent::RoomAnnouncementReceived(ann))
912 .await;
913 }
914 Err(e) => {
915 warn!(%e, "bad room announcement");
916 }
917 }
918 } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
919 let _ = self
920 .event_tx
921 .send(NetworkEvent::RoomMessageReceived {
922 room_id: room_id.to_string(),
923 payload: message.data,
924 from_peer,
925 })
926 .await;
927 }
928 }
929
930 fn handle_command(&mut self, cmd: NetworkCommand) {
931 match cmd {
932 NetworkCommand::SubscribeRoom { room_id } => {
933 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
934 if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
935 warn!(%e, %room_id, "subscribe room failed");
936 }
937 }
938 NetworkCommand::UnsubscribeRoom { room_id } => {
939 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
940 self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
941 }
942 NetworkCommand::PublishRoomMessage { room_id, payload } => {
943 let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
944 if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
945 // No subscribed peers is expected before the mesh
946 // forms; anything else (MessageTooLarge, full queues)
947 // is a real bug worth surfacing.
948 match e {
949 gossipsub::PublishError::NoPeersSubscribedToTopic => {
950 debug!(%room_id, "publish skipped: no peers subscribed to topic yet");
951 }
952 e => warn!(%e, %room_id, "publish room message failed"),
953 }
954 }
955 }
956 NetworkCommand::AnnounceRoom(ann) => {
957 let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
958 match serde_json::to_vec(&ann) {
959 Ok(payload) => {
960 if let Err(e) =
961 self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
962 {
963 debug!(%e, "publish room announcement failed");
964 }
965 }
966 Err(e) => warn!(%e, "encode room announcement"),
967 }
968 }
969 NetworkCommand::AcceptInbound { peer_id } => {
970 if self.pending_inbound.remove(&peer_id).is_some() {
971 info!(%peer_id, "inbound dial accepted — promoting to mesh");
972 self.swarm
973 .behaviour_mut()
974 .gossipsub
975 .add_explicit_peer(&peer_id);
976 self.discovered_peers.insert(peer_id);
977 } else {
978 debug!(%peer_id, "AcceptInbound for unknown peer (already promoted or disconnected)");
979 }
980 }
981 NetworkCommand::RejectInbound { peer_id } => {
982 self.pending_inbound.remove(&peer_id);
983 self.session_blocklist.insert(peer_id);
984 info!(%peer_id, "inbound dial rejected — disconnecting");
985 let _ = self.swarm.disconnect_peer_id(peer_id);
986 }
987 NetworkCommand::DisconnectPeer { peer_id } => {
988 info!(%peer_id, "app-level identity check failed — disconnecting");
989 let _ = self.swarm.disconnect_peer_id(peer_id);
990 }
991 NetworkCommand::Dial { address } => {
992 let opts: DialOpts = address.clone().into();
993 let conn_id = opts.connection_id();
994 match self.swarm.dial(opts) {
995 Ok(()) => {
996 self.dial_attempts.insert(conn_id, address);
997 }
998 Err(e) => {
999 // Synchronous dial error (bad multiaddr, transport refused).
1000 let tx = self.event_tx.clone();
1001 let err = e.to_string();
1002 tokio::spawn(async move {
1003 let _ = tx
1004 .send(NetworkEvent::DialFailed {
1005 address,
1006 error: err,
1007 })
1008 .await;
1009 });
1010 }
1011 }
1012 }
1013 NetworkCommand::DialAddresses { addresses } => {
1014 use libp2p::multiaddr::Protocol;
1015 if addresses.is_empty() {
1016 return;
1017 }
1018 // Extract the shared peer-id from any address with a
1019 // `/p2p/<peer-id>` suffix. All host_addrs originating
1020 // from the same `RoomAnnouncement` share the
1021 // announcer's peer-id, so the first match is enough.
1022 let peer_id = addresses
1023 .iter()
1024 .flat_map(|a| a.iter())
1025 .find_map(|p| match p {
1026 Protocol::P2p(pid) => Some(pid),
1027 _ => None,
1028 });
1029 let opts = match peer_id {
1030 Some(pid) => DialOpts::peer_id(pid)
1031 .addresses(addresses.clone())
1032 .build(),
1033 // No /p2p/ segment anywhere — fall back to single-
1034 // address dial of the first candidate, matching the
1035 // legacy `Dial` semantics for unanchored multiaddrs.
1036 None => addresses[0].clone().into(),
1037 };
1038 let conn_id = opts.connection_id();
1039 // Use the first address as the representative for
1040 // dial_attempts. On synchronous error we report it;
1041 // on async success the post-identify handler upserts
1042 // with the actually-connected endpoint.
1043 let primary = addresses[0].clone();
1044 match self.swarm.dial(opts) {
1045 Ok(()) => {
1046 self.dial_attempts.insert(conn_id, primary);
1047 }
1048 Err(e) => {
1049 let tx = self.event_tx.clone();
1050 let err = e.to_string();
1051 tokio::spawn(async move {
1052 let _ = tx
1053 .send(NetworkEvent::DialFailed {
1054 address: primary,
1055 error: err,
1056 })
1057 .await;
1058 });
1059 }
1060 }
1061 }
1062 NetworkCommand::Shutdown => unreachable!(),
1063 }
1064 }
1065}