use core::pin::pin;
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use embassy_futures::select::select3;
use embassy_time::Timer;
use zeroconf::browser::TMdnsBrowser;
use zeroconf::prelude::TEventLoop;
use zeroconf::service::TMdnsService;
use zeroconf::txt_record::TTxtRecord;
use zeroconf::{MdnsBrowser, ServiceDiscovery, ServiceType};
use crate::error::{Error, ErrorCode};
use crate::transport::network::mdns::{DottedName, MdnsRemoteService};
use crate::transport::network::MatterLocalService;
use crate::utils::select::Coalesce;
use crate::Matter;
/// Delay, in milliseconds, between two consecutive polls of the discovered-services
/// buffer while a resolve or browse request is still in flight.
const QUERY_POLL_INTERVAL_MS: u64 = 100;
/// Copies the TXT record entries of a discovered service into owned key/value pairs.
///
/// Returns an empty vector when the service carries no TXT record.
fn txt_pairs(svc: &ServiceDiscovery) -> Vec<(String, String)> {
    match svc.txt() {
        Some(record) => record.iter().collect(),
        None => Vec::new(),
    }
}
/// mDNS support for Matter built on top of the `zeroconf` crate.
///
/// Keeps track of the local services that have been registered so far, so that
/// they can be deregistered once they are no longer reported by the Matter stack.
pub struct ZeroconfMdns {
// Registered services. Each value is the stop handle of the background thread
// serving that registration; dropping the handle signals the thread to stop.
services: HashMap<MatterLocalService, MdnsEntry>,
}
impl Default for ZeroconfMdns {
fn default() -> Self {
Self::new()
}
}
impl ZeroconfMdns {
/// Creates an instance that has no mDNS services registered yet.
pub fn new() -> Self {
    let services = HashMap::new();
    Self { services }
}
/// Drives the responder, resolver and browser loops concurrently.
///
/// The three loops are raced with `select3`, so this future completes as soon
/// as the first of them completes. Each loop only returns by propagating an
/// error, so in practice this returns the first error encountered.
pub async fn run(&mut self, matter: &Matter<'_>) -> Result<(), Error> {
    let mut responder = pin!(self.run_respond(matter));
    let mut resolver = pin!(Self::run_resolve(matter));
    let mut browser = pin!(Self::run_browse(matter));

    let first_finished = select3(&mut responder, &mut resolver, &mut browser);
    first_finished.coalesce().await
}
/// Keeps the registered mDNS services in sync with the Matter stack.
///
/// Every time `wait_mdns()` on the transport completes, the services currently
/// reported by `matter.mdns_services` are collected into a set and handed to
/// `update_services`. Loops forever; returns only when an error is propagated.
async fn run_respond(&mut self, matter: &Matter<'_>) -> Result<(), Error> {
    loop {
        matter.transport().wait_mdns().await;

        // Snapshot of everything the stack wants published right now.
        let mut wanted = HashSet::new();
        matter.mdns_services(|service| {
            wanted.insert(service);
            Ok(())
        })?;

        info!("mDNS services changed, updating...");
        self.update_services(matter, &wanted)?;
        info!("mDNS services updated");
    }
}
/// Serves mDNS resolve requests coming from the transport.
///
/// For each request a browser thread is started for the requested service type
/// (falling back to `matter` / `tcp` when the type string yields no such parts).
/// While the request is in flight, discovered services whose name equals the
/// first label of the requested instance name, and whose address parses as an
/// IP address, are deposited into the transport. The browser is stopped when its
/// stop handle is dropped at the end of the request. Loops forever; returns only
/// when an error is propagated.
async fn run_resolve(matter: &Matter<'_>) -> Result<(), Error> {
    let poll_interval = embassy_time::Duration::from_millis(QUERY_POLL_INTERVAL_MS);

    loop {
        let request = matter.transport().wait_mdns_resolve_request().await;

        let mut name_buf: heapless::String<128> = heapless::String::new();
        request.instance_name(&mut name_buf);
        // Only the first label of the instance name is compared with discovered names.
        let label = name_buf.split('.').next().unwrap_or("").to_string();

        // Split e.g. `_svc._proto` into its two underscore-less parts.
        let mut parts = request.service_type().trim_start_matches('_').split("._");
        let svc_name = parts.next().unwrap_or("matter");
        let proto = parts.next().unwrap_or("tcp");

        // `_stop` must stay alive for the whole request: dropping it stops the browser.
        let (discovered, _stop) = Self::spawn_browser(ServiceType::new(svc_name, proto)?);

        while matter.transport().mdns_resolve_in_flight() {
            Self::drain(&discovered, |svc| {
                if svc.name() != &label {
                    return;
                }
                let Ok(ip) = svc.address().parse::<IpAddr>() else {
                    return;
                };

                let pairs = txt_pairs(svc);
                let remote = MdnsRemoteService {
                    instance_name: DottedName(name_buf.as_str()),
                    port: Some(*svc.port()),
                    addrs: core::iter::once(ip),
                    txt: pairs.iter().map(|(k, v)| (k.as_str(), v.as_str())),
                    scope_id: 0,
                };
                matter.transport().try_deposit_mdns_resolve(&remote);
            });

            Timer::after(poll_interval).await;
        }
    }
}
/// Serves mDNS browse requests coming from the transport.
///
/// For each request a browser thread for the `matterc` / `udp` service type is
/// started. While the request is in flight, every discovered service whose
/// address parses as an IP address is deposited into the transport. The filter
/// delivered with the request is not applied here. Loops forever; returns only
/// when an error is propagated.
async fn run_browse(matter: &Matter<'_>) -> Result<(), Error> {
    let poll_interval = embassy_time::Duration::from_millis(QUERY_POLL_INTERVAL_MS);

    loop {
        let _filter = matter.transport().wait_mdns_browse_request().await;

        // `_stop` must stay alive for the whole request: dropping it stops the browser.
        let (discovered, _stop) = Self::spawn_browser(ServiceType::new("matterc", "udp")?);

        while matter.transport().mdns_browse_in_flight() {
            Self::drain(&discovered, |svc| {
                let Ok(ip) = svc.address().parse::<IpAddr>() else {
                    return;
                };

                let pairs = txt_pairs(svc);
                let remote = MdnsRemoteService {
                    instance_name: DottedName(svc.name()),
                    port: Some(*svc.port()),
                    addrs: core::iter::once(ip),
                    txt: pairs.iter().map(|(k, v)| (k.as_str(), v.as_str())),
                    scope_id: 0,
                };
                matter.transport().try_deposit_mdns_browse(&remote);
            });

            Timer::after(poll_interval).await;
        }
    }
}
fn spawn_browser(service_type: ServiceType) -> (Arc<Mutex<Vec<ServiceDiscovery>>>, MdnsEntry) {
let discovered: Arc<Mutex<Vec<ServiceDiscovery>>> = Arc::new(Mutex::new(Vec::new()));
let discovered_thread = discovered.clone();
let (stop_tx, stop_rx) = sync_channel::<()>(1);
let _ = std::thread::spawn(move || {
let mut browser = MdnsBrowser::new(service_type);
browser.set_service_discovered_callback(Box::new(
move |result: zeroconf::Result<ServiceDiscovery>, _context| {
if let Ok(service) = result {
if let Ok(mut guard) = discovered_thread.lock() {
guard.push(service);
}
}
},
));
match browser.browse_services() {
Ok(event_loop) => {
while matches!(stop_rx.try_recv(), Err(TryRecvError::Empty)) {
if let Err(e) = event_loop.poll(Duration::from_millis(100)) {
warn!("Browser poll error: {:?}", e);
break;
}
}
}
Err(e) => error!("Failed to start zeroconf browser: {:?}", e),
}
});
(discovered, MdnsEntry(stop_tx))
}
/// Removes all services accumulated in `discovered` and passes each one to `f`.
///
/// The pending batch is moved out of the buffer while the mutex is held, and the
/// mutex is released *before* `f` is invoked. This keeps the critical section
/// minimal, so the browser thread's discovery callback is never blocked by
/// the work done in `f`, and a panic inside `f` cannot poison the mutex.
///
/// Does nothing if the mutex is poisoned.
fn drain(discovered: &Arc<Mutex<Vec<ServiceDiscovery>>>, mut f: impl FnMut(&ServiceDiscovery)) {
    // The guard lives only for the duration of this statement.
    let pending = match discovered.lock() {
        Ok(mut guard) => std::mem::take(&mut *guard),
        Err(_) => return,
    };

    for svc in &pending {
        f(svc);
    }
}
fn update_services(
&mut self,
matter: &Matter<'_>,
services: &HashSet<MatterLocalService>,
) -> Result<(), Error> {
for service in services {
if !self.services.contains_key(service) {
info!("Registering mDNS service: {:?}", service);
let zeroconf_service = SendableZeroconfMdnsService::new(matter, service)?;
let (sender, receiver) = sync_channel(1);
let _ = std::thread::spawn(move || zeroconf_service.run(receiver));
self.services.insert(service.clone(), MdnsEntry(sender));
}
}
loop {
let removed = self
.services
.iter()
.find(|(service, _)| !services.contains(service));
if let Some((service, _)) = removed {
info!("Deregistering mDNS service: {:?}", service);
self.services.remove(&service.clone());
} else {
break;
}
}
Ok(())
}
}
/// Owned description of a single mDNS service registration.
///
/// Holds only owned data, so that it can be moved into the background thread
/// which performs the registration.
struct SendableZeroconfMdnsService {
// Service instance name.
name: String,
// Service type (service name, protocol and optional sub-types).
service_type: ServiceType,
// Port the service is advertised on.
port: u16,
// Key/value pairs to publish in the TXT record.
txt_kvs: Vec<(String, String)>,
}
impl SendableZeroconfMdnsService {
/// Builds the owned registration data for `mdns_service`.
///
/// The service description is obtained from `mdns_service.service(..)` using the
/// device details and port of `matter`. Leading underscores of the service name,
/// protocol and sub-types are removed before constructing the `ServiceType`.
///
/// # Errors
///
/// Fails if the service description cannot be produced or if the service type
/// is rejected by `zeroconf`.
fn new(matter: &Matter<'_>, mdns_service: &MatterLocalService) -> Result<Self, Error> {
    /// Returns `label` without its leading underscore, if it has one.
    fn bare(label: &str) -> &str {
        label.strip_prefix('_').unwrap_or(label)
    }

    let mut buf = [0u8; 512];
    let (service, _) = mdns_service.service(matter.dev_det(), matter.port(), &mut buf)?;

    let service_name = bare(service.service);
    let protocol = bare(service.protocol);
    let subtypes: Vec<&str> = service.service_subtypes.clone().map(bare).collect();

    let service_type = if subtypes.is_empty() {
        ServiceType::new(service_name, protocol)?
    } else {
        ServiceType::with_sub_types(service_name, protocol, subtypes)?
    };

    let txt_kvs: Vec<_> = service
        .txt_kvs
        .clone()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    Ok(Self {
        name: service.name.to_string(),
        service_type,
        port: service.port,
        txt_kvs,
    })
}
fn run(self, receiver: Receiver<()>) -> Result<(), Error> {
let mut mdns_service = zeroconf::MdnsService::new(self.service_type, self.port);
let mut txt_record = zeroconf::TxtRecord::new();
for (k, v) in &self.txt_kvs {
trace!("mDNS TXT key {} val {}", k, v);
txt_record.insert(k, v)?;
}
mdns_service.set_name(&self.name);
mdns_service.set_txt_record(txt_record);
mdns_service.set_registered_callback(Box::new(|_, _| {}));
let event_loop = mdns_service.register()?;
while matches!(receiver.try_recv(), Err(TryRecvError::Empty)) {
event_loop.poll(std::time::Duration::from_secs(1))?;
}
Ok(())
}
}
/// Stop handle for a background mDNS thread.
///
/// Wraps the sending half of the thread's stop channel; dropping the handle
/// sends the stop message.
struct MdnsEntry(SyncSender<()>);
impl Drop for MdnsEntry {
fn drop(&mut self) {
if let Err(e) = self.0.send(()) {
error!("Deregistering mDNS entry failed: {}", debug2format!(e));
}
}
}
impl From<zeroconf::error::Error> for Error {
fn from(e: zeroconf::error::Error) -> Self {
Self::new_with_details(ErrorCode::MdnsError, Box::new(e))
}
}