Skip to main content

huddle_core/network/
mod.rs

1pub mod behavior;
2pub mod events;
3pub mod protocol;
4
5use std::collections::{HashMap, HashSet};
6use std::time::Duration;
7
8use futures::StreamExt;
9use libp2p::core::ConnectedPoint;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::ConnectionId;
12use libp2p::{
13    gossipsub, identify, mdns, noise, ping, tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder,
14};
15use tokio::sync::mpsc;
16use tracing::{debug, info, warn};
17
18/// How the network discovers peers.
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub enum NetworkMode {
21    /// mDNS on: announce ourselves on the LAN and pick up announcements.
22    Mdns,
23    /// mDNS off: invisible to LAN discovery; the only way to connect is
24    /// for someone to dial our address (or for us to dial theirs).
25    Direct,
26}
27
28impl NetworkMode {
29    pub fn as_str(&self) -> &'static str {
30        match self {
31            NetworkMode::Mdns => "mdns",
32            NetworkMode::Direct => "direct",
33        }
34    }
35
36    pub fn from_str(s: &str) -> Option<Self> {
37        match s.trim().to_ascii_lowercase().as_str() {
38            "mdns" | "lan" | "open" => Some(NetworkMode::Mdns),
39            "direct" | "dial" | "private" => Some(NetworkMode::Direct),
40            _ => None,
41        }
42    }
43}
44
45use crate::identity::Identity;
46use crate::network::behavior::{HuddleBehavior, HuddleBehaviorEvent};
47use crate::network::events::NetworkEvent;
48use crate::network::protocol::{room_topic, RoomAnnouncement, ROOMS_TOPIC};
49
50#[derive(Debug)]
51pub enum NetworkCommand {
52    /// Subscribe to a room's per-room gossipsub topic.
53    SubscribeRoom { room_id: String },
54    /// Unsubscribe from a room's topic.
55    UnsubscribeRoom { room_id: String },
56    /// Publish a JSON-encoded `RoomMessage` to a room's topic.
57    PublishRoomMessage { room_id: String, payload: Vec<u8> },
58    /// Publish a room announcement on the global rooms topic.
59    AnnounceRoom(RoomAnnouncement),
60    /// User-initiated dial of an explicit address. Used for cross-network
61    /// reach when mDNS isn't enough.
62    Dial { address: Multiaddr },
63    Shutdown,
64}
65
66#[derive(Clone)]
67pub struct NetworkHandle {
68    cmd_tx: mpsc::Sender<NetworkCommand>,
69}
70
71impl NetworkHandle {
72    pub async fn subscribe_room(&self, room_id: String) {
73        let _ = self
74            .cmd_tx
75            .send(NetworkCommand::SubscribeRoom { room_id })
76            .await;
77    }
78
79    pub async fn unsubscribe_room(&self, room_id: String) {
80        let _ = self
81            .cmd_tx
82            .send(NetworkCommand::UnsubscribeRoom { room_id })
83            .await;
84    }
85
86    pub async fn publish_room_message(&self, room_id: String, payload: Vec<u8>) {
87        let _ = self
88            .cmd_tx
89            .send(NetworkCommand::PublishRoomMessage { room_id, payload })
90            .await;
91    }
92
93    pub async fn announce_room(&self, ann: RoomAnnouncement) {
94        let _ = self.cmd_tx.send(NetworkCommand::AnnounceRoom(ann)).await;
95    }
96
97    pub async fn dial(&self, address: Multiaddr) {
98        let _ = self.cmd_tx.send(NetworkCommand::Dial { address }).await;
99    }
100
101    pub async fn shutdown(&self) {
102        let _ = self.cmd_tx.send(NetworkCommand::Shutdown).await;
103    }
104}
105
106struct NetworkTask {
107    swarm: Swarm<HuddleBehavior>,
108    cmd_rx: mpsc::Receiver<NetworkCommand>,
109    event_tx: mpsc::Sender<NetworkEvent>,
110    discovered_peers: HashSet<PeerId>,
111    /// Tracks user-initiated dials so we can correlate the eventual
112    /// `ConnectionEstablished` / `OutgoingConnectionError` back to a
113    /// specific address the user asked us to dial.
114    dial_attempts: HashMap<ConnectionId, Multiaddr>,
115}
116
117pub fn start_network(
118    identity: &Identity,
119    event_tx: mpsc::Sender<NetworkEvent>,
120) -> crate::error::Result<NetworkHandle> {
121    start_network_with(identity, event_tx, NetworkMode::Mdns, 0)
122}
123
124/// Start the network task with explicit mode and TCP listen port.
125/// `listen_port = 0` requests a random port.
126pub fn start_network_with(
127    identity: &Identity,
128    event_tx: mpsc::Sender<NetworkEvent>,
129    mode: NetworkMode,
130    listen_port: u16,
131) -> crate::error::Result<NetworkHandle> {
132    let keypair = identity.keypair().clone();
133    let local_peer_id = identity.peer_id();
134
135    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
136        .with_tokio()
137        .with_tcp(
138            tcp::Config::default(),
139            noise::Config::new,
140            yamux::Config::default,
141        )
142        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
143        .with_behaviour(|key| {
144            let mdns_opt = match mode {
145                NetworkMode::Mdns => Some(
146                    mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)
147                        .expect("mDNS init failed"),
148                ),
149                NetworkMode::Direct => None,
150            };
151            let mdns: libp2p::swarm::behaviour::toggle::Toggle<_> = mdns_opt.into();
152
153            let identify = identify::Behaviour::new(
154                identify::Config::new("/huddle/1.0.0".into(), key.public())
155                    .with_agent_version("huddle/0.2".into()),
156            );
157
158            let ping = ping::Behaviour::default();
159
160            let gossipsub_config = gossipsub::ConfigBuilder::default()
161                .heartbeat_interval(Duration::from_secs(1))
162                .validation_mode(gossipsub::ValidationMode::Strict)
163                .build()
164                .expect("valid gossipsub config");
165
166            let mut gossipsub = gossipsub::Behaviour::new(
167                gossipsub::MessageAuthenticity::Signed(key.clone()),
168                gossipsub_config,
169            )
170            .expect("valid gossipsub init");
171
172            // Every node subscribes to the global rooms topic so the lobby
173            // shows discovered rooms even before joining anything.
174            let rooms_topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
175            gossipsub
176                .subscribe(&rooms_topic)
177                .expect("subscribe rooms topic");
178
179            HuddleBehavior {
180                mdns,
181                identify,
182                ping,
183                gossipsub,
184            }
185        })
186        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?
187        .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(120)))
188        .build();
189
190    let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", listen_port)
191        .parse()
192        .expect("valid listen addr");
193    swarm
194        .listen_on(listen_addr)
195        .map_err(|e| crate::error::HuddleError::Network(e.to_string()))?;
196    // Also bind IPv6 on all interfaces so users can dial via IPv6.
197    let listen_addr6: Multiaddr = format!("/ip6/::/tcp/{}", listen_port)
198        .parse()
199        .expect("valid ipv6 listen addr");
200    if let Err(e) = swarm.listen_on(listen_addr6) {
201        debug!(%e, "ipv6 listen skipped");
202    }
203
204    let (cmd_tx, cmd_rx) = mpsc::channel(256);
205    let task = NetworkTask {
206        swarm,
207        cmd_rx,
208        event_tx,
209        discovered_peers: HashSet::new(),
210        dial_attempts: HashMap::new(),
211    };
212    tokio::spawn(task.run());
213
214    Ok(NetworkHandle { cmd_tx })
215}
216
217impl NetworkTask {
218    async fn run(mut self) {
219        loop {
220            tokio::select! {
221                event = self.swarm.select_next_some() => {
222                    self.handle_swarm_event(event).await;
223                }
224                Some(cmd) = self.cmd_rx.recv() => {
225                    if matches!(cmd, NetworkCommand::Shutdown) {
226                        info!("network task shutting down");
227                        break;
228                    }
229                    self.handle_command(cmd);
230                }
231            }
232        }
233    }
234
235    async fn handle_swarm_event(
236        &mut self,
237        event: libp2p::swarm::SwarmEvent<HuddleBehaviorEvent>,
238    ) {
239        match event {
240            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
241                info!(%address, "listening");
242                let _ = self
243                    .event_tx
244                    .send(NetworkEvent::ListeningOn { address })
245                    .await;
246            }
247            libp2p::swarm::SwarmEvent::ConnectionEstablished {
248                peer_id,
249                connection_id,
250                endpoint,
251                ..
252            } => {
253                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
254                    info!(%peer_id, %addr, "user-dialed peer connected");
255                    // Treat dialed peers like mDNS-discovered: add to
256                    // gossipsub explicit peers so room announcements flow.
257                    self.swarm
258                        .behaviour_mut()
259                        .gossipsub
260                        .add_explicit_peer(&peer_id);
261                    self.discovered_peers.insert(peer_id);
262                    let _ = self
263                        .event_tx
264                        .send(NetworkEvent::DialSucceeded {
265                            peer_id,
266                            address: addr,
267                        })
268                        .await;
269                } else if let ConnectedPoint::Dialer { .. } = endpoint {
270                    // Outgoing connection we didn't track (e.g. mDNS auto-dial)
271                    // — still add to mesh; no user-visible event needed.
272                    self.swarm
273                        .behaviour_mut()
274                        .gossipsub
275                        .add_explicit_peer(&peer_id);
276                }
277            }
278            libp2p::swarm::SwarmEvent::OutgoingConnectionError {
279                connection_id,
280                error,
281                ..
282            } => {
283                if let Some(addr) = self.dial_attempts.remove(&connection_id) {
284                    warn!(%addr, %error, "user-dialed peer failed");
285                    let _ = self
286                        .event_tx
287                        .send(NetworkEvent::DialFailed {
288                            address: addr,
289                            error: error.to_string(),
290                        })
291                        .await;
292                }
293            }
294            libp2p::swarm::SwarmEvent::Behaviour(be) => self.handle_behavior_event(be).await,
295            _ => {}
296        }
297    }
298
299    async fn handle_behavior_event(&mut self, event: HuddleBehaviorEvent) {
300        match event {
301            HuddleBehaviorEvent::Mdns(mdns::Event::Discovered(peers)) => {
302                for (peer_id, addr) in peers {
303                    if self.discovered_peers.insert(peer_id) {
304                        info!(%peer_id, %addr, "mDNS discovered");
305                        self.swarm.add_peer_address(peer_id, addr);
306                        // Explicitly add to gossipsub mesh.
307                        self.swarm
308                            .behaviour_mut()
309                            .gossipsub
310                            .add_explicit_peer(&peer_id);
311                        let _ = self
312                            .event_tx
313                            .send(NetworkEvent::PeerDiscovered { peer_id })
314                            .await;
315                    }
316                }
317            }
318            HuddleBehaviorEvent::Mdns(mdns::Event::Expired(peers)) => {
319                for (peer_id, _) in peers {
320                    if self.discovered_peers.remove(&peer_id) {
321                        info!(%peer_id, "mDNS peer expired");
322                        self.swarm
323                            .behaviour_mut()
324                            .gossipsub
325                            .remove_explicit_peer(&peer_id);
326                        let _ = self.event_tx.send(NetworkEvent::PeerExpired { peer_id }).await;
327                    }
328                }
329            }
330            HuddleBehaviorEvent::Gossipsub(gossipsub::Event::Message {
331                propagation_source,
332                message,
333                ..
334            }) => {
335                self.handle_gossipsub_message(propagation_source, message).await;
336            }
337            HuddleBehaviorEvent::Identify(identify::Event::Received {
338                peer_id, info, ..
339            }) => {
340                debug!(%peer_id, agent = %info.agent_version, "identify received");
341            }
342            _ => {}
343        }
344    }
345
346    async fn handle_gossipsub_message(
347        &mut self,
348        from_peer: PeerId,
349        message: gossipsub::Message,
350    ) {
351        let topic = message.topic.to_string();
352        if topic == ROOMS_TOPIC {
353            match serde_json::from_slice::<RoomAnnouncement>(&message.data) {
354                Ok(ann) => {
355                    let _ = self
356                        .event_tx
357                        .send(NetworkEvent::RoomAnnouncementReceived(ann))
358                        .await;
359                }
360                Err(e) => {
361                    warn!(%e, "bad room announcement");
362                }
363            }
364        } else if let Some(room_id) = topic.strip_prefix(protocol::ROOM_TOPIC_PREFIX) {
365            let _ = self
366                .event_tx
367                .send(NetworkEvent::RoomMessageReceived {
368                    room_id: room_id.to_string(),
369                    payload: message.data,
370                    from_peer,
371                })
372                .await;
373        }
374    }
375
376    fn handle_command(&mut self, cmd: NetworkCommand) {
377        match cmd {
378            NetworkCommand::SubscribeRoom { room_id } => {
379                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
380                if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
381                    warn!(%e, %room_id, "subscribe room failed");
382                }
383            }
384            NetworkCommand::UnsubscribeRoom { room_id } => {
385                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
386                self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
387            }
388            NetworkCommand::PublishRoomMessage { room_id, payload } => {
389                let topic = gossipsub::IdentTopic::new(room_topic(&room_id));
390                if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, payload) {
391                    debug!(%e, %room_id, "publish room message failed (no peers yet?)");
392                }
393            }
394            NetworkCommand::AnnounceRoom(ann) => {
395                let topic = gossipsub::IdentTopic::new(ROOMS_TOPIC);
396                match serde_json::to_vec(&ann) {
397                    Ok(payload) => {
398                        if let Err(e) =
399                            self.swarm.behaviour_mut().gossipsub.publish(topic, payload)
400                        {
401                            debug!(%e, "publish room announcement failed");
402                        }
403                    }
404                    Err(e) => warn!(%e, "encode room announcement"),
405                }
406            }
407            NetworkCommand::Dial { address } => {
408                let opts: DialOpts = address.clone().into();
409                let conn_id = opts.connection_id();
410                match self.swarm.dial(opts) {
411                    Ok(()) => {
412                        self.dial_attempts.insert(conn_id, address);
413                    }
414                    Err(e) => {
415                        // Synchronous dial error (bad multiaddr, transport refused).
416                        let tx = self.event_tx.clone();
417                        let err = e.to_string();
418                        tokio::spawn(async move {
419                            let _ = tx
420                                .send(NetworkEvent::DialFailed {
421                                    address,
422                                    error: err,
423                                })
424                                .await;
425                        });
426                    }
427                }
428            }
429            NetworkCommand::Shutdown => unreachable!(),
430        }
431    }
432}