use core::pin::pin;
use std::collections::{HashMap, HashSet};
use std::io::Write as _;
use std::net::IpAddr;
use embassy_futures::select::{select, select3, Either};
use embassy_time::{Duration, Timer};
use futures_lite::StreamExt;
use zbus::zvariant::{ObjectPath, OwnedObjectPath};
use zbus::Connection;
use crate::error::Error;
use crate::transport::network::mdns::{DottedName, MdnsRemoteService};
use crate::transport::network::MatterLocalService;
use crate::utils::select::Coalesce;
use crate::utils::zbus_proxies::avahi::entry_group::EntryGroupProxy;
use crate::utils::zbus_proxies::avahi::server2::Server2Proxy;
use crate::utils::zbus_proxies::avahi::service_browser::ServiceBrowserProxy;
use crate::Matter;
const AVAHI_IF_UNSPEC: i32 = -1;
const AVAHI_PROTO_UNSPEC: i32 = -1;
const AVAHI_PROTO_INET: i32 = 0;
const AVAHI_PROTO_INET6: i32 = 1;
const AVAHI_RESOLVE_PROTOS: [i32; 2] = [AVAHI_PROTO_INET6, AVAHI_PROTO_INET];
const BROWSE_POLL_INTERVAL_MS: u64 = 250;
pub struct AvahiMdns {
services: HashMap<MatterLocalService, OwnedObjectPath>,
connection: Connection,
}
impl AvahiMdns {
pub fn new(connection: Connection) -> Self {
Self {
services: HashMap::new(),
connection,
}
}
pub async fn run(&mut self, matter: &Matter<'_>) -> Result<(), Error> {
let connection = self.connection.clone();
let mut respond = pin!(self.run_respond(matter));
let mut resolve = pin!(Self::run_resolve(matter, &connection));
let mut browse = pin!(Self::run_browse(matter, &connection));
select3(&mut respond, &mut resolve, &mut browse)
.coalesce()
.await
}
async fn run_respond(&mut self, matter: &Matter<'_>) -> Result<(), Error> {
{
let avahi = Server2Proxy::new(&self.connection).await?;
info!("Avahi API version: {}", avahi.get_apiversion().await?);
}
loop {
matter.transport().wait_mdns().await;
let mut services = HashSet::new();
matter.mdns_services(|service| {
services.insert(service);
Ok(())
})?;
info!("mDNS services changed, updating...");
self.update_services(matter, &services).await?;
info!("mDNS services updated");
}
}
async fn run_resolve(matter: &Matter<'_>, connection: &Connection) -> Result<(), Error> {
loop {
let service = matter.transport().wait_mdns_resolve_request().await;
let mut name_buf: heapless::String<128> = heapless::String::new();
service.instance_name(&mut name_buf);
let label = name_buf
.split('.')
.next()
.unwrap_or(name_buf.as_str())
.to_string();
let service_type = service.service_type();
let avahi = Server2Proxy::new(connection).await?;
while matter.transport().mdns_resolve_in_flight() {
let (ips, port, txt, scope_id) =
resolve_all_families(&avahi, AVAHI_IF_UNSPEC, &label, service_type).await;
if !ips.is_empty() {
let txt_pairs = txt_pairs(&txt);
matter
.transport()
.try_deposit_mdns_resolve(&MdnsRemoteService {
instance_name: DottedName(name_buf.as_str()),
port: Some(port),
addrs: ips.iter().copied(),
txt: txt_pairs.iter().copied(),
scope_id,
});
}
Timer::after(Duration::from_millis(BROWSE_POLL_INTERVAL_MS)).await;
}
}
}
async fn run_browse(matter: &Matter<'_>, connection: &Connection) -> Result<(), Error> {
loop {
let filter = matter.transport().wait_mdns_browse_request().await;
let mut service_type: heapless::String<64> = heapless::String::new();
filter.service_type(&mut service_type, false);
let avahi = Server2Proxy::new(connection).await?;
let browser_path = avahi
.service_browser_prepare(AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC, &service_type, "", 0)
.await?;
let browser = ServiceBrowserProxy::builder(connection)
.path(browser_path)?
.build()
.await?;
let mut item_new_stream = browser.receive_item_new().await?;
browser.start().await?;
while matter.transport().mdns_browse_in_flight() {
let item = pin!(item_new_stream.next());
let tick = pin!(Timer::after(Duration::from_millis(BROWSE_POLL_INTERVAL_MS)));
let signal = match select(item, tick).await {
Either::First(Some(signal)) => signal,
Either::First(None) => break, Either::Second(_) => continue, };
let Ok(args) = signal.args() else { continue };
let (name, ips, port, txt, scope_id) = resolve_browsed_all_families(
&avahi,
args.interface,
args.protocol,
args.name,
args.type_,
args.domain,
)
.await;
if !ips.is_empty() {
let txt_pairs = txt_pairs(&txt);
matter
.transport()
.try_deposit_mdns_browse(&MdnsRemoteService {
instance_name: DottedName(&name),
port: Some(port),
addrs: ips.iter().copied(),
txt: txt_pairs.iter().copied(),
scope_id,
});
}
}
if let Err(e) = browser.free().await {
warn!("Failed to free Avahi browser: {:?}", e);
}
}
}
async fn update_services(
&mut self,
matter: &Matter<'_>,
services: &HashSet<MatterLocalService>,
) -> Result<(), Error> {
for service in services {
if !self.services.contains_key(service) {
info!("Registering mDNS service: {:?}", service);
let path = self.register(matter, service).await?;
self.services.insert(service.clone(), path);
}
}
loop {
let removed = self
.services
.iter()
.find(|(service, _)| !services.contains(service));
if let Some((service, path)) = removed {
info!("Deregistering mDNS service: {:?}", service);
self.deregister(path.as_ref()).await?;
self.services.remove(&service.clone());
} else {
break;
}
}
Ok(())
}
async fn register(
&mut self,
matter: &Matter<'_>,
service: &MatterLocalService,
) -> Result<OwnedObjectPath, Error> {
let mut buf = [0u8; 512];
let (service, _) = service.service(matter.dev_det(), matter.port(), &mut buf)?;
let avahi = Server2Proxy::new(&self.connection).await?;
let path = avahi.entry_group_new().await?;
let group = EntryGroupProxy::builder(&self.connection)
.path(path.clone())?
.build()
.await?;
let mut txt_buf = Vec::new();
let offsets = service
.txt_kvs
.clone()
.map(|(k, v)| {
let start = txt_buf.len();
if v.is_empty() {
txt_buf.extend_from_slice(k.as_bytes());
} else {
write_unwrap!(&mut txt_buf, "{}={}", k, v);
}
txt_buf.len() - start
})
.collect::<Vec<_>>();
let mut txt_slice = txt_buf.as_slice();
let mut txt = Vec::new();
for offset in offsets {
let (entry, next_slice) = txt_slice.split_at(offset);
txt.push(entry);
txt_slice = next_slice;
}
group
.add_service(
AVAHI_IF_UNSPEC,
AVAHI_PROTO_UNSPEC,
0,
service.name,
service.service_protocol,
"",
"",
service.port,
&txt,
)
.await?;
for subtype in service.service_subtypes.clone() {
let avahi_subtype = format!("{}._sub.{}", subtype, service.service_protocol);
group
.add_service_subtype(
AVAHI_IF_UNSPEC,
AVAHI_PROTO_UNSPEC,
0,
service.name,
service.service_protocol,
"",
&avahi_subtype,
)
.await?;
}
group.commit().await?;
Ok(path)
}
async fn deregister(&self, path: ObjectPath<'_>) -> Result<(), Error> {
let group = EntryGroupProxy::builder(&self.connection)
.path(path)?
.build()
.await?;
group.free().await?;
Ok(())
}
}
fn txt_pairs(txt: &[Vec<u8>]) -> Vec<(&str, &str)> {
let mut pairs: Vec<(&str, &str)> = Vec::new();
for entry in txt {
if let Ok(s) = core::str::from_utf8(entry) {
match s.find('=') {
Some(eq) => pairs.push((&s[..eq], &s[eq + 1..])),
None => pairs.push((s, "")),
}
}
}
pairs
}
async fn resolve_all_families(
avahi: &Server2Proxy<'_>,
interface: i32,
label: &str,
service_type: &str,
) -> (Vec<IpAddr>, u16, Vec<Vec<u8>>, u32) {
let mut ips: Vec<IpAddr> = Vec::new();
let mut port = 0u16;
let mut txt: Vec<Vec<u8>> = Vec::new();
let mut scope_id = 0u32;
for aproto in AVAHI_RESOLVE_PROTOS {
match avahi
.resolve_service(
interface,
AVAHI_PROTO_UNSPEC,
label,
service_type,
"local",
aproto,
0,
)
.await
{
Ok((iface, _proto, _name, _type, _domain, _host, _ap, address, p, t, _fl)) => {
if let Ok(ip) = address.parse::<IpAddr>() {
if is_link_local_v6(&ip) && iface > 0 {
scope_id = iface as u32;
}
if !ips.contains(&ip) {
ips.push(ip);
}
port = p;
if txt.is_empty() {
txt = t;
}
}
}
Err(e) => debug!("Avahi resolve of {label} (aproto {aproto}) failed: {e:?}"),
}
}
(ips, port, txt, scope_id)
}
#[allow(clippy::type_complexity)]
async fn resolve_browsed_all_families(
avahi: &Server2Proxy<'_>,
interface: i32,
protocol: i32,
name: &str,
type_: &str,
domain: &str,
) -> (String, Vec<IpAddr>, u16, Vec<Vec<u8>>, u32) {
let mut instance_name = String::new();
let mut ips: Vec<IpAddr> = Vec::new();
let mut port = 0u16;
let mut txt: Vec<Vec<u8>> = Vec::new();
let mut scope_id = 0u32;
for aproto in AVAHI_RESOLVE_PROTOS {
match avahi
.resolve_service(interface, protocol, name, type_, domain, aproto, 0)
.await
{
Ok((iface, _proto, rname, _type, _domain, _host, _ap, address, p, t, _fl)) => {
if let Ok(ip) = address.parse::<IpAddr>() {
if is_link_local_v6(&ip) && iface > 0 {
scope_id = iface as u32;
}
if !ips.contains(&ip) {
ips.push(ip);
}
instance_name = rname;
port = p;
if txt.is_empty() {
txt = t;
}
} else {
warn!("Could not parse IP address: {address}");
}
}
Err(e) => debug!("Avahi resolve (browsed, aproto {aproto}) failed: {e:?}"),
}
}
(instance_name, ips, port, txt, scope_id)
}
fn is_link_local_v6(ip: &IpAddr) -> bool {
matches!(ip, IpAddr::V6(v6) if v6.is_unicast_link_local())
}