1use dashmap::DashMap;
7use std::net::SocketAddr;
8use std::sync::Arc;
9use tokio::net::UdpSocket;
10use tokio::sync::mpsc;
11use tracing::{debug, warn};
12
13use po_crypto::identity::NodeId;
14
15pub const DISCOVERY_PORT: u16 = 5433;
17
18#[derive(Debug, Clone)]
20pub struct DiscoveredPeer {
21 pub node_id: String,
22 pub addr: SocketAddr,
23 pub quic_port: u16,
24 pub last_seen: std::time::Instant,
25}
26
27pub struct Discovery {
29 socket: Arc<UdpSocket>,
30 peers: Arc<DashMap<String, DiscoveredPeer>>,
31 our_node_id: String,
32 our_quic_port: u16,
33}
34
35impl Discovery {
36 pub async fn start(
38 node_id: &NodeId,
39 quic_port: u16,
40 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
41 let socket = UdpSocket::bind(format!("0.0.0.0:{DISCOVERY_PORT}")).await?;
42 socket.set_broadcast(true)?;
43
44 Ok(Self {
45 socket: Arc::new(socket),
46 peers: Arc::new(DashMap::new()),
47 our_node_id: node_id.to_hex(),
48 our_quic_port: quic_port,
49 })
50 }
51
52 pub async fn send_beacon(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
54 let msg = format!("PO|{}|{}", self.our_node_id, self.our_quic_port);
55 let broadcast_addr: SocketAddr = format!("255.255.255.255:{DISCOVERY_PORT}").parse()?;
56 self.socket.send_to(msg.as_bytes(), broadcast_addr).await?;
57 debug!("Sent discovery beacon");
58 Ok(())
59 }
60
61 pub async fn listen_once(
63 &self,
64 ) -> Result<Option<DiscoveredPeer>, Box<dyn std::error::Error + Send + Sync>> {
65 let mut buf = [0u8; 256];
66 let (n, addr) = self.socket.recv_from(&mut buf).await?;
67 let msg = std::str::from_utf8(&buf[..n])?;
68
69 if let Some(peer) = self.parse_beacon(msg, addr) {
70 if peer.node_id != self.our_node_id {
72 self.peers.insert(peer.node_id.clone(), peer.clone());
73 return Ok(Some(peer));
74 }
75 }
76
77 Ok(None)
78 }
79
80 pub fn known_peers(&self) -> Vec<DiscoveredPeer> {
82 self.peers.iter().map(|r| r.value().clone()).collect()
83 }
84
85 fn parse_beacon(&self, msg: &str, source: SocketAddr) -> Option<DiscoveredPeer> {
87 let parts: Vec<&str> = msg.split('|').collect();
88 if parts.len() != 3 || parts[0] != "PO" {
89 return None;
90 }
91
92 let node_id = parts[1].to_string();
93 let quic_port: u16 = parts[2].parse().ok()?;
94
95 Some(DiscoveredPeer {
96 node_id,
97 addr: source,
98 quic_port,
99 last_seen: std::time::Instant::now(),
100 })
101 }
102
103 pub fn spawn_background(
106 self: Arc<Self>,
107 beacon_interval: std::time::Duration,
108 ) -> mpsc::Receiver<DiscoveredPeer> {
109 let (tx, rx) = mpsc::channel(32);
110
111 let disc_clone = Arc::clone(&self);
113 tokio::spawn(async move {
114 loop {
115 if let Err(e) = disc_clone.send_beacon().await {
116 warn!("Beacon send error: {e}");
117 }
118 tokio::time::sleep(beacon_interval).await;
119 }
120 });
121
122 let disc_clone = Arc::clone(&self);
124 tokio::spawn(async move {
125 loop {
126 match disc_clone.listen_once().await {
127 Ok(Some(peer)) => {
128 let _ = tx.send(peer).await;
129 }
130 Ok(None) => {} Err(e) => {
132 warn!("Beacon listen error: {e}");
133 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
134 }
135 }
136 }
137 });
138
139 rx
140 }
141}