use crate::{
errors::MultiIpIoError,
socket::{AsyncMdnsSocket, MdnsSocket, MdnsSocketRecv},
};
use std::{
collections::BTreeSet,
sync::{Arc, RwLock},
};
use trust_dns_client::{
op::Message as DnsMessage,
serialize::binary::{BinDecodable, BinEncodable, BinEncoder},
};
pub mod errors;
mod builder;
pub use builder::BroadcasterBuilder;
mod service;
use service::ServiceDnsResponse;
pub use service::{IntoServiceTxt, Service, ServiceBuilder};
mod handle;
pub use handle::BroadcasterHandle;
use handle::*;
pub(crate) struct BroadcasterConfig {
services: BTreeSet<ServiceDnsResponse>,
}
pub struct Broadcaster {
socket: MdnsSocket,
config: Arc<RwLock<BroadcasterConfig>>,
}
impl Broadcaster {
pub fn run_in_background(self) -> BroadcasterHandle {
let Broadcaster { socket, config } = self;
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let config_ref = config.clone();
let thread = std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.thread_name("Searchlight mDNS Broadcaster (Tokio)")
.enable_all()
.build()
.unwrap()
.block_on(async move {
let socket = socket.into_async().await?;
Self::impl_run(&socket, socket.recv(vec![0; 4096]), config_ref, Some(shutdown_rx)).await;
Ok(())
})
});
BroadcasterHandle(BroadcasterHandleDrop(Some(BroadcasterHandleInner { config, thread, shutdown_tx })))
}
pub fn run(self) -> Result<(), MultiIpIoError> {
let Broadcaster { socket, config } = self;
tokio::runtime::Builder::new_current_thread()
.thread_name("Searchlight mDNS Broadcaster (Tokio)")
.enable_all()
.build()
.unwrap()
.block_on(async move {
let socket = socket.into_async().await?;
Self::impl_run(&socket, socket.recv(vec![0; 4096]), config, None).await;
Ok(())
})
}
}
impl Broadcaster {
async fn impl_run(
tx: &AsyncMdnsSocket,
mut rx: MdnsSocketRecv<'_>,
config: Arc<RwLock<BroadcasterConfig>>,
shutdown_rx: Option<tokio::sync::oneshot::Receiver<()>>,
) {
if let Some(shutdown_rx) = shutdown_rx {
tokio::select! {
biased;
_ = Self::recv_loop(tx, &mut rx, &config) => (),
_ = shutdown_rx => (),
}
} else {
Self::recv_loop(tx, &mut rx, &config).await
}
}
#[allow(clippy::await_holding_lock)]
async fn recv_loop(tx: &AsyncMdnsSocket, rx: &mut MdnsSocketRecv<'_>, config: &RwLock<BroadcasterConfig>) {
let mut send_buf = vec![0u8; 4096];
loop {
let ((count, addr), packet) = match rx.recv_multicast().await {
Ok(recv) => recv,
Err(err) => {
log::warn!("Failed to receive on mDNS socket: {err}");
continue;
}
};
if count == 0 {
continue;
}
let message = match DnsMessage::from_bytes(packet) {
Ok(message) if !message.truncated() => message,
_ => continue,
};
let query = match message.query() {
Some(query) => query,
None => continue,
};
for service in config.read().unwrap().services.iter().filter(|service| {
if service.service_type() == query.name() {
return true;
}
if let Some(subtype_suffix) = &service.service_subtype_suffix {
if query.name().to_utf8().ends_with(subtype_suffix) {
return true;
}
}
false
}) {
send_buf.clear();
if service.dns_response.emit(&mut BinEncoder::new(&mut send_buf)).is_ok() {
if query.mdns_unicast_response() {
if let Err(err) = tx.send_to(&send_buf, addr).await {
log::warn!("Failed to send unicast mDNS response to {addr}: {err}");
}
} else {
if let Err(err) = tx.send_multicast(&send_buf).await {
log::warn!("Failed to send multicast mDNS response (requested by {addr}): {err}");
}
}
}
}
}
}
}