use std::{
collections::HashSet,
io::ErrorKind,
sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError},
thread::JoinHandle,
time::Duration,
};
use log::{error, trace};
use crate::{error::Error, net::build_multicast_socket, Service, ServiceInfo, Udis};
/// Handle to a udis instance whose discovery work runs on a dedicated
/// background thread.
///
/// Discovered services are delivered through an internal channel and read
/// with [`SyncUdis::find_service`] / [`SyncUdis::try_find_service`]; the
/// thread is stopped with [`SyncUdis::shutdown`].
#[derive(Debug)]
pub struct SyncUdis {
/// The configuration this handle was built from. Not read by this type;
/// the background thread works on its own clone.
_udis: Udis,
/// Join handle of the background thread; its result is the thread's exit
/// status.
bg_thread_jh: JoinHandle<Result<(), Error>>,
/// Sending half of the command channel to the background thread.
cmd_tx: Sender<Cmd>,
/// Receiving half of the channel on which the background thread reports
/// each discovered service.
serv_info_rx: Receiver<ServiceInfo>,
}
/// Commands sent from a [`SyncUdis`] handle to its background thread.
enum Cmd {
/// Ask the background thread to leave its receive loop and return.
Shutdown,
}
impl SyncUdis {
pub(crate) fn build(udis: Udis) -> Self {
let (cmd_tx, cmd_rx) = channel();
let (serv_info_tx, serv_info_rx) = channel();
let udis_bg = udis.clone();
let bg_thread_jh =
std::thread::spawn(move || sync_bg_thread(udis_bg, cmd_rx, serv_info_tx));
Self {
_udis: udis,
bg_thread_jh,
cmd_tx,
serv_info_rx,
}
}
pub fn find_service(&self) -> Result<ServiceInfo, Error> {
if self.bg_thread_jh.is_finished() {
return Err(Error::BackgroundThreadShutdown);
}
let serv_info = self.serv_info_rx.recv()?;
Ok(serv_info)
}
pub fn try_find_service(&self) -> Result<Option<ServiceInfo>, Error> {
if self.bg_thread_jh.is_finished() {
return Err(Error::BackgroundThreadShutdown);
}
match self.serv_info_rx.try_recv() {
Ok(serv_info) => Ok(Some(serv_info)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(Error::ServiceInfoRecvError(RecvError)),
}
}
pub fn shutdown(self) -> Result<(), Error> {
self.cmd_tx
.send(Cmd::Shutdown)
.map_err(|_| Error::FailedToShutdownUdisThread)?;
self.bg_thread_jh
.join()
.map_err(|_| Error::FailedToShutdownUdisThread)??;
Ok(())
}
}
fn sync_bg_thread(
udis: Udis,
cmd_rx: Receiver<Cmd>,
serv_info_tx: Sender<ServiceInfo>,
) -> Result<(), Error> {
let (disc_addr, socket) = build_multicast_socket()?;
trace!("joined udis notify network on {disc_addr}");
for service in &udis.services {
match service {
Service::Host { kind, port } => {
trace!("hosting service `{}` on port {}", kind, port);
}
Service::Search { kind } => {
trace!("searching for service `{}`", kind);
}
}
}
let mut registry = HashSet::<Udis>::new();
let notify_message = serde_json::to_vec(&udis).map_err(Error::FailedToSerialiseNotifyMsg)?;
socket.send_to(¬ify_message[..], &disc_addr.into())?;
let mut buf = Vec::with_capacity(1024);
loop {
match cmd_rx.try_recv() {
Ok(cmd) => match cmd {
Cmd::Shutdown => break,
},
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => break,
}
std::thread::sleep(Duration::from_millis(100));
let received = match socket.recv(buf.spare_capacity_mut()) {
Ok(a) => a,
Err(e) => {
match e.kind() {
ErrorKind::TimedOut | ErrorKind::WouldBlock => (),
k => error!(
"Error while receiving udis notify messages (will continue): ({k:?}) {e}"
),
}
continue;
}
};
unsafe {
buf.set_len(received);
}
let peer: Udis =
serde_json::from_slice(&buf[..]).map_err(Error::FailedToDeserialiseNotifyMsg)?;
buf.clear();
if peer == udis {
continue;
}
if registry.contains(&peer) {
continue;
}
registry.insert(peer.clone());
if udis.get_wanted_services(&peer).count() > 0 {
trace!(
"notified of peer `{}` that wants one of our services",
peer.name
);
socket.send_to(¬ify_message[..], &disc_addr.into())?;
}
for service in peer.get_wanted_services(&udis) {
let Service::Host { kind, port } = service else {
trace!("Non-host service returned by get_wanted_services, skipping");
continue;
};
trace!(
"found peer `{}` that hosts a service we want `{}` at {}:{}",
peer.name,
kind,
peer.addr,
port
);
let serv_info = ServiceInfo {
name: peer.name.clone(),
kind: kind.clone(),
addr: peer.addr,
port: *port,
};
serv_info_tx.send(serv_info)?;
}
}
trace!("udis background task shutting down");
Ok(())
}