Skip to main content

po_node/
discovery.rs

1//! UDP broadcast-based peer discovery for LAN environments.
2//!
3//! Sends periodic beacons on the broadcast address and listens for
4//! beacons from other PO nodes on the same network.
5
6use dashmap::DashMap;
7use std::net::SocketAddr;
8use std::sync::Arc;
9use tokio::net::UdpSocket;
10use tokio::sync::mpsc;
11use tracing::{debug, warn};
12
13use po_crypto::identity::NodeId;
14
15/// Default discovery port.
16pub const DISCOVERY_PORT: u16 = 5433;
17
18/// A discovered peer on the local network.
19#[derive(Debug, Clone)]
20pub struct DiscoveredPeer {
21    pub node_id: String,
22    pub addr: SocketAddr,
23    pub quic_port: u16,
24    pub last_seen: std::time::Instant,
25}
26
27/// LAN discovery service using UDP broadcast.
28pub struct Discovery {
29    socket: Arc<UdpSocket>,
30    peers: Arc<DashMap<String, DiscoveredPeer>>,
31    our_node_id: String,
32    our_quic_port: u16,
33}
34
35impl Discovery {
36    /// Start the discovery service.
37    pub async fn start(
38        node_id: &NodeId,
39        quic_port: u16,
40    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
41        let socket = UdpSocket::bind(format!("0.0.0.0:{DISCOVERY_PORT}")).await?;
42        socket.set_broadcast(true)?;
43
44        Ok(Self {
45            socket: Arc::new(socket),
46            peers: Arc::new(DashMap::new()),
47            our_node_id: node_id.to_hex(),
48            our_quic_port: quic_port,
49        })
50    }
51
52    /// Send a single discovery beacon.
53    pub async fn send_beacon(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
54        let msg = format!("PO|{}|{}", self.our_node_id, self.our_quic_port);
55        let broadcast_addr: SocketAddr = format!("255.255.255.255:{DISCOVERY_PORT}").parse()?;
56        self.socket.send_to(msg.as_bytes(), broadcast_addr).await?;
57        debug!("Sent discovery beacon");
58        Ok(())
59    }
60
61    /// Listen for a single beacon and register the peer.
62    pub async fn listen_once(
63        &self,
64    ) -> Result<Option<DiscoveredPeer>, Box<dyn std::error::Error + Send + Sync>> {
65        let mut buf = [0u8; 256];
66        let (n, addr) = self.socket.recv_from(&mut buf).await?;
67        let msg = std::str::from_utf8(&buf[..n])?;
68
69        if let Some(peer) = self.parse_beacon(msg, addr) {
70            // Don't register ourselves
71            if peer.node_id != self.our_node_id {
72                self.peers.insert(peer.node_id.clone(), peer.clone());
73                return Ok(Some(peer));
74            }
75        }
76
77        Ok(None)
78    }
79
80    /// Get all currently known peers.
81    pub fn known_peers(&self) -> Vec<DiscoveredPeer> {
82        self.peers.iter().map(|r| r.value().clone()).collect()
83    }
84
85    /// Parse a beacon message.
86    fn parse_beacon(&self, msg: &str, source: SocketAddr) -> Option<DiscoveredPeer> {
87        let parts: Vec<&str> = msg.split('|').collect();
88        if parts.len() != 3 || parts[0] != "PO" {
89            return None;
90        }
91
92        let node_id = parts[1].to_string();
93        let quic_port: u16 = parts[2].parse().ok()?;
94
95        Some(DiscoveredPeer {
96            node_id,
97            addr: source,
98            quic_port,
99            last_seen: std::time::Instant::now(),
100        })
101    }
102
103    /// Spawn the background beacon + listener tasks.
104    /// Returns a channel that emits newly discovered peers.
105    pub fn spawn_background(
106        self: Arc<Self>,
107        beacon_interval: std::time::Duration,
108    ) -> mpsc::Receiver<DiscoveredPeer> {
109        let (tx, rx) = mpsc::channel(32);
110
111        // Beacon sender task
112        let disc_clone = Arc::clone(&self);
113        tokio::spawn(async move {
114            loop {
115                if let Err(e) = disc_clone.send_beacon().await {
116                    warn!("Beacon send error: {e}");
117                }
118                tokio::time::sleep(beacon_interval).await;
119            }
120        });
121
122        // Beacon listener task
123        let disc_clone = Arc::clone(&self);
124        tokio::spawn(async move {
125            loop {
126                match disc_clone.listen_once().await {
127                    Ok(Some(peer)) => {
128                        let _ = tx.send(peer).await;
129                    }
130                    Ok(None) => {} // Our own beacon or invalid
131                    Err(e) => {
132                        warn!("Beacon listen error: {e}");
133                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
134                    }
135                }
136            }
137        });
138
139        rx
140    }
141}