pipeworks_net/
discovery.rs1use bitcode::{Decode, Encode};
2use log::{error, info, warn};
3use pipeworks_core::{
4 bus::Bus,
5 event::BusEvent,
6 node_id::{NODE_ID, NodeId},
7};
8use std::{
9 net::{IpAddr, Ipv4Addr, SocketAddr},
10 sync::Arc,
11 time::{Duration, SystemTime},
12};
13use tokio::{net::UdpSocket, time::sleep};
14
15use crate::{PeerDiscoveryInfo, PeersDiscovered, TcpListenerBind};
16
17pub const MAGIC_BYTES: &[u8; 8] = b"wlabs-v1";
18pub const DISCOVERY_PORT: u16 = 41983;
19
20pub fn do_multicast_peer_discovery(bus: Bus) -> std::io::Result<()> {
21 let self_node_id = *NODE_ID;
22
23 let udp_bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), DISCOVERY_PORT);
24 let broadcast_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), DISCOVERY_PORT);
25
26 let socket = {
28 let socket = socket2::Socket::new(
29 socket2::Domain::IPV4,
30 socket2::Type::DGRAM,
31 Some(socket2::Protocol::UDP),
32 )?;
33 socket.set_reuse_address(true)?;
34 socket.set_broadcast(true)?;
35 socket.set_nonblocking(true)?;
36 socket.bind(&udp_bind_addr.into())?;
37 Arc::new(UdpSocket::from_std(socket.into())?)
38 };
39
40 #[derive(Encode, Decode, Clone)]
41 struct PeerDiscoveryPacketV1 {
42 pub node_id: NodeId,
43 pub published_tcp_port: u16,
44 }
45
46 let tx = socket.clone();
48 let bus_clone = bus.clone();
49 tokio::spawn(async move {
50 loop {
51 let Some(BusEvent {
52 msg: TcpListenerBind { listen_addr },
53 ..
54 }) = bus_clone.get_latest()
55 else {
56 warn!("No TCP listener, skipping UDP discovery broadcast");
59 sleep(Duration::from_millis(250)).await;
60 continue;
61 };
62
63 let buf = {
65 let mut encoded = bitcode::encode(&PeerDiscoveryPacketV1 {
66 node_id: self_node_id,
67 published_tcp_port: listen_addr.port(),
68 });
69 let mut buf = MAGIC_BYTES.to_vec();
70 buf.append(&mut encoded);
71 buf
72 };
73
74 if let Err(e) = tx.send_to(&buf, broadcast_addr).await {
75 error!("TODO(send to CAS) Failed to send UDP broadcast: {}", e);
76 }
77
78 sleep(Duration::from_millis(250)).await;
79 }
80 });
81
82 let bus_clone = bus.clone();
84 tokio::spawn(async move {
85 let mut buf = vec![0u8; 1024];
86 let mut discovered = PeersDiscovered::default();
87
88 loop {
89 match socket.recv_from(&mut buf).await {
90 Ok((len, sender_addr)) => {
91 let now = SystemTime::now();
92
93 if len < MAGIC_BYTES.len() || buf[0..MAGIC_BYTES.len()] != MAGIC_BYTES[..] {
94 error!("Invalid magic bytes in UDP payload!");
95 continue;
96 }
97
98 match bitcode::decode::<PeerDiscoveryPacketV1>(&buf[MAGIC_BYTES.len()..len]) {
99 Ok(peer_discovery) => {
100 if peer_discovery.node_id == self_node_id {
102 continue;
103 }
104
105 let mut socket_addr = sender_addr.clone();
107 socket_addr.set_port(peer_discovery.published_tcp_port);
108
109 let entry = discovered
110 .peers
111 .entry(peer_discovery.node_id)
112 .or_insert_with(|| PeerDiscoveryInfo {
113 first_seen_at: now,
114 last_seen_at: now,
115 socket_addr,
116 });
117 entry.last_seen_at = now;
118 entry.socket_addr = socket_addr;
119
120 info!(
121 "Peer {} discovered on IP {}",
122 peer_discovery.node_id, socket_addr
123 );
124
125 bus_clone.send(discovered.clone());
126 }
127 Err(_) => {
128 error!("Invalid PeerDiscoveryPacket after correct magic bytes");
129 }
130 };
131 }
132 Err(e) => {
133 error!("Error receiving UDP packet: {}", e);
134 }
135 }
136 }
137 });
138
139 Ok(())
140}