//! # mDNS Broadcasting
//!
//! This module provides a way to respond to mDNS queries on the network.
//!
//! In other words, this module provides an _mDNS server_.
//!
//! # Example
//!
//! ```rust, no_run
//! use searchlight::{
//! broadcast::{BroadcasterBuilder, ServiceBuilder},
//! discovery::{DiscoveryBuilder, DiscoveryEvent},
//! net::IpVersion,
//! };
//! use std::{
//! net::{IpAddr, Ipv4Addr},
//! str::FromStr,
//! };
//!
//! let (found_tx, found_rx) = std::sync::mpsc::sync_channel(0);
//!
//! let broadcaster = BroadcasterBuilder::new()
//! .loopback()
//! .add_service(
//! ServiceBuilder::new("_searchlight._udp.local.", "HELLO-WORLD", 1234)
//! .unwrap()
//! .add_ip_address(IpAddr::V4(Ipv4Addr::from_str("192.168.1.69").unwrap()))
//! .add_txt_truncated("key=value")
//! .add_txt_truncated("key2=value2")
//! .build()
//! .unwrap(),
//! )
//! .build(IpVersion::V4)
//! .unwrap()
//! .run_in_background();
//!
//! let discovery = DiscoveryBuilder::new()
//! .loopback()
//! .service("_searchlight._udp.local.")
//! .unwrap()
//! .build(IpVersion::V4)
//! .unwrap()
//! .run_in_background(move |event| {
//! if let DiscoveryEvent::ResponderFound(responder) = event {
//! found_tx.try_send(responder).ok();
//! }
//! });
//!
//! println!("Waiting for discovery to find responder...");
//!
//! println!("{:#?}", found_rx.recv().unwrap());
//!
//! println!("Shutting down...");
//!
//! broadcaster.shutdown().unwrap();
//! discovery.shutdown().unwrap();
//!
//! println!("Done!");
//! ```
use crate::{
errors::MultiIpIoError,
socket::{AsyncMdnsSocket, MdnsSocket, MdnsSocketRecv},
};
use std::{
collections::BTreeSet,
sync::{Arc, RwLock},
};
use trust_dns_client::{
op::Message as DnsMessage,
serialize::binary::{BinDecodable, BinEncodable, BinEncoder},
};
/// Errors that can occur while broadcasting or initializing a broadcaster.
pub mod errors;
mod builder;
pub use builder::BroadcasterBuilder;
mod service;
use service::ServiceDnsResponse;
pub use service::{IntoServiceTxt, Service, ServiceBuilder};
mod handle;
pub use handle::BroadcasterHandle;
use handle::*;
/// Mutable state of a broadcaster: the set of services it answers queries for.
///
/// It is held behind an `Arc<RwLock<_>>` by [`Broadcaster`]; the receive loop takes a read lock
/// on it for every query it answers.
pub(crate) struct BroadcasterConfig {
/// One pre-built DNS response per advertised service, kept in an ordered set.
services: BTreeSet<ServiceDnsResponse>,
}
/// A built mDNS broadcaster (server) instance, ready to be started.
///
/// You can choose to run broadcasting on the current thread, or in the background, using [`Broadcaster::run`] or [`Broadcaster::run_in_background`].
///
/// A `Broadcaster` can be built using [`BroadcasterBuilder`].
pub struct Broadcaster {
/// The (not yet async) mDNS socket; it is converted with `into_async` once a Tokio runtime is running.
socket: MdnsSocket,
/// The services to respond for. Shared, because `run_in_background` hands one clone to the
/// worker thread and another to the returned [`BroadcasterHandle`].
config: Arc<RwLock<BroadcasterConfig>>,
}
impl Broadcaster {
/// Run broadcasting on a new thread; in the background.
///
/// Returns a [`BroadcasterHandle`] that can be used to cleanly shut down the background thread.
pub fn run_in_background(self) -> BroadcasterHandle {
let Broadcaster { socket, config } = self;
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let config_ref = config.clone();
let thread = std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.thread_name("Searchlight mDNS Broadcaster (Tokio)")
.enable_all()
.build()
.unwrap()
.block_on(async move {
let socket = socket.into_async().await?;
Self::impl_run(&socket, socket.recv(vec![0; 4096]), config_ref, Some(shutdown_rx)).await;
Ok(())
})
});
BroadcasterHandle(BroadcasterHandleDrop(Some(BroadcasterHandleInner { config, thread, shutdown_tx })))
}
/// Run broadcasting on the current thread.
///
/// This will start a new Tokio runtime on the current thread and block until a fatal error occurs.
pub fn run(self) -> Result<(), MultiIpIoError> {
let Broadcaster { socket, config } = self;
tokio::runtime::Builder::new_current_thread()
.thread_name("Searchlight mDNS Broadcaster (Tokio)")
.enable_all()
.build()
.unwrap()
.block_on(async move {
let socket = socket.into_async().await?;
Self::impl_run(&socket, socket.recv(vec![0; 4096]), config, None).await;
Ok(())
})
}
}
impl Broadcaster {
async fn impl_run(
tx: &AsyncMdnsSocket,
mut rx: MdnsSocketRecv<'_>,
config: Arc<RwLock<BroadcasterConfig>>,
shutdown_rx: Option<tokio::sync::oneshot::Receiver<()>>,
) {
if let Some(shutdown_rx) = shutdown_rx {
tokio::select! {
biased;
_ = Self::recv_loop(tx, &mut rx, &config) => (),
_ = shutdown_rx => (),
}
} else {
Self::recv_loop(tx, &mut rx, &config).await
}
}
#[allow(clippy::await_holding_lock)]
// It's fine to hold the lock in this case because we're using the current-thread runtime.
// The future just won't be Send.
async fn recv_loop(tx: &AsyncMdnsSocket, rx: &mut MdnsSocketRecv<'_>, config: &RwLock<BroadcasterConfig>) {
let mut send_buf = vec![0u8; 4096];
loop {
let ((count, addr), packet) = match rx.recv_multicast().await {
Ok(recv) => recv,
Err(err) => {
log::warn!("Failed to receive on mDNS socket: {err}");
continue;
}
};
if count == 0 {
continue;
}
let message = match DnsMessage::from_bytes(packet) {
Ok(message) if !message.truncated() => message,
_ => continue,
};
let query = match message.query() {
Some(query) => query,
None => continue,
};
for service in config.read().unwrap().services.iter().filter(|service| {
if service.service_type() == query.name() {
return true;
}
if let Some(subtype_suffix) = &service.service_subtype_suffix {
if query.name().to_utf8().ends_with(subtype_suffix) {
return true;
}
}
false
}) {
send_buf.clear();
if service.dns_response.emit(&mut BinEncoder::new(&mut send_buf)).is_ok() {
if query.mdns_unicast_response() {
// Send unicast packet
if let Err(err) = tx.send_to(&send_buf, addr).await {
log::warn!("Failed to send unicast mDNS response to {addr}: {err}");
}
} else {
// Send multicast packet
if let Err(err) = tx.send_multicast(&send_buf).await {
log::warn!("Failed to send multicast mDNS response (requested by {addr}): {err}");
}
}
}
}
}
}
}