use std::{
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use sha2::{Digest, Sha256};
pub struct BroadcasterGuard {
tx: Option<flume::Sender<()>>,
}
impl BroadcasterGuard {
#[must_use]
pub fn new(tx: flume::Sender<()>) -> Self {
Self { tx: Some(tx) }
}
}
impl Drop for BroadcasterGuard {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(());
}
}
}
#[derive(Debug, Clone)]
pub struct DiscoveredPeer {
pub name: String,
pub addr: SocketAddr,
pub os: String,
pub rtt_ms: Option<f64>,
}
#[must_use]
pub fn derive_channel_id(phrase: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(phrase.as_bytes());
let result = hasher.finalize();
hex::encode(&result[..4])
}
pub async fn start_broadcaster(
channel_id: &str,
port: u16,
cancel_rx: flume::Receiver<()>,
) -> Result<(), io::Error> {
let socket = compio::net::UdpSocket::bind("0.0.0.0:0").await?;
socket.set_broadcast(true)?;
let msg = format!(
"HAYATE_PEER:{}:{}:{}",
channel_id,
std::env::consts::OS,
port
);
let msg_bytes = msg.into_bytes();
let target = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 50002);
let loopback = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 50002);
loop {
let compio::BufResult(res, _) = socket.send_to(msg_bytes.clone(), target).await;
if let Err(_e) = res {
}
let compio::BufResult(res, _) = socket.send_to(msg_bytes.clone(), loopback).await;
if let Err(_e) = res {
}
let sleep_fut = compio::time::sleep(Duration::from_secs(1));
let cancel_fut = cancel_rx.recv_async();
let sleep_pinned = std::pin::pin!(sleep_fut);
let cancel_pinned = std::pin::pin!(cancel_fut);
if let futures_util::future::Either::Right(_) =
futures_util::future::select(sleep_pinned, cancel_pinned).await
{
break;
}
}
Ok(())
}
pub async fn listen_for_broadcast(
target_phrase: Option<&str>,
timeout: Duration,
) -> Result<Option<(String, SocketAddr, String)>, io::Error> {
let target_channel_id = target_phrase.map(derive_channel_id);
let std_socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)?;
std_socket.set_reuse_address(true)?;
#[cfg(not(windows))]
std_socket.set_reuse_port(true)?;
let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 50002);
std_socket.bind(&socket2::SockAddr::from(listen_addr))?;
let socket = compio::net::UdpSocket::from_std(std_socket.into())?;
let buf = vec![0u8; 1024];
let res = compio::time::timeout(timeout, async move {
let mut temp_buf = buf;
loop {
let compio::BufResult(recv_res, b) = socket.recv_from(temp_buf).await;
temp_buf = b;
match recv_res {
Ok((n, src_addr)) => {
let data = &temp_buf[..n];
if let Ok(text) = std::str::from_utf8(data) {
let mut parts = text.split(':');
let parsed = (|| {
if parts.next()? != "HAYATE_PEER" {
return None;
}
let channel_id = parts.next()?;
let os = parts.next()?;
let port_str = parts.next()?;
let matches = match &target_channel_id {
Some(expected_id) => channel_id == expected_id,
None => true,
};
if matches {
let port = port_str.parse::<u16>().ok()?;
let peer_addr = SocketAddr::new(src_addr.ip(), port);
Some(("Hayate Peer".to_owned(), peer_addr, os.to_owned()))
} else {
None
}
})();
if let Some(res) = parsed {
return Ok(Some(res));
}
}
}
Err(e) => return Err(e),
}
}
})
.await;
match res {
Ok(inner_res) => inner_res,
Err(_) => Ok(None), }
}