pipeworks_net/
discovery.rs

1use bitcode::{Decode, Encode};
2use log::{error, info, warn};
3use pipeworks_core::{
4    bus::Bus,
5    event::BusEvent,
6    node_id::{NODE_ID, NodeId},
7};
8use std::{
9    net::{IpAddr, Ipv4Addr, SocketAddr},
10    sync::Arc,
11    time::{Duration, SystemTime},
12};
13use tokio::{net::UdpSocket, time::sleep};
14
15use crate::{PeerDiscoveryInfo, PeersDiscovered, TcpListenerBind};
16
/// Eight-byte marker placed at the start of every discovery datagram. Received datagrams that
/// do not begin with these bytes are rejected before any payload decoding is attempted.
pub const MAGIC_BYTES: &[u8; 8] = b"wlabs-v1";
/// UDP port the discovery socket binds to, and the destination port of discovery broadcasts.
pub const DISCOVERY_PORT: u16 = 41983;
19
20pub fn do_multicast_peer_discovery(bus: Bus) -> std::io::Result<()> {
21    let self_node_id = *NODE_ID;
22
23    let udp_bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), DISCOVERY_PORT);
24    let broadcast_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), DISCOVERY_PORT);
25
26    // Create a socket2 socket with `SO_REUSEADDR` set (Linux only).
27    let socket = {
28        let socket = socket2::Socket::new(
29            socket2::Domain::IPV4,
30            socket2::Type::DGRAM,
31            Some(socket2::Protocol::UDP),
32        )?;
33        socket.set_reuse_address(true)?;
34        socket.set_broadcast(true)?;
35        socket.set_nonblocking(true)?;
36        socket.bind(&udp_bind_addr.into())?;
37        Arc::new(UdpSocket::from_std(socket.into())?)
38    };
39
40    #[derive(Encode, Decode, Clone)]
41    struct PeerDiscoveryPacketV1 {
42        pub node_id: NodeId,
43        pub published_tcp_port: u16,
44    }
45
46    // Create a task to send out a ping every 250ms
47    let tx = socket.clone();
48    let bus_clone = bus.clone();
49    tokio::spawn(async move {
50        loop {
51            let Some(BusEvent {
52                msg: TcpListenerBind { listen_addr },
53                ..
54            }) = bus_clone.get_latest()
55            else {
56                // We need an active TCP listener for there to be any use in sending out discovery
57                // broadcasts.
58                warn!("No TCP listener, skipping UDP discovery broadcast");
59                sleep(Duration::from_millis(250)).await;
60                continue;
61            };
62
63            // Magic bytes (8)followed by the payload
64            let buf = {
65                let mut encoded = bitcode::encode(&PeerDiscoveryPacketV1 {
66                    node_id: self_node_id,
67                    published_tcp_port: listen_addr.port(),
68                });
69                let mut buf = MAGIC_BYTES.to_vec();
70                buf.append(&mut encoded);
71                buf
72            };
73
74            if let Err(e) = tx.send_to(&buf, broadcast_addr).await {
75                error!("TODO(send to CAS) Failed to send UDP broadcast: {}", e);
76            }
77
78            sleep(Duration::from_millis(250)).await;
79        }
80    });
81
82    // Handle incoming PeerDiscoverPacketV1 payloads on the UDP socket.
83    let bus_clone = bus.clone();
84    tokio::spawn(async move {
85        let mut buf = vec![0u8; 1024];
86        let mut discovered = PeersDiscovered::default();
87
88        loop {
89            match socket.recv_from(&mut buf).await {
90                Ok((len, sender_addr)) => {
91                    let now = SystemTime::now();
92
93                    if len < MAGIC_BYTES.len() || buf[0..MAGIC_BYTES.len()] != MAGIC_BYTES[..] {
94                        error!("Invalid magic bytes in UDP payload!");
95                        continue;
96                    }
97
98                    match bitcode::decode::<PeerDiscoveryPacketV1>(&buf[MAGIC_BYTES.len()..len]) {
99                        Ok(peer_discovery) => {
100                            // Ignore self
101                            if peer_discovery.node_id == self_node_id {
102                                continue;
103                            }
104
105                            // The TCP SocketAddr is the same as the sender, with the advertised port.
106                            let mut socket_addr = sender_addr.clone();
107                            socket_addr.set_port(peer_discovery.published_tcp_port);
108
109                            let entry = discovered
110                                .peers
111                                .entry(peer_discovery.node_id)
112                                .or_insert_with(|| PeerDiscoveryInfo {
113                                    first_seen_at: now,
114                                    last_seen_at: now,
115                                    socket_addr,
116                                });
117                            entry.last_seen_at = now;
118                            entry.socket_addr = socket_addr;
119
120                            info!(
121                                "Peer {} discovered on IP {}",
122                                peer_discovery.node_id, socket_addr
123                            );
124
125                            bus_clone.send(discovered.clone());
126                        }
127                        Err(_) => {
128                            error!("Invalid PeerDiscoveryPacket after correct magic bytes");
129                        }
130                    };
131                }
132                Err(e) => {
133                    error!("Error receiving UDP packet: {}", e);
134                }
135            }
136        }
137    });
138
139    Ok(())
140}