mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself and "_udp" or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{split_sub_domain, DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
59/// The default max length of the service name without domain, not including the
60/// leading underscore (`_`). It is set to 15 per
61/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
62pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64/// The default interval for checking IP changes automatically.
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
68/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
69pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71const MDNS_PORT: u16 = 5353;
72const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
73const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
74const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
75
76const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Response status code for the service `unregister` call.
///
/// Derives `Clone`, `PartialEq` and `Eq` (like [`DaemonStatus`]) so that
/// callers can compare a received status directly, e.g. with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
86
/// Status code reported by the service daemon.
///
/// Marked `#[non_exhaustive]`: more states may be added later.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is up and running normally.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
97
/// The kinds of counters included in the metrics.
/// Currently all counters are for outgoing packets.
///
/// Each variant has a textual name provided by its `Display` impl.
// NOTE(review): presumably that name is the key used in `Metrics` — confirm.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}
125
126impl fmt::Display for Counter {
127    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128        match self {
129            Self::Register => write!(f, "register"),
130            Self::RegisterResend => write!(f, "register-resend"),
131            Self::Unregister => write!(f, "unregister"),
132            Self::UnregisterResend => write!(f, "unregister-resend"),
133            Self::Browse => write!(f, "browse"),
134            Self::ResolveHostname => write!(f, "resolve-hostname"),
135            Self::Respond => write!(f, "respond"),
136            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
137            Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
138            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
139            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
140            Self::CachedPTR => write!(f, "cached-ptr"),
141            Self::CachedSRV => write!(f, "cached-srv"),
142            Self::CachedAddr => write!(f, "cached-addr"),
143            Self::CachedTxt => write!(f, "cached-txt"),
144            Self::CachedNSec => write!(f, "cached-nsec"),
145            Self::CachedSubtype => write!(f, "cached-subtype"),
146            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
147            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
148            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
149            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
150            Self::Timer => write!(f, "timer"),
151        }
152    }
153}
154
155/// A wrapper around UDP socket used by the mDNS daemon.
156///
157/// We do this because `mio` does not support PKTINFO and
158/// does not provide a way to implement `Source` trait directly and safely.
159struct MyUdpSocket {
160    /// The underlying socket that supports control messages like
161    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
162    pktinfo: PktInfoUdpSocket,
163
164    /// The mio UDP socket that is a clone of `pktinfo` and
165    /// is used for event polling.
166    mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171        let std_sock = pktinfo.try_clone_std()?;
172        let mio = MioUdpSocket::from_std(std_sock);
173
174        Ok(Self { pktinfo, mio })
175    }
176}
177
178/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
179impl Source for MyUdpSocket {
180    fn register(
181        &mut self,
182        registry: &Registry,
183        token: Token,
184        interests: Interest,
185    ) -> io::Result<()> {
186        self.mio.register(registry, token, interests)
187    }
188
189    fn reregister(
190        &mut self,
191        registry: &Registry,
192        token: Token,
193        interests: Interest,
194    ) -> io::Result<()> {
195        self.mio.reregister(registry, token, interests)
196    }
197
198    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199        self.mio.deregister(registry)
200    }
201}
202
/// Metrics are a `HashMap` from a counter name to its `i64` value.
/// Their main purpose is to help monitor the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;
206
/// Event key for the IPv4 socket; the value is picked just to indicate IPv4.
const IPV4_SOCK_EVENT_KEY: usize = 4;
/// Event key for the IPv6 socket; the value is picked just to indicate IPv6.
const IPV6_SOCK_EVENT_KEY: usize = 6;
/// Event key for the signal socket; chosen to avoid overlapping with `zc.poll_ids`.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;
210
211/// A daemon thread for mDNS
212///
213/// This struct provides a handle and an API to the daemon. It is cloneable.
214#[derive(Clone)]
215pub struct ServiceDaemon {
216    /// Sender handle of the channel to the daemon.
217    sender: Sender<Command>,
218
219    /// Send to this addr to signal that a `Command` is coming.
220    ///
221    /// The daemon listens on this addr together with other mDNS sockets,
222    /// to avoid busy polling the flume channel. If there is a way to poll
223    /// the channel and mDNS sockets together, then this can be removed.
224    signal_addr: SocketAddr,
225}
226
227impl ServiceDaemon {
228    /// Creates a new daemon and spawns a thread to run the daemon.
229    ///
230    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
231    /// ask callers to set the port.
232    pub fn new() -> Result<Self> {
233        // Use port 0 to allow the system assign a random available port,
234        // no need for a pre-defined port number.
235        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237        let signal_sock = UdpSocket::bind(signal_addr)
238            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240        // Get the socket with the OS chosen port
241        let signal_addr = signal_sock
242            .local_addr()
243            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245        // Must be nonblocking so we can listen to it together with mDNS sockets.
246        signal_sock
247            .set_nonblocking(true)
248            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252        let (sender, receiver) = bounded(100);
253
254        // Spawn the daemon thread
255        let mio_sock = MioUdpSocket::from_std(signal_sock);
256        thread::Builder::new()
257            .name("mDNS_daemon".to_string())
258            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261        Ok(Self {
262            sender,
263            signal_addr,
264        })
265    }
266
267    /// Sends `cmd` to the daemon via its channel, and sends a signal
268    /// to its sock addr to notify.
269    fn send_cmd(&self, cmd: Command) -> Result<()> {
270        let cmd_name = cmd.to_string();
271
272        // First, send to the flume channel.
273        self.sender.try_send(cmd).map_err(|e| match e {
274            TrySendError::Full(_) => Error::Again,
275            e => e_fmt!("flume::channel::send failed: {}", e),
276        })?;
277
278        // Second, send a signal to notify the daemon.
279        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280        let socket = UdpSocket::bind(addr)
281            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282        socket
283            .send_to(cmd_name.as_bytes(), self.signal_addr)
284            .map_err(|e| {
285                e_fmt!(
286                    "signal socket send_to {} ({}) failed: {}",
287                    self.signal_addr,
288                    cmd_name,
289                    e
290                )
291            })?;
292
293        Ok(())
294    }
295
296    /// Starts browsing for a specific service type.
297    ///
298    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
299    ///
300    /// Returns a channel `Receiver` to receive events about the service. The caller
301    /// can call `.recv_async().await` on this receiver to handle events in an
302    /// async environment or call `.recv()` in a sync environment.
303    ///
304    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
305    /// finding more details, i.e. SRV records and TXT records.
306    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307        check_domain_suffix(service_type)?;
308
309        let (resp_s, resp_r) = bounded(10);
310        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
311        Ok(resp_r)
312    }
313
314    /// Stops searching for a specific service type.
315    ///
316    /// When an error is returned, the caller should retry only when
317    /// the error is `Error::Again`, otherwise should log and move on.
318    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
319        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
320    }
321
322    /// Starts querying for the ip addresses of a hostname.
323    ///
324    /// Returns a channel `Receiver` to receive events about the hostname.
325    /// The caller can call `.recv_async().await` on this receiver to handle events in an
326    /// async environment or call `.recv()` in a sync environment.
327    ///
328    /// The `timeout` is specified in milliseconds.
329    pub fn resolve_hostname(
330        &self,
331        hostname: &str,
332        timeout: Option<u64>,
333    ) -> Result<Receiver<HostnameResolutionEvent>> {
334        check_hostname(hostname)?;
335        let (resp_s, resp_r) = bounded(10);
336        self.send_cmd(Command::ResolveHostname(
337            hostname.to_string(),
338            1,
339            resp_s,
340            timeout,
341        ))?;
342        Ok(resp_r)
343    }
344
345    /// Stops querying for the ip addresses of a hostname.
346    ///
347    /// When an error is returned, the caller should retry only when
348    /// the error is `Error::Again`, otherwise should log and move on.
349    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
350        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
351    }
352
353    /// Registers a service provided by this host.
354    ///
355    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
356    /// this method will automatically fill in addresses from the host.
357    ///
358    /// To re-announce a service with an updated `service_info`, just call
359    /// this `register` function again. No need to call `unregister` first.
360    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
361        check_service_name(service_info.get_fullname())?;
362        check_hostname(service_info.get_hostname())?;
363
364        self.send_cmd(Command::Register(service_info))
365    }
366
367    /// Unregisters a service. This is a graceful shutdown of a service.
368    ///
369    /// Returns a channel receiver that is used to receive the status code
370    /// of the unregister.
371    ///
372    /// When an error is returned, the caller should retry only when
373    /// the error is `Error::Again`, otherwise should log and move on.
374    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
375        let (resp_s, resp_r) = bounded(1);
376        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
377        Ok(resp_r)
378    }
379
380    /// Starts to monitor events from the daemon.
381    ///
382    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
383    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
384        let (resp_s, resp_r) = bounded(100);
385        self.send_cmd(Command::Monitor(resp_s))?;
386        Ok(resp_r)
387    }
388
389    /// Shuts down the daemon thread and returns a channel to receive the status.
390    ///
391    /// When an error is returned, the caller should retry only when
392    /// the error is `Error::Again`, otherwise should log and move on.
393    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
394        let (resp_s, resp_r) = bounded(1);
395        self.send_cmd(Command::Exit(resp_s))?;
396        Ok(resp_r)
397    }
398
399    /// Returns the status of the daemon.
400    ///
401    /// When an error is returned, the caller should retry only when
402    /// the error is `Error::Again`, otherwise should consider the daemon
403    /// stopped working and move on.
404    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
405        let (resp_s, resp_r) = bounded(1);
406
407        if self.sender.is_disconnected() {
408            resp_s
409                .send(DaemonStatus::Shutdown)
410                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
411        } else {
412            self.send_cmd(Command::GetStatus(resp_s))?;
413        }
414
415        Ok(resp_r)
416    }
417
418    /// Returns a channel receiver for the metrics, e.g. input/output counters.
419    ///
420    /// The metrics returned is a snapshot. Hence the caller should call
421    /// this method repeatedly if they want to monitor the metrics continuously.
422    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
423        let (resp_s, resp_r) = bounded(1);
424        self.send_cmd(Command::GetMetrics(resp_s))?;
425        Ok(resp_r)
426    }
427
428    /// Change the max length allowed for a service name.
429    ///
430    /// As RFC 6763 defines a length max for a service name, a user should not call
431    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
432    ///
433    /// `len_max` is capped at an internal limit, which is currently 30.
434    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
435        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
436
437        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
438            return Err(Error::Msg(format!(
439                "service name length max {len_max} is too large"
440            )));
441        }
442
443        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
444    }
445
446    /// Change the interval for checking IP changes automatically.
447    ///
448    /// Setting the interval to 0 disables the IP check.
449    ///
450    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
451    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
452        let interval_in_millis = interval_in_secs as u64 * 1000;
453        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
454            interval_in_millis,
455        )))
456    }
457
458    /// Get the current interval in seconds for checking IP changes automatically.
459    pub fn get_ip_check_interval(&self) -> Result<u32> {
460        let (resp_s, resp_r) = bounded(1);
461        self.send_cmd(Command::GetOption(resp_s))?;
462
463        let option = resp_r
464            .recv_timeout(Duration::from_secs(10))
465            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
466        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
467        Ok(ip_check_interval_in_secs as u32)
468    }
469
470    /// Include interfaces that match `if_kind` for this service daemon.
471    ///
472    /// For example:
473    /// ```ignore
474    ///     daemon.enable_interface("en0")?;
475    /// ```
476    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
477        let if_kind_vec = if_kind.into_vec();
478        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
479            if_kind_vec.kinds,
480        )))
481    }
482
483    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
484    ///
485    /// For example:
486    /// ```ignore
487    ///     daemon.disable_interface(IfKind::IPv6)?;
488    /// ```
489    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
490        let if_kind_vec = if_kind.into_vec();
491        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
492            if_kind_vec.kinds,
493        )))
494    }
495
496    #[cfg(test)]
497    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
498        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
499            ifname.to_string(),
500        )))
501    }
502
503    #[cfg(test)]
504    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
505        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
506            ifname.to_string(),
507        )))
508    }
509
510    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
511    ///
512    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
513    /// receive announcements from a responder on the same host.
514    ///
515    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
516    ///
517    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
518    /// the UNIX version of the IP_MULTICAST_LOOP option:
519    ///
520    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
521    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
522    ///
523    /// Which means, in order NOT to receive localhost announcements, you want to call
524    /// this API on the querier side on Windows, but on the responder side on Unix.
525    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
526        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
527    }
528
529    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
530    ///
531    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
532    /// receive announcements from a responder on the same host.
533    ///
534    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
535    ///
536    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
537    /// the UNIX version of the IP_MULTICAST_LOOP option:
538    ///
539    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
540    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
541    ///
542    /// Which means, in order NOT to receive localhost announcements, you want to call
543    /// this API on the querier side on Windows, but on the responder side on Unix.
544    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
545        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
546    }
547
548    /// Enable or disable the use of [ServiceEvent::ServiceData] instead of [ServiceEvent::ServiceResolved].
549    ///
550    /// This is recommended for clients using IPv6 as it will include scope_id in the address.
551    pub fn use_service_data(&self, on: bool) -> Result<()> {
552        self.send_cmd(Command::SetOption(DaemonOption::UseServiceData(on)))
553    }
554
555    /// Proactively confirms whether a service instance still valid.
556    ///
557    /// This call will issue queries for a service instance's SRV record and Address records.
558    ///
559    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
560    /// unless there is a reason not to follow RFC.
561    ///
562    /// If no response is received within `timeout`, the current resource
563    /// records will be flushed, and if needed, `ServiceRemoved` event will be
564    /// sent to active queriers.
565    ///
566    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
567    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
568        self.send_cmd(Command::Verify(instance_fullname, timeout))
569    }
570
571    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
572        let mut zc = Zeroconf::new(signal_sock, poller);
573
574        if let Some(cmd) = zc.run(receiver) {
575            match cmd {
576                Command::Exit(resp_s) => {
577                    // It is guaranteed that the receiver already dropped,
578                    // i.e. the daemon command channel closed.
579                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
580                        debug!("exit: failed to send response of shutdown: {}", e);
581                    }
582                }
583                _ => {
584                    debug!("Unexpected command: {:?}", cmd);
585                }
586            }
587        }
588    }
589}
590
591/// Creates a new UDP socket that uses `intf` to send and recv multicast.
592fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
593    // Use the same socket for receiving and sending multicast packets.
594    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
595    let intf_ip = &intf.ip();
596    match intf_ip {
597        IpAddr::V4(ip) => {
598            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
599            let sock = new_socket(addr.into(), true)?;
600
601            // Join mDNS group to receive packets.
602            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
603                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
604
605            // Set IP_MULTICAST_IF to send packets.
606            sock.set_multicast_if_v4(ip)
607                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
608
609            // Per RFC 6762 section 11:
610            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
611            // be sent with IP TTL set to 255."
612            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
613            sock.set_multicast_ttl_v4(255)
614                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
615
616            if !should_loop {
617                sock.set_multicast_loop_v4(false)
618                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
619            }
620
621            // Test if we can send packets successfully.
622            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
623            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
624            for packet in test_packets {
625                sock.send_to(&packet, &multicast_addr)
626                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
627            }
628            MyUdpSocket::new(sock)
629                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
630        }
631        IpAddr::V6(ip) => {
632            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
633            let sock = new_socket(addr.into(), true)?;
634
635            let if_index = intf.index.unwrap_or(0);
636
637            // Join mDNS group to receive packets.
638            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
639                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
640
641            // Set IPV6_MULTICAST_IF to send packets.
642            sock.set_multicast_if_v6(if_index)
643                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
644
645            // We are not sending multicast packets to test this socket as there might
646            // be many IPv6 interfaces on a host and could cause such send error:
647            // "No buffer space available (os error 55)".
648
649            MyUdpSocket::new(sock)
650                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
651        }
652    }
653}
654
655/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
656/// `non_block` indicates whether to set O_NONBLOCK for the socket.
657fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
658    let domain = match addr {
659        SocketAddr::V4(_) => socket2::Domain::IPV4,
660        SocketAddr::V6(_) => socket2::Domain::IPV6,
661    };
662
663    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
664
665    fd.set_reuse_address(true)
666        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
667    #[cfg(unix)] // this is currently restricted to Unix's in socket2
668    fd.set_reuse_port(true)
669        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
670
671    if non_block {
672        fd.set_nonblocking(true)
673            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
674    }
675
676    fd.bind(&addr.into())
677        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
678
679    trace!("new socket bind to {}", &addr);
680    Ok(fd)
681}
682
683/// Specify a UNIX timestamp in millis to run `command` for the next time.
684struct ReRun {
685    /// UNIX timestamp in millis.
686    next_time: u64,
687    command: Command,
688}
689
/// Specifies kinds of interfaces. It is used to enable or to disable
/// interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum IfKind {
    /// Every interface.
    All,

    /// Every IPv4 interface.
    IPv4,

    /// Every IPv6 interface.
    IPv6,

    /// Selected by interface name, for example "en0".
    Name(String),

    /// Selected by an IPv4 or IPv6 address.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
    ///
    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
    LoopbackV4,

    /// ::1/128, disabled by default.
    LoopbackV6,
}
720
721impl IfKind {
722    /// Checks if `intf` matches with this interface kind.
723    fn matches(&self, intf: &Interface) -> bool {
724        match self {
725            Self::All => true,
726            Self::IPv4 => intf.ip().is_ipv4(),
727            Self::IPv6 => intf.ip().is_ipv6(),
728            Self::Name(ifname) => ifname == &intf.name,
729            Self::Addr(addr) => addr == &intf.ip(),
730            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
731            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
732        }
733    }
734}
735
736/// The first use case of specifying an interface was to
737/// use an interface name. Hence adding this for ergonomic reasons.
738impl From<&str> for IfKind {
739    fn from(val: &str) -> Self {
740        Self::Name(val.to_string())
741    }
742}
743
744impl From<&String> for IfKind {
745    fn from(val: &String) -> Self {
746        Self::Name(val.to_string())
747    }
748}
749
750/// Still for ergonomic reasons.
751impl From<IpAddr> for IfKind {
752    fn from(val: IpAddr) -> Self {
753        Self::Addr(val)
754    }
755}
756
757/// A list of `IfKind` that can be used to match interfaces.
758pub struct IfKindVec {
759    kinds: Vec<IfKind>,
760}
761
762/// A trait that converts a type into a Vec of `IfKind`.
763pub trait IntoIfKindVec {
764    fn into_vec(self) -> IfKindVec;
765}
766
767impl<T: Into<IfKind>> IntoIfKindVec for T {
768    fn into_vec(self) -> IfKindVec {
769        let if_kind: IfKind = self.into();
770        IfKindVec {
771            kinds: vec![if_kind],
772        }
773    }
774}
775
776impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
777    fn into_vec(self) -> IfKindVec {
778        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
779        IfKindVec { kinds }
780    }
781}
782
/// Selection of interfaces.
///
/// One entry records a single enable/disable request. The daemon keeps all
/// entries in a list and applies them in order, so a later entry overrides an
/// earlier one for the interfaces it matches.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
791
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
///
/// The state is created by `Zeroconf::new` and driven by `Zeroconf::run`,
/// which is the event loop of the daemon thread.
struct Zeroconf {
    /// Local interfaces keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// A common socket for IPv4 interfaces.
    ipv4_sock: MyUdpSocket,

    /// A common socket for IPv6 interfaces.
    ipv6_sock: MyUdpSocket,

    /// Local registered services, keyed by service full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    ///
    /// The run loop re-executes an entry's command once its `next_time` is reached.
    retransmissions: Vec<ReRun>,

    /// Daemon counters, updated through `increase_counter`.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    ///
    /// A monitor whose channel is disconnected is dropped on the next notification.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Option: the limit passed to `check_service_name_length` when a service
    /// is registered.
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    ///
    /// Starts with both loopback kinds disabled; enable/disable requests are appended.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon. Starts as `Running` and is set to
    /// `Shutdown` when the run loop receives an `Exit` command.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// Multicast loop setting for the IPv4 socket, `true` by default.
    multicast_loop_v4: bool,

    /// Multicast loop setting for the IPv6 socket, `true` by default.
    multicast_loop_v6: bool,

    /// Whether to use [ServiceEvent::ServiceData] instead of [ServiceEvent::ServiceResolved].
    use_service_data: bool,

    /// Test only: names of interfaces that `check_ip_changes` treats as absent.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
869
870/// Join the multicast group for the given interface.
871fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
872    let intf_ip = &intf.ip();
873    match intf_ip {
874        IpAddr::V4(ip) => {
875            // Join mDNS group to receive packets.
876            debug!("join multicast group V4 on addr {}", ip);
877            my_sock
878                .join_multicast_v4(&GROUP_ADDR_V4, ip)
879                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
880        }
881        IpAddr::V6(ip) => {
882            let if_index = intf.index.unwrap_or(0);
883            // Join mDNS group to receive packets.
884            debug!(
885                "join multicast group V6 on addr {} with index {}",
886                ip, if_index
887            );
888            my_sock
889                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
890                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
891        }
892    }
893    Ok(())
894}
895
896impl Zeroconf {
897    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
898        // Get interfaces.
899        let my_ifaddrs = my_ip_interfaces(false);
900
901        // Create a socket for every IP addr.
902        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
903        // or the same interface name with different IP addrs.
904        let mut my_intfs = HashMap::new();
905        let mut dns_registry_map = HashMap::new();
906
907        // Use the same socket for receiving and sending multicast packets.
908        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
909        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
910        let sock = new_socket(addr.into(), true).unwrap();
911
912        // Per RFC 6762 section 11:
913        // "All Multicast DNS responses (including responses sent via unicast) SHOULD
914        // be sent with IP TTL set to 255."
915        // Here we set the TTL to 255 for multicast as we don't support unicast yet.
916        sock.set_multicast_ttl_v4(255)
917            .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
918            .unwrap();
919
920        // This clones a socket.
921        let ipv4_sock = MyUdpSocket::new(sock).unwrap();
922
923        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
924        let sock = new_socket(addr.into(), true).unwrap();
925
926        // Per RFC 6762 section 11:
927        // "All Multicast DNS responses (including responses sent via unicast) SHOULD
928        // be sent with IP TTL set to 255."
929        sock.set_multicast_hops_v6(255)
930            .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
931            .unwrap();
932
933        // This clones the ipv6 socket.
934        let ipv6_sock = MyUdpSocket::new(sock).unwrap();
935
936        // Configure sockets to join multicast groups.
937        for intf in my_ifaddrs {
938            let sock = if intf.ip().is_ipv4() {
939                &ipv4_sock
940            } else {
941                &ipv6_sock
942            };
943
944            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
945                debug!(
946                    "config socket to join multicast: {}: {e}. Skipped.",
947                    &intf.ip()
948                );
949            }
950
951            let if_index = intf.index.unwrap_or(0);
952
953            // Add this interface address if not already present.
954            dns_registry_map
955                .entry(if_index)
956                .or_insert_with(DnsRegistry::new);
957
958            my_intfs
959                .entry(if_index)
960                .and_modify(|v: &mut MyIntf| {
961                    v.addrs.insert(intf.addr.clone());
962                })
963                .or_insert(MyIntf {
964                    name: intf.name.clone(),
965                    index: if_index,
966                    addrs: HashSet::from([intf.addr]),
967                });
968        }
969
970        let monitors = Vec::new();
971        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
972        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
973
974        let timers = BinaryHeap::new();
975
976        // Disable loopback by default.
977        let if_selections = vec![
978            IfSelection {
979                if_kind: IfKind::LoopbackV4,
980                selected: false,
981            },
982            IfSelection {
983                if_kind: IfKind::LoopbackV6,
984                selected: false,
985            },
986        ];
987
988        let status = DaemonStatus::Running;
989
990        Self {
991            my_intfs,
992            ipv4_sock,
993            ipv6_sock,
994            // poll_ids: HashMap::new(),
995            my_services: HashMap::new(),
996            cache: DnsCache::new(),
997            dns_registry_map,
998            hostname_resolvers: HashMap::new(),
999            service_queriers: HashMap::new(),
1000            retransmissions: Vec::new(),
1001            counters: HashMap::new(),
1002            poller,
1003            monitors,
1004            service_name_len_max,
1005            ip_check_interval,
1006            if_selections,
1007            signal_sock,
1008            timers,
1009            status,
1010            pending_resolves: HashSet::new(),
1011            resolved: HashSet::new(),
1012            multicast_loop_v4: true,
1013            multicast_loop_v6: true,
1014            use_service_data: false,
1015
1016            #[cfg(test)]
1017            test_down_interfaces: HashSet::new(),
1018        }
1019    }
1020
1021    /// The main event loop of the daemon thread
1022    ///
1023    /// In each round, it will:
1024    /// 1. select the listening sockets with a timeout.
1025    /// 2. process the incoming packets if any.
1026    /// 3. try_recv on its channel and execute commands.
1027    /// 4. announce its registered services.
1028    /// 5. process retransmissions if any.
1029    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1030        // Add the daemon's signal socket to the poller.
1031        if let Err(e) = self.poller.registry().register(
1032            &mut self.signal_sock,
1033            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1034            mio::Interest::READABLE,
1035        ) {
1036            debug!("failed to add signal socket to the poller: {}", e);
1037            return None;
1038        }
1039
1040        if let Err(e) = self.poller.registry().register(
1041            &mut self.ipv4_sock,
1042            mio::Token(IPV4_SOCK_EVENT_KEY),
1043            mio::Interest::READABLE,
1044        ) {
1045            debug!("failed to register ipv4 socket: {}", e);
1046            return None;
1047        }
1048
1049        if let Err(e) = self.poller.registry().register(
1050            &mut self.ipv6_sock,
1051            mio::Token(IPV6_SOCK_EVENT_KEY),
1052            mio::Interest::READABLE,
1053        ) {
1054            debug!("failed to register ipv6 socket: {}", e);
1055            return None;
1056        }
1057
1058        // Setup timer for IP checks.
1059        let mut next_ip_check = if self.ip_check_interval > 0 {
1060            current_time_millis() + self.ip_check_interval
1061        } else {
1062            0
1063        };
1064
1065        if next_ip_check > 0 {
1066            self.add_timer(next_ip_check);
1067        }
1068
1069        // Start the run loop.
1070
1071        let mut events = mio::Events::with_capacity(1024);
1072        loop {
1073            let now = current_time_millis();
1074
1075            let earliest_timer = self.peek_earliest_timer();
1076            let timeout = earliest_timer.map(|timer| {
1077                // If `timer` already passed, set `timeout` to be 1ms.
1078                let millis = if timer > now { timer - now } else { 1 };
1079                Duration::from_millis(millis)
1080            });
1081
1082            // Process incoming packets, command events and optional timeout.
1083            events.clear();
1084            match self.poller.poll(&mut events, timeout) {
1085                Ok(_) => self.handle_poller_events(&events),
1086                Err(e) => debug!("failed to select from sockets: {}", e),
1087            }
1088
1089            let now = current_time_millis();
1090
1091            // Remove the timers if already passed.
1092            self.pop_timers_till(now);
1093
1094            // Remove hostname resolvers with expired timeouts.
1095            for hostname in self
1096                .hostname_resolvers
1097                .clone()
1098                .into_iter()
1099                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1100                .map(|(hostname, _)| hostname)
1101            {
1102                trace!("hostname resolver timeout for {}", &hostname);
1103                call_hostname_resolution_listener(
1104                    &self.hostname_resolvers,
1105                    &hostname,
1106                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1107                );
1108                call_hostname_resolution_listener(
1109                    &self.hostname_resolvers,
1110                    &hostname,
1111                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1112                );
1113                self.hostname_resolvers.remove(&hostname);
1114            }
1115
1116            // process commands from the command channel
1117            while let Ok(command) = receiver.try_recv() {
1118                if matches!(command, Command::Exit(_)) {
1119                    self.status = DaemonStatus::Shutdown;
1120                    return Some(command);
1121                }
1122                self.exec_command(command, false);
1123            }
1124
1125            // check for repeated commands and run them if their time is up.
1126            let mut i = 0;
1127            while i < self.retransmissions.len() {
1128                if now >= self.retransmissions[i].next_time {
1129                    let rerun = self.retransmissions.remove(i);
1130                    self.exec_command(rerun.command, true);
1131                } else {
1132                    i += 1;
1133                }
1134            }
1135
1136            // Refresh cached service records with active queriers
1137            self.refresh_active_services();
1138
1139            // Refresh cached A/AAAA records with active queriers
1140            let mut query_count = 0;
1141            for (hostname, _sender) in self.hostname_resolvers.iter() {
1142                for (hostname, ip_addr) in
1143                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1144                {
1145                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1146                    query_count += 1;
1147                }
1148            }
1149
1150            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1151
1152            // check and evict expired records in our cache
1153            let now = current_time_millis();
1154
1155            // Notify service listeners about the expired records.
1156            let expired_services = self.cache.evict_expired_services(now);
1157            if !expired_services.is_empty() {
1158                debug!(
1159                    "run: send {} service removal to listeners",
1160                    expired_services.len()
1161                );
1162                self.notify_service_removal(expired_services);
1163            }
1164
1165            // Notify hostname listeners about the expired records.
1166            let expired_addrs = self.cache.evict_expired_addr(now);
1167            for (hostname, addrs) in expired_addrs {
1168                call_hostname_resolution_listener(
1169                    &self.hostname_resolvers,
1170                    &hostname,
1171                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1172                );
1173                let instances = self.cache.get_instances_on_host(&hostname);
1174                let instance_set: HashSet<String> = instances.into_iter().collect();
1175                self.resolve_updated_instances(&instance_set);
1176            }
1177
1178            // Send out probing queries.
1179            self.probing_handler();
1180
1181            // check IP changes if next_ip_check is reached.
1182            if now >= next_ip_check && next_ip_check > 0 {
1183                next_ip_check = now + self.ip_check_interval;
1184                self.add_timer(next_ip_check);
1185
1186                self.check_ip_changes();
1187            }
1188        }
1189    }
1190
1191    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1192        match daemon_opt {
1193            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1194            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1195            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1196            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1197            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1198            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1199            DaemonOption::UseServiceData(on) => self.use_service_data = on,
1200            #[cfg(test)]
1201            DaemonOption::TestDownInterface(ifname) => {
1202                self.test_down_interfaces.insert(ifname);
1203            }
1204            #[cfg(test)]
1205            DaemonOption::TestUpInterface(ifname) => {
1206                self.test_down_interfaces.remove(&ifname);
1207            }
1208        }
1209    }
1210
1211    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1212        debug!("enable_interface: {:?}", kinds);
1213        for if_kind in kinds {
1214            self.if_selections.push(IfSelection {
1215                if_kind,
1216                selected: true,
1217            });
1218        }
1219
1220        self.apply_intf_selections(my_ip_interfaces(true));
1221    }
1222
1223    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1224        debug!("disable_interface: {:?}", kinds);
1225        for if_kind in kinds {
1226            self.if_selections.push(IfSelection {
1227                if_kind,
1228                selected: false,
1229            });
1230        }
1231
1232        self.apply_intf_selections(my_ip_interfaces(true));
1233    }
1234
1235    fn set_multicast_loop_v4(&mut self, on: bool) {
1236        self.multicast_loop_v4 = on;
1237        self.ipv4_sock
1238            .pktinfo
1239            .set_multicast_loop_v4(on)
1240            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1241            .unwrap();
1242    }
1243
1244    fn set_multicast_loop_v6(&mut self, on: bool) {
1245        self.multicast_loop_v6 = on;
1246        self.ipv6_sock
1247            .pktinfo
1248            .set_multicast_loop_v6(on)
1249            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1250            .unwrap();
1251    }
1252
1253    fn notify_monitors(&mut self, event: DaemonEvent) {
1254        // Only retain the monitors that are still connected.
1255        self.monitors.retain(|sender| {
1256            if let Err(e) = sender.try_send(event.clone()) {
1257                debug!("notify_monitors: try_send: {}", &e);
1258                if matches!(e, TrySendError::Disconnected(_)) {
1259                    return false; // This monitor is dropped.
1260                }
1261            }
1262            true
1263        });
1264    }
1265
1266    /// Remove `addr` in my services that enabled `addr_auto`.
1267    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1268        for (_, service_info) in self.my_services.iter_mut() {
1269            if service_info.is_addr_auto() {
1270                service_info.remove_ipaddr(addr);
1271            }
1272        }
1273    }
1274
1275    fn add_timer(&mut self, next_time: u64) {
1276        self.timers.push(Reverse(next_time));
1277    }
1278
1279    fn peek_earliest_timer(&self) -> Option<u64> {
1280        self.timers.peek().map(|Reverse(v)| *v)
1281    }
1282
1283    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1284        self.timers.pop().map(|Reverse(v)| v)
1285    }
1286
1287    /// Pop all timers that are already passed till `now`.
1288    fn pop_timers_till(&mut self, now: u64) {
1289        while let Some(Reverse(v)) = self.timers.peek() {
1290            if *v > now {
1291                break;
1292            }
1293            self.timers.pop();
1294        }
1295    }
1296
1297    /// Apply all selections to `interfaces` and return the selected addresses.
1298    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1299        let intf_count = interfaces.len();
1300        let mut intf_selections = vec![true; intf_count];
1301
1302        // apply if_selections
1303        for selection in self.if_selections.iter() {
1304            // Mark the interfaces for this selection.
1305            for i in 0..intf_count {
1306                if selection.if_kind.matches(&interfaces[i]) {
1307                    intf_selections[i] = selection.selected;
1308                }
1309            }
1310        }
1311
1312        let mut selected_addrs = HashSet::new();
1313        for i in 0..intf_count {
1314            if intf_selections[i] {
1315                selected_addrs.insert(interfaces[i].addr.ip());
1316            }
1317        }
1318
1319        selected_addrs
1320    }
1321
1322    /// Apply all selections to `interfaces`.
1323    ///
1324    /// For any interface, add it if selected but not bound yet,
1325    /// delete it if not selected but still bound.
1326    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1327        // By default, we enable all interfaces.
1328        let intf_count = interfaces.len();
1329        let mut intf_selections = vec![true; intf_count];
1330
1331        // apply if_selections
1332        for selection in self.if_selections.iter() {
1333            // Mark the interfaces for this selection.
1334            for i in 0..intf_count {
1335                if selection.if_kind.matches(&interfaces[i]) {
1336                    intf_selections[i] = selection.selected;
1337                }
1338            }
1339        }
1340
1341        // Update `my_intfs` based on the selections.
1342        for (idx, intf) in interfaces.into_iter().enumerate() {
1343            if intf_selections[idx] {
1344                // Add the interface
1345                self.add_interface(intf);
1346            } else {
1347                // Remove the interface
1348                self.del_interface(&intf);
1349            }
1350        }
1351    }
1352
    /// Handles the removal of a local IP address: drops `ip` from the services
    /// that have `addr_auto` enabled, then sends `DaemonEvent::IpDel` to the
    /// monitors.
    fn del_ip(&mut self, ip: IpAddr) {
        self.del_addr_in_my_services(&ip);
        self.notify_monitors(DaemonEvent::IpDel(ip));
    }
1357
1358    /// Check for IP changes and update [my_intfs] as needed.
1359    fn check_ip_changes(&mut self) {
1360        // Get the current interfaces.
1361        let my_ifaddrs = my_ip_interfaces(true);
1362
1363        #[cfg(test)]
1364        let my_ifaddrs: Vec<_> = my_ifaddrs
1365            .into_iter()
1366            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1367            .collect();
1368
1369        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1370            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1371                let if_index = intf.index.unwrap_or(0);
1372                acc.entry(if_index).or_default().push(&intf.addr);
1373                acc
1374            });
1375
1376        let mut deleted_intfs = Vec::new();
1377        let mut deleted_ips = Vec::new();
1378
1379        for (if_index, my_intf) in self.my_intfs.iter_mut() {
1380            let mut last_ipv4 = None;
1381            let mut last_ipv6 = None;
1382
1383            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1384                my_intf.addrs.retain(|addr| {
1385                    if current_addrs.contains(&addr) {
1386                        true
1387                    } else {
1388                        match addr.ip() {
1389                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1390                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1391                        }
1392                        deleted_ips.push(addr.ip());
1393                        false
1394                    }
1395                });
1396                if my_intf.addrs.is_empty() {
1397                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1398                }
1399            } else {
1400                // If it does not exist, remove the interface.
1401                debug!(
1402                    "check_ip_changes: interface {} ({}) no longer exists, removing",
1403                    my_intf.name, if_index
1404                );
1405                for addr in my_intf.addrs.iter() {
1406                    match addr.ip() {
1407                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1408                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1409                    }
1410                    deleted_ips.push(addr.ip())
1411                }
1412                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1413            }
1414        }
1415
1416        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1417            debug!(
1418                "check_ip_changes: {} deleted ips {} deleted intfs",
1419                deleted_ips.len(),
1420                deleted_intfs.len()
1421            );
1422        }
1423
1424        for ip in deleted_ips {
1425            self.del_ip(ip);
1426        }
1427
1428        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1429            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1430                continue;
1431            };
1432
1433            if let Some(ipv4) = last_ipv4 {
1434                debug!("leave multicast for {ipv4}");
1435                if let Err(e) = self
1436                    .ipv4_sock
1437                    .pktinfo
1438                    .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1439                {
1440                    debug!("leave multicast group for addr {ipv4}: {e}");
1441                }
1442            }
1443
1444            if let Some(ipv6) = last_ipv6 {
1445                debug!("leave multicast for {ipv6}");
1446                if let Err(e) = self
1447                    .ipv6_sock
1448                    .pktinfo
1449                    .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1450                {
1451                    debug!("leave multicast group for IPv6: {ipv6}: {e}");
1452                }
1453            }
1454
1455            // Remove cache records for this interface.
1456            let intf_id = InterfaceId {
1457                name: my_intf.name.to_string(),
1458                index: my_intf.index,
1459            };
1460            let removed_instances = self.cache.remove_records_on_intf(intf_id);
1461            self.notify_service_removal(removed_instances);
1462        }
1463
1464        // Add newly found interfaces only if in our selections.
1465        self.apply_intf_selections(my_ifaddrs);
1466    }
1467
1468    fn del_interface(&mut self, intf: &Interface) {
1469        let if_index = intf.index.unwrap_or(0);
1470        trace!(
1471            "del_interface: {} ({if_index}) addr {}",
1472            intf.name,
1473            intf.ip()
1474        );
1475
1476        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1477            debug!("del_interface: interface {} not found", intf.name);
1478            return;
1479        };
1480
1481        let mut ip_removed = false;
1482
1483        if my_intf.addrs.remove(&intf.addr) {
1484            ip_removed = true;
1485
1486            match intf.addr.ip() {
1487                IpAddr::V4(ipv4) => {
1488                    if my_intf.next_ifaddr_v4().is_none() {
1489                        if let Err(e) = self
1490                            .ipv4_sock
1491                            .pktinfo
1492                            .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1493                        {
1494                            debug!("leave multicast group for addr {ipv4}: {e}");
1495                        }
1496                    }
1497                }
1498
1499                IpAddr::V6(ipv6) => {
1500                    if my_intf.next_ifaddr_v6().is_none() {
1501                        if let Err(e) = self
1502                            .ipv6_sock
1503                            .pktinfo
1504                            .leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1505                        {
1506                            debug!("leave multicast group for addr {ipv6}: {e}");
1507                        }
1508                    }
1509                }
1510            }
1511
1512            if my_intf.addrs.is_empty() {
1513                // If no more addresses, remove the interface.
1514                debug!("del_interface: removing interface {}", intf.name);
1515                self.my_intfs.remove(&if_index);
1516                self.dns_registry_map.remove(&if_index);
1517                self.cache.remove_addrs_on_disabled_intf(if_index);
1518            }
1519        }
1520
1521        if ip_removed {
1522            // Notify the monitors.
1523            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1524            // Remove the interface from my services that enabled `addr_auto`.
1525            self.del_addr_in_my_services(&intf.ip());
1526        }
1527    }
1528
1529    fn add_interface(&mut self, intf: Interface) {
1530        let sock = if intf.ip().is_ipv4() {
1531            &self.ipv4_sock
1532        } else {
1533            &self.ipv6_sock
1534        };
1535
1536        let if_index = intf.index.unwrap_or(0);
1537        let mut new_addr = false;
1538
1539        match self.my_intfs.entry(if_index) {
1540            Entry::Occupied(mut entry) => {
1541                // If intf has a new address, add it to the existing interface.
1542                let my_intf = entry.get_mut();
1543                if !my_intf.addrs.contains(&intf.addr) {
1544                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1545                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1546                    }
1547                    my_intf.addrs.insert(intf.addr.clone());
1548                    new_addr = true;
1549                }
1550            }
1551            Entry::Vacant(entry) => {
1552                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1553                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1554                    return;
1555                }
1556
1557                new_addr = true;
1558                let new_intf = MyIntf {
1559                    name: intf.name.clone(),
1560                    index: if_index,
1561                    addrs: HashSet::from([intf.addr.clone()]),
1562                };
1563                entry.insert(new_intf);
1564            }
1565        }
1566
1567        if !new_addr {
1568            trace!("add_interface: interface {} already exists", &intf.name);
1569            return;
1570        }
1571
1572        debug!("add new interface {}: {}", intf.name, intf.ip());
1573
1574        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1575            debug!("add_interface: cannot find if_index {if_index}");
1576            return;
1577        };
1578
1579        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1580            Some(registry) => registry,
1581            None => self
1582                .dns_registry_map
1583                .entry(if_index)
1584                .or_insert_with(DnsRegistry::new),
1585        };
1586
1587        for (_, service_info) in self.my_services.iter_mut() {
1588            if service_info.is_addr_auto() {
1589                let new_ip = intf.ip();
1590                service_info.insert_ipaddr(new_ip);
1591
1592                if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1593                    debug!(
1594                        "Announce service {} on {}",
1595                        service_info.get_fullname(),
1596                        intf.ip()
1597                    );
1598                    service_info.set_status(if_index, ServiceStatus::Announced);
1599                } else {
1600                    for timer in dns_registry.new_timers.drain(..) {
1601                        self.timers.push(Reverse(timer));
1602                    }
1603                    service_info.set_status(if_index, ServiceStatus::Probing);
1604                }
1605            }
1606        }
1607
1608        // As we added a new interface, we want to execute all active "Browse" reruns now.
1609        let mut browse_reruns = Vec::new();
1610        let mut i = 0;
1611        while i < self.retransmissions.len() {
1612            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1613                browse_reruns.push(self.retransmissions.remove(i));
1614            } else {
1615                i += 1;
1616            }
1617        }
1618
1619        for rerun in browse_reruns {
1620            self.exec_command(rerun.command, true);
1621        }
1622
1623        // Notify the monitors.
1624        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1625    }
1626
1627    /// Registers a service.
1628    ///
1629    /// RFC 6762 section 8.3.
1630    /// ...the Multicast DNS responder MUST send
1631    ///    an unsolicited Multicast DNS response containing, in the Answer
1632    ///    Section, all of its newly registered resource records
1633    ///
1634    /// Zeroconf will then respond to requests for information about this service.
1635    fn register_service(&mut self, mut info: ServiceInfo) {
1636        // Check the service name length.
1637        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1638            debug!("check_service_name_length: {}", &e);
1639            self.notify_monitors(DaemonEvent::Error(e));
1640            return;
1641        }
1642
1643        if info.is_addr_auto() {
1644            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1645            for addr in selected_addrs {
1646                info.insert_ipaddr(addr);
1647            }
1648        }
1649
1650        debug!("register service {:?}", &info);
1651
1652        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1653        if !outgoing_addrs.is_empty() {
1654            self.notify_monitors(DaemonEvent::Announce(
1655                info.get_fullname().to_string(),
1656                format!("{:?}", &outgoing_addrs),
1657            ));
1658        }
1659
1660        // The key has to be lower case letter as DNS record name is case insensitive.
1661        // The info will have the original name.
1662        let service_fullname = info.get_fullname().to_lowercase();
1663        self.my_services.insert(service_fullname, info);
1664    }
1665
1666    /// Sends out announcement of `info` on every valid interface.
1667    /// Returns the list of interface IPs that sent out the announcement.
1668    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1669        let mut outgoing_addrs = Vec::new();
1670        let mut outgoing_intfs = HashSet::new();
1671
1672        for (if_index, intf) in self.my_intfs.iter() {
1673            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1674                Some(registry) => registry,
1675                None => self
1676                    .dns_registry_map
1677                    .entry(*if_index)
1678                    .or_insert_with(DnsRegistry::new),
1679            };
1680
1681            let mut announced = false;
1682
1683            // IPv4
1684            if announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo) {
1685                for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1686                    outgoing_addrs.push(addr.ip());
1687                }
1688                outgoing_intfs.insert(intf.index);
1689
1690                debug!(
1691                    "Announce service IPv4 {} on {}",
1692                    info.get_fullname(),
1693                    intf.name
1694                );
1695                announced = true;
1696            }
1697
1698            if announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo) {
1699                for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1700                    outgoing_addrs.push(addr.ip());
1701                }
1702                outgoing_intfs.insert(intf.index);
1703
1704                debug!(
1705                    "Announce service IPv6 {} on {}",
1706                    info.get_fullname(),
1707                    intf.name
1708                );
1709                announced = true;
1710            }
1711
1712            if announced {
1713                info.set_status(intf.index, ServiceStatus::Announced);
1714            } else {
1715                for timer in dns_registry.new_timers.drain(..) {
1716                    self.timers.push(Reverse(timer));
1717                }
1718                info.set_status(*if_index, ServiceStatus::Probing);
1719            }
1720        }
1721
1722        // RFC 6762 section 8.3.
1723        // ..The Multicast DNS responder MUST send at least two unsolicited
1724        //    responses, one second apart.
1725        let next_time = current_time_millis() + 1000;
1726        for if_index in outgoing_intfs {
1727            self.add_retransmission(
1728                next_time,
1729                Command::RegisterResend(info.get_fullname().to_string(), if_index),
1730            );
1731        }
1732
1733        outgoing_addrs
1734    }
1735
1736    /// Send probings or finish them if expired. Notify waiting services.
1737    fn probing_handler(&mut self) {
1738        let now = current_time_millis();
1739
1740        for (if_index, intf) in self.my_intfs.iter() {
1741            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1742                continue;
1743            };
1744
1745            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1746
1747            // send probing.
1748            if !out.questions().is_empty() {
1749                trace!("sending out probing of questions: {:?}", out.questions());
1750                send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1751                send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1752            }
1753
1754            // For finished probes, wake up services that are waiting for the probes.
1755            let waiting_services =
1756                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1757
1758            for service_name in waiting_services {
1759                // service names are lowercase
1760                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1761                    if info.get_status(*if_index) == ServiceStatus::Announced {
1762                        debug!("service {} already announced", info.get_fullname());
1763                        continue;
1764                    }
1765
1766                    let announced_v4 =
1767                        announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
1768                    let announced_v6 =
1769                        announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
1770
1771                    if announced_v4 || announced_v6 {
1772                        let next_time = now + 1000;
1773                        let command =
1774                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1775                        self.retransmissions.push(ReRun { next_time, command });
1776                        self.timers.push(Reverse(next_time));
1777
1778                        let fullname = match dns_registry.name_changes.get(&service_name) {
1779                            Some(new_name) => new_name.to_string(),
1780                            None => service_name.to_string(),
1781                        };
1782
1783                        let mut hostname = info.get_hostname();
1784                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1785                            hostname = new_name;
1786                        }
1787
1788                        debug!("wake up: announce service {} on {}", fullname, intf.name);
1789                        notify_monitors(
1790                            &mut self.monitors,
1791                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1792                        );
1793
1794                        info.set_status(*if_index, ServiceStatus::Announced);
1795                    }
1796                }
1797            }
1798        }
1799    }
1800
1801    fn unregister_service(
1802        &self,
1803        info: &ServiceInfo,
1804        intf: &MyIntf,
1805        sock: &PktInfoUdpSocket,
1806    ) -> Vec<u8> {
1807        let is_ipv4 = sock.domain() == Domain::IPV4;
1808
1809        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1810        out.add_answer_at_time(
1811            DnsPointer::new(
1812                info.get_type(),
1813                RRType::PTR,
1814                CLASS_IN,
1815                0,
1816                info.get_fullname().to_string(),
1817            ),
1818            0,
1819        );
1820
1821        if let Some(sub) = info.get_subtype() {
1822            trace!("Adding subdomain {}", sub);
1823            out.add_answer_at_time(
1824                DnsPointer::new(
1825                    sub,
1826                    RRType::PTR,
1827                    CLASS_IN,
1828                    0,
1829                    info.get_fullname().to_string(),
1830                ),
1831                0,
1832            );
1833        }
1834
1835        out.add_answer_at_time(
1836            DnsSrv::new(
1837                info.get_fullname(),
1838                CLASS_IN | CLASS_CACHE_FLUSH,
1839                0,
1840                info.get_priority(),
1841                info.get_weight(),
1842                info.get_port(),
1843                info.get_hostname().to_string(),
1844            ),
1845            0,
1846        );
1847        out.add_answer_at_time(
1848            DnsTxt::new(
1849                info.get_fullname(),
1850                CLASS_IN | CLASS_CACHE_FLUSH,
1851                0,
1852                info.generate_txt(),
1853            ),
1854            0,
1855        );
1856
1857        let if_addrs = if is_ipv4 {
1858            info.get_addrs_on_my_intf_v4(intf)
1859        } else {
1860            info.get_addrs_on_my_intf_v6(intf)
1861        };
1862
1863        if if_addrs.is_empty() {
1864            return vec![];
1865        }
1866
1867        for address in if_addrs {
1868            out.add_answer_at_time(
1869                DnsAddress::new(
1870                    info.get_hostname(),
1871                    ip_address_rr_type(&address),
1872                    CLASS_IN | CLASS_CACHE_FLUSH,
1873                    0,
1874                    address,
1875                    intf.into(),
1876                ),
1877                0,
1878            );
1879        }
1880
1881        // `out` data is non-empty, hence we can do this.
1882        send_dns_outgoing(&out, intf, sock).remove(0)
1883    }
1884
1885    /// Binds a channel `listener` to querying mDNS hostnames.
1886    ///
1887    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1888    fn add_hostname_resolver(
1889        &mut self,
1890        hostname: String,
1891        listener: Sender<HostnameResolutionEvent>,
1892        timeout: Option<u64>,
1893    ) {
1894        let real_timeout = timeout.map(|t| current_time_millis() + t);
1895        self.hostname_resolvers
1896            .insert(hostname.to_lowercase(), (listener, real_timeout));
1897        if let Some(t) = real_timeout {
1898            self.add_timer(t);
1899        }
1900    }
1901
1902    /// Sends a multicast query for `name` with `qtype`.
1903    fn send_query(&self, name: &str, qtype: RRType) {
1904        self.send_query_vec(&[(name, qtype)]);
1905    }
1906
1907    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1908    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1909        trace!("Sending query questions: {:?}", questions);
1910        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1911        let now = current_time_millis();
1912
1913        for (name, qtype) in questions {
1914            out.add_question(name, *qtype);
1915
1916            for record in self.cache.get_known_answers(name, *qtype, now) {
1917                /*
1918                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1919                ...
1920                    When a Multicast DNS querier sends a query to which it already knows
1921                    some answers, it populates the Answer Section of the DNS query
1922                    message with those answers.
1923                 */
1924                trace!("add known answer: {:?}", record.record);
1925                let mut new_record = record.record.clone();
1926                new_record.get_record_mut().update_ttl(now);
1927                out.add_answer_box(new_record);
1928            }
1929        }
1930
1931        for (_, intf) in self.my_intfs.iter() {
1932            send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1933            send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1934        }
1935    }
1936
1937    /// Reads one UDP datagram from the socket of `intf`.
1938    ///
1939    /// Returns false if failed to receive a packet,
1940    /// otherwise returns true.
1941    fn handle_read(&mut self, event_key: usize) -> bool {
1942        let sock = match event_key {
1943            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
1944            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
1945            _ => {
1946                debug!("handle_read: unknown token {}", event_key);
1947                return false;
1948            }
1949        };
1950        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1951
1952        // Read the next mDNS UDP datagram.
1953        //
1954        // If the datagram is larger than `buf`, excess bytes may or may not
1955        // be truncated by the socket layer depending on the platform's libc.
1956        // In any case, such large datagram will not be decoded properly and
1957        // this function should return false but should not crash.
1958        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
1959            Ok(sz) => sz,
1960            Err(e) => {
1961                if e.kind() != std::io::ErrorKind::WouldBlock {
1962                    debug!("listening socket read failed: {}", e);
1963                }
1964                return false;
1965            }
1966        };
1967
1968        // Find the interface that received the packet.
1969        let pkt_if_index = pktinfo.if_index as u32;
1970        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
1971            debug!(
1972                "handle_read: no interface found for pktinfo if_index: {}",
1973                pktinfo.if_index
1974            );
1975            return true; // We still return true to indicate that we read something.
1976        };
1977
1978        buf.truncate(sz); // reduce potential processing errors
1979
1980        match DnsIncoming::new(buf, my_intf.into()) {
1981            Ok(msg) => {
1982                if msg.is_query() {
1983                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
1984                } else if msg.is_response() {
1985                    self.handle_response(msg, pkt_if_index);
1986                } else {
1987                    debug!("Invalid message: not query and not response");
1988                }
1989            }
1990            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1991        }
1992
1993        true
1994    }
1995
1996    /// Returns true, if sent query. Returns false if SRV already exists.
1997    fn query_unresolved(&mut self, instance: &str) -> bool {
1998        if !valid_instance_name(instance) {
1999            trace!("instance name {} not valid", instance);
2000            return false;
2001        }
2002
2003        if let Some(records) = self.cache.get_srv(instance) {
2004            for record in records {
2005                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2006                    if self.cache.get_addr(srv.host()).is_none() {
2007                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2008                        return true;
2009                    }
2010                }
2011            }
2012        } else {
2013            self.send_query(instance, RRType::ANY);
2014            return true;
2015        }
2016
2017        false
2018    }
2019
2020    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2021    /// cached records via `sender`.
2022    fn query_cache_for_service(
2023        &mut self,
2024        ty_domain: &str,
2025        sender: &Sender<ServiceEvent>,
2026        now: u64,
2027    ) {
2028        let mut resolved: HashSet<String> = HashSet::new();
2029        let mut unresolved: HashSet<String> = HashSet::new();
2030
2031        if let Some(records) = self.cache.get_ptr(ty_domain) {
2032            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2033                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2034                    let mut new_event = None;
2035                    if self.use_service_data {
2036                        match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2037                            Ok(resolved_service) => {
2038                                if resolved_service.is_valid() {
2039                                    debug!("Resolved service from cache: {}", ptr.alias());
2040                                    new_event =
2041                                        Some(ServiceEvent::ServiceData(Box::new(resolved_service)));
2042                                } else {
2043                                    debug!("Resolved service is not valid: {}", ptr.alias());
2044                                }
2045                            }
2046                            Err(err) => {
2047                                debug!("Error while resolving service from cache: {}", err);
2048                                continue;
2049                            }
2050                        }
2051                    } else {
2052                        // Support legacy behavior.
2053                        match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
2054                            Ok(info) => {
2055                                if info.is_ready() {
2056                                    new_event = Some(ServiceEvent::ServiceResolved(info));
2057                                } else {
2058                                    debug!("Service info is not ready: {}", ptr.alias());
2059                                }
2060                            }
2061                            Err(err) => {
2062                                debug!("Error while creating service info from cache: {}", err);
2063                                continue;
2064                            }
2065                        }
2066                    }
2067
2068                    match sender.send(ServiceEvent::ServiceFound(
2069                        ty_domain.to_string(),
2070                        ptr.alias().to_string(),
2071                    )) {
2072                        Ok(()) => debug!("send service found {}", ptr.alias()),
2073                        Err(e) => {
2074                            debug!("failed to send service found: {}", e);
2075                            continue;
2076                        }
2077                    }
2078
2079                    if let Some(event) = new_event {
2080                        resolved.insert(ptr.alias().to_string());
2081                        match sender.send(event) {
2082                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2083                            Err(e) => debug!("failed to send service resolved: {}", e),
2084                        }
2085                    } else {
2086                        unresolved.insert(ptr.alias().to_string());
2087                    }
2088                }
2089            }
2090        }
2091
2092        for instance in resolved.drain() {
2093            self.pending_resolves.remove(&instance);
2094            self.resolved.insert(instance);
2095        }
2096
2097        for instance in unresolved.drain() {
2098            self.add_pending_resolve(instance);
2099        }
2100    }
2101
2102    /// Checks if `hostname` has records in the cache. If yes, sends the
2103    /// cached records via `sender`.
2104    fn query_cache_for_hostname(
2105        &mut self,
2106        hostname: &str,
2107        sender: Sender<HostnameResolutionEvent>,
2108    ) {
2109        let addresses_map = self.cache.get_addresses_for_host(hostname);
2110        for (name, addresses) in addresses_map {
2111            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2112                Ok(()) => trace!("sent hostname addresses found"),
2113                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2114            }
2115        }
2116    }
2117
2118    fn add_pending_resolve(&mut self, instance: String) {
2119        if !self.pending_resolves.contains(&instance) {
2120            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2121            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2122            self.pending_resolves.insert(instance);
2123        }
2124    }
2125
2126    fn create_service_info_from_cache(
2127        &self,
2128        ty_domain: &str,
2129        fullname: &str,
2130    ) -> Result<ServiceInfo> {
2131        let my_name = {
2132            let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
2133            name.strip_suffix('.').unwrap_or(name).to_string()
2134        };
2135
2136        let now = current_time_millis();
2137        let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
2138
2139        // Be sure setting `subtype` if available even when querying for the parent domain.
2140        if let Some(subtype) = self.cache.get_subtype(fullname) {
2141            trace!(
2142                "ty_domain: {} found subtype {} for instance: {}",
2143                ty_domain,
2144                subtype,
2145                fullname
2146            );
2147            if info.get_subtype().is_none() {
2148                info.set_subtype(subtype.clone());
2149            }
2150        }
2151
2152        // resolve SRV record
2153        if let Some(records) = self.cache.get_srv(fullname) {
2154            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2155                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2156                    info.set_hostname(dns_srv.host().to_string());
2157                    info.set_port(dns_srv.port());
2158                }
2159            }
2160        }
2161
2162        // resolve TXT record
2163        if let Some(records) = self.cache.get_txt(fullname) {
2164            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2165                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2166                    info.set_properties_from_txt(dns_txt.text());
2167                }
2168            }
2169        }
2170
2171        // resolve A and AAAA records
2172        if let Some(records) = self.cache.get_addr(info.get_hostname()) {
2173            for answer in records.iter() {
2174                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2175                    if dns_a.expires_soon(now) {
2176                        trace!(
2177                            "Addr expired or expires soon: {}",
2178                            dns_a.address().to_ip_addr()
2179                        );
2180                    } else {
2181                        info.insert_ipaddr(dns_a.address().to_ip_addr());
2182                    }
2183                }
2184            }
2185        }
2186
2187        Ok(info)
2188    }
2189
2190    /// Creates a `ResolvedService` from the cache.
2191    fn resolve_service_from_cache(
2192        &self,
2193        ty_domain: &str,
2194        fullname: &str,
2195    ) -> Result<ResolvedService> {
2196        let now = current_time_millis();
2197        let mut resolved_service = ResolvedService {
2198            ty_domain: ty_domain.to_string(),
2199            sub_ty_domain: None,
2200            fullname: fullname.to_string(),
2201            host: String::new(),
2202            port: 0,
2203            addresses: HashSet::new(),
2204            txt_properties: TxtProperties::new(),
2205        };
2206
2207        // Be sure setting `subtype` if available even when querying for the parent domain.
2208        if let Some(subtype) = self.cache.get_subtype(fullname) {
2209            trace!(
2210                "ty_domain: {} found subtype {} for instance: {}",
2211                ty_domain,
2212                subtype,
2213                fullname
2214            );
2215            if resolved_service.sub_ty_domain.is_none() {
2216                resolved_service.sub_ty_domain = Some(subtype.to_string());
2217            }
2218        }
2219
2220        // resolve SRV record
2221        if let Some(records) = self.cache.get_srv(fullname) {
2222            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2223                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2224                    resolved_service.host = dns_srv.host().to_string();
2225                    resolved_service.port = dns_srv.port();
2226                }
2227            }
2228        }
2229
2230        // resolve TXT record
2231        if let Some(records) = self.cache.get_txt(fullname) {
2232            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2233                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2234                    resolved_service.txt_properties = dns_txt.text().into();
2235                }
2236            }
2237        }
2238
2239        // resolve A and AAAA records
2240        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2241            for answer in records.iter() {
2242                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2243                    if dns_a.expires_soon(now) {
2244                        trace!(
2245                            "Addr expired or expires soon: {}",
2246                            dns_a.address().to_ip_addr()
2247                        );
2248                    } else {
2249                        resolved_service.addresses.insert(dns_a.address());
2250                    }
2251                }
2252            }
2253        }
2254
2255        Ok(resolved_service)
2256    }
2257
2258    fn handle_poller_events(&mut self, events: &mio::Events) {
2259        for ev in events.iter() {
2260            trace!("event received with key {:?}", ev.token());
2261            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2262                // Drain signals as we will drain commands as well.
2263                self.signal_sock_drain();
2264
2265                if let Err(e) = self.poller.registry().reregister(
2266                    &mut self.signal_sock,
2267                    ev.token(),
2268                    mio::Interest::READABLE,
2269                ) {
2270                    debug!("failed to modify poller for signal socket: {}", e);
2271                }
2272                continue; // Next event.
2273            }
2274
2275            // Read until no more packets available.
2276            while self.handle_read(ev.token().0) {}
2277
2278            // we continue to monitor this socket.
2279            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2280                // Re-register the IPv4 socket for reading.
2281                if let Err(e) = self.poller.registry().reregister(
2282                    &mut self.ipv4_sock,
2283                    ev.token(),
2284                    mio::Interest::READABLE,
2285                ) {
2286                    debug!("modify poller for IPv4 socket: {}", e);
2287                }
2288            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2289                // Re-register the IPv6 socket for reading.
2290                if let Err(e) = self.poller.registry().reregister(
2291                    &mut self.ipv6_sock,
2292                    ev.token(),
2293                    mio::Interest::READABLE,
2294                ) {
2295                    debug!("modify poller for IPv6 socket: {}", e);
2296                }
2297            }
2298        }
2299    }
2300
2301    /// Deal with incoming response packets.  All answers
2302    /// are held in the cache, and listeners are notified.
2303    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2304        let now = current_time_millis();
2305
2306        // remove records that are expired.
2307        let mut record_predicate = |record: &DnsRecordBox| {
2308            if !record.get_record().is_expired(now) {
2309                return true;
2310            }
2311
2312            debug!("record is expired, removing it from cache.");
2313            if self.cache.remove(record) {
2314                // for PTR records, send event to listeners
2315                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2316                    call_service_listener(
2317                        &self.service_queriers,
2318                        dns_ptr.get_name(),
2319                        ServiceEvent::ServiceRemoved(
2320                            dns_ptr.get_name().to_string(),
2321                            dns_ptr.alias().to_string(),
2322                        ),
2323                    );
2324                }
2325            }
2326            false
2327        };
2328        msg.answers_mut().retain(&mut record_predicate);
2329        msg.authorities_mut().retain(&mut record_predicate);
2330        msg.additionals_mut().retain(&mut record_predicate);
2331
2332        // check possible conflicts and handle them.
2333        self.conflict_handler(&msg, if_index);
2334
2335        // check if the message is for us.
2336        let mut is_for_us = true; // assume it is for us.
2337
2338        // If there are any PTR records in the answers, there should be
2339        // at least one PTR for us. Otherwise, the message is not for us.
2340        // If there are no PTR records at all, assume this message is for us.
2341        for answer in msg.answers() {
2342            if answer.get_type() == RRType::PTR {
2343                if self.service_queriers.contains_key(answer.get_name()) {
2344                    is_for_us = true;
2345                    break; // OK to break: at least one PTR for us.
2346                } else {
2347                    is_for_us = false;
2348                }
2349            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2350                // If there is a hostname querier for this address, then it is for us.
2351                let answer_lowercase = answer.get_name().to_lowercase();
2352                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2353                    is_for_us = true;
2354                    break; // OK to break: at least one hostname for us.
2355                }
2356            }
2357        }
2358
2359        /// Represents a DNS record change that involves one service instance.
2360        struct InstanceChange {
2361            ty: RRType,   // The type of DNS record for the instance.
2362            name: String, // The name of the record.
2363        }
2364
2365        // Go through all answers to get the new and updated records.
2366        // For new PTR records, send out ServiceFound immediately. For others,
2367        // collect them into `changes`.
2368        //
2369        // Note: we don't try to identify the update instances based on
2370        // each record immediately as the answers are likely related to each
2371        // other.
2372        let mut changes = Vec::new();
2373        let mut timers = Vec::new();
2374        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2375            return;
2376        };
2377        for record in msg.all_records() {
2378            match self
2379                .cache
2380                .add_or_update(my_intf, record, &mut timers, is_for_us)
2381            {
2382                Some((dns_record, true)) => {
2383                    timers.push(dns_record.record.get_record().get_expire_time());
2384                    timers.push(dns_record.record.get_record().get_refresh_time());
2385
2386                    let ty = dns_record.record.get_type();
2387                    let name = dns_record.record.get_name();
2388
2389                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2390                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2391                        if self.service_queriers.contains_key(name) {
2392                            timers.push(dns_record.record.get_record().get_refresh_time());
2393                        }
2394
2395                        // send ServiceFound
2396                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2397                        {
2398                            call_service_listener(
2399                                &self.service_queriers,
2400                                name,
2401                                ServiceEvent::ServiceFound(
2402                                    name.to_string(),
2403                                    dns_ptr.alias().to_string(),
2404                                ),
2405                            );
2406                            changes.push(InstanceChange {
2407                                ty,
2408                                name: dns_ptr.alias().to_string(),
2409                            });
2410                        }
2411                    } else {
2412                        changes.push(InstanceChange {
2413                            ty,
2414                            name: name.to_string(),
2415                        });
2416                    }
2417                }
2418                Some((dns_record, false)) => {
2419                    timers.push(dns_record.record.get_record().get_expire_time());
2420                    timers.push(dns_record.record.get_record().get_refresh_time());
2421                }
2422                _ => {}
2423            }
2424        }
2425
2426        // Add timers for the new records.
2427        for t in timers {
2428            self.add_timer(t);
2429        }
2430
2431        // Go through remaining changes to see if any hostname resolutions were found or updated.
2432        for change in changes
2433            .iter()
2434            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2435        {
2436            let addr_map = self.cache.get_addresses_for_host(&change.name);
2437            for (name, addresses) in addr_map {
2438                call_hostname_resolution_listener(
2439                    &self.hostname_resolvers,
2440                    &change.name,
2441                    HostnameResolutionEvent::AddressesFound(name, addresses),
2442                )
2443            }
2444        }
2445
2446        // Identify the instances that need to be "resolved".
2447        let mut updated_instances = HashSet::new();
2448        for update in changes {
2449            match update.ty {
2450                RRType::PTR | RRType::SRV | RRType::TXT => {
2451                    updated_instances.insert(update.name);
2452                }
2453                RRType::A | RRType::AAAA => {
2454                    let instances = self.cache.get_instances_on_host(&update.name);
2455                    updated_instances.extend(instances);
2456                }
2457                _ => {}
2458            }
2459        }
2460
2461        self.resolve_updated_instances(&updated_instances);
2462    }
2463
    /// Checks the answers of an incoming message against the records that are
    /// still being probed on interface `if_index`, and renames any of our
    /// probing records that conflict with an answer from a peer.
    ///
    /// For every answer whose name has an entry in `dns_registry.probing`:
    /// - a probing record with the same type and class but different rdata is
    ///   treated as a conflict: it is dropped from its probe and a clone of it
    ///   is given a new name (`hostname_change` for A/AAAA records,
    ///   `name_change` for everything else);
    /// - the renamed record is inserted into the probe for the new name (which
    ///   is created on demand), the waiting services of the old probe are
    ///   copied over, and the rename is recorded in `dns_registry.name_changes`.
    ///
    /// Returns without doing anything if there is no interface or no DNS
    /// registry for `if_index`.
    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            debug!("handle_response: no intf found for index {if_index}");
            return;
        };

        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
            return;
        };

        for answer in msg.answers().iter() {
            // Renamed clones of our conflicting records, collected per answer.
            let mut new_records = Vec::new();

            // Only names that we are currently probing can conflict here.
            let name = answer.get_name();
            let Some(probe) = dns_registry.probing.get_mut(name) else {
                continue;
            };

            // Check against possible multicast forwarding: skip address
            // answers that are tagged with a different interface index.
            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                    if answer_addr.interface_id.index != if_index {
                        debug!(
                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
                            answer_addr, my_intf.name
                        );
                        continue;
                    }
                }

                // Double check if any other address record matches rrdata,
                // as there could be multiple addresses for the same name.
                let any_match = probe.records.iter().any(|r| {
                    r.get_type() == answer.get_type()
                        && r.get_class() == answer.get_class()
                        && r.rrdata_match(answer.as_ref())
                });
                if any_match {
                    continue; // no conflict for this answer.
                }
            }

            // Keep the records that do not conflict; move a renamed clone of
            // each conflicting record into `new_records`.
            probe.records.retain(|record| {
                if record.get_type() == answer.get_type()
                    && record.get_class() == answer.get_class()
                    && !record.rrdata_match(answer.as_ref())
                {
                    debug!(
                        "found conflict name: '{name}' record: {}: {} PEER: {}",
                        record.get_type(),
                        record.rdata_print(),
                        answer.rdata_print()
                    );

                    // create a new name for this record
                    // then remove the old record in probing.
                    let mut new_record = record.clone();
                    let new_name = match record.get_type() {
                        RRType::A => hostname_change(name),
                        RRType::AAAA => hostname_change(name),
                        _ => name_change(name),
                    };
                    new_record.get_record_mut().set_new_name(new_name);
                    new_records.push(new_record);
                    return false; // old record is dropped from the probe.
                }

                true
            });

            // NOTE(review): a probe whose records were all renamed away stays
            // in `probing` (possibly empty). The removal below is disabled;
            // confirm whether leaving the empty probe behind is intentional.
            // if probe.records.is_empty() {
            //     dns_registry.probing.remove(name);
            // }

            // Probing again with the new names, starting after a random delay
            // of up to 250 ms.
            let create_time = current_time_millis() + fastrand::u64(0..250);

            // Cloned so that the borrow of the old probe ends before
            // `dns_registry.probing` is accessed again below.
            let waiting_services = probe.waiting_services.clone();

            for record in new_records {
                // Presumably updates records that refer to the old hostname;
                // a timer is scheduled only when it reports a change.
                if dns_registry.update_hostname(name, record.get_name(), create_time) {
                    self.timers.push(Reverse(create_time));
                }

                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
                dns_registry.name_changes.insert(
                    record.get_record().get_original_name().to_string(),
                    record.get_name().to_string(),
                );

                // Reuse the probe for the new name if there is one, otherwise
                // create it and schedule a timer for its next send.
                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                    Some(p) => p,
                    None => {
                        let new_probe = dns_registry
                            .probing
                            .entry(record.get_name().to_string())
                            .or_insert_with(|| {
                                debug!("conflict handler: new probe of {}", record.get_name());
                                Probe::new(create_time)
                            });
                        self.timers.push(Reverse(new_probe.next_send));
                        new_probe
                    }
                };

                debug!(
                    "insert record with new name '{}' {} into probe",
                    record.get_name(),
                    record.get_type()
                );
                new_probe.insert_record(record);

                // Services waiting on the old probe now wait on the new one.
                new_probe.waiting_services.extend(waiting_services.clone());
            }
        }
    }
2581
2582    /// Resolve the updated (including new) instances.
2583    ///
2584    /// Note: it is possible that more than 1 PTR pointing to the same
2585    /// instance. For example, a regular service type PTR and a sub-type
2586    /// service type PTR can both point to the same service instance.
2587    /// This loop automatically handles the sub-type PTRs.
2588    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2589        let mut resolved: HashSet<String> = HashSet::new();
2590        let mut unresolved: HashSet<String> = HashSet::new();
2591        let mut removed_instances = HashMap::new();
2592
2593        let now = current_time_millis();
2594
2595        for (ty_domain, records) in self.cache.all_ptr().iter() {
2596            if !self.service_queriers.contains_key(ty_domain) {
2597                // No need to resolve if not in our queries.
2598                continue;
2599            }
2600
2601            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2602                if let Some(dns_ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2603                    if updated_instances.contains(dns_ptr.alias()) {
2604                        let mut instance_found = false;
2605                        let mut new_event = None;
2606
2607                        if self.use_service_data {
2608                            if let Ok(resolved) =
2609                                self.resolve_service_from_cache(ty_domain, dns_ptr.alias())
2610                            {
2611                                debug!(
2612                                    "resolve_updated_instances: from cache: {}",
2613                                    dns_ptr.alias()
2614                                );
2615                                instance_found = true;
2616                                if resolved.is_valid() {
2617                                    new_event = Some(ServiceEvent::ServiceData(Box::new(resolved)));
2618                                } else {
2619                                    debug!("Resolved service is not valid: {}", dns_ptr.alias());
2620                                }
2621                            }
2622                        } else if let Ok(info) =
2623                            self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2624                        {
2625                            instance_found = true;
2626                            if info.is_ready() {
2627                                new_event = Some(ServiceEvent::ServiceResolved(info));
2628                            } else {
2629                                debug!("Service info is not ready: {}", dns_ptr.alias());
2630                            }
2631                        }
2632
2633                        if instance_found {
2634                            if let Some(event) = new_event {
2635                                debug!("call queriers to resolve {}", dns_ptr.alias());
2636                                resolved.insert(dns_ptr.alias().to_string());
2637                                call_service_listener(&self.service_queriers, ty_domain, event);
2638                            } else {
2639                                if self.resolved.remove(dns_ptr.alias()) {
2640                                    removed_instances
2641                                        .entry(ty_domain.to_string())
2642                                        .or_insert_with(HashSet::new)
2643                                        .insert(dns_ptr.alias().to_string());
2644                                }
2645                                unresolved.insert(dns_ptr.alias().to_string());
2646                            }
2647                        }
2648                    }
2649                }
2650            }
2651        }
2652
2653        for instance in resolved.drain() {
2654            self.pending_resolves.remove(&instance);
2655            self.resolved.insert(instance);
2656        }
2657
2658        for instance in unresolved.drain() {
2659            self.add_pending_resolve(instance);
2660        }
2661
2662        if !removed_instances.is_empty() {
2663            debug!(
2664                "resolve_updated_instances: removed {}",
2665                &removed_instances.len()
2666            );
2667            self.notify_service_removal(removed_instances);
2668        }
2669    }
2670
2671    /// Handle incoming query packets, figure out whether and what to respond.
2672    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2673        let sock = if is_ipv4 {
2674            &self.ipv4_sock
2675        } else {
2676            &self.ipv6_sock
2677        };
2678        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2679
2680        // Special meta-query "_services._dns-sd._udp.<Domain>".
2681        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2682        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2683
2684        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2685            debug!("missing dns registry for intf {}", if_index);
2686            return;
2687        };
2688
2689        let Some(intf) = self.my_intfs.get(&if_index) else {
2690            return;
2691        };
2692
2693        for question in msg.questions().iter() {
2694            let qtype = question.entry_type();
2695
2696            if qtype == RRType::PTR {
2697                for service in self.my_services.values() {
2698                    if service.get_status(if_index) != ServiceStatus::Announced {
2699                        continue;
2700                    }
2701
2702                    if question.entry_name() == service.get_type()
2703                        || service
2704                            .get_subtype()
2705                            .as_ref()
2706                            .is_some_and(|v| v == question.entry_name())
2707                    {
2708                        add_answer_with_additionals(
2709                            &mut out,
2710                            &msg,
2711                            service,
2712                            intf,
2713                            dns_registry,
2714                            is_ipv4,
2715                        );
2716                    } else if question.entry_name() == META_QUERY {
2717                        let ptr_added = out.add_answer(
2718                            &msg,
2719                            DnsPointer::new(
2720                                question.entry_name(),
2721                                RRType::PTR,
2722                                CLASS_IN,
2723                                service.get_other_ttl(),
2724                                service.get_type().to_string(),
2725                            ),
2726                        );
2727                        if !ptr_added {
2728                            trace!("answer was not added for meta-query {:?}", &question);
2729                        }
2730                    }
2731                }
2732            } else {
2733                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2734                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2735                    let probe_name = question.entry_name();
2736
2737                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2738                        let now = current_time_millis();
2739
2740                        // Only do tiebreaking if probe already started.
2741                        // This check also helps avoid redo tiebreaking if start time
2742                        // was postponed.
2743                        if probe.start_time < now {
2744                            let incoming_records: Vec<_> = msg
2745                                .authorities()
2746                                .iter()
2747                                .filter(|r| r.get_name() == probe_name)
2748                                .collect();
2749
2750                            probe.tiebreaking(&incoming_records, now, probe_name);
2751                        }
2752                    }
2753                }
2754
2755                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2756                    for service in self.my_services.values() {
2757                        if service.get_status(if_index) != ServiceStatus::Announced {
2758                            continue;
2759                        }
2760
2761                        let service_hostname =
2762                            match dns_registry.name_changes.get(service.get_hostname()) {
2763                                Some(new_name) => new_name,
2764                                None => service.get_hostname(),
2765                            };
2766
2767                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2768                            let intf_addrs = if is_ipv4 {
2769                                service.get_addrs_on_my_intf_v4(intf)
2770                            } else {
2771                                service.get_addrs_on_my_intf_v6(intf)
2772                            };
2773                            if intf_addrs.is_empty()
2774                                && (qtype == RRType::A || qtype == RRType::AAAA)
2775                            {
2776                                let t = match qtype {
2777                                    RRType::A => "TYPE_A",
2778                                    RRType::AAAA => "TYPE_AAAA",
2779                                    _ => "invalid_type",
2780                                };
2781                                trace!(
2782                                    "Cannot find valid addrs for {} response on intf {:?}",
2783                                    t,
2784                                    &intf
2785                                );
2786                                return;
2787                            }
2788                            for address in intf_addrs {
2789                                out.add_answer(
2790                                    &msg,
2791                                    DnsAddress::new(
2792                                        service_hostname,
2793                                        ip_address_rr_type(&address),
2794                                        CLASS_IN | CLASS_CACHE_FLUSH,
2795                                        service.get_host_ttl(),
2796                                        address,
2797                                        intf.into(),
2798                                    ),
2799                                );
2800                            }
2801                        }
2802                    }
2803                }
2804
2805                let query_name = question.entry_name().to_lowercase();
2806                let service_opt = self
2807                    .my_services
2808                    .iter()
2809                    .find(|(k, _v)| {
2810                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2811                            Some(new_name) => new_name,
2812                            None => k,
2813                        };
2814                        service_name == &query_name
2815                    })
2816                    .map(|(_, v)| v);
2817
2818                let Some(service) = service_opt else {
2819                    continue;
2820                };
2821
2822                if service.get_status(if_index) != ServiceStatus::Announced {
2823                    continue;
2824                }
2825
2826                let intf_addrs = if is_ipv4 {
2827                    service.get_addrs_on_my_intf_v4(intf)
2828                } else {
2829                    service.get_addrs_on_my_intf_v6(intf)
2830                };
2831                if intf_addrs.is_empty() {
2832                    debug!(
2833                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2834                        &intf
2835                    );
2836                    continue;
2837                }
2838
2839                add_answer_of_service(
2840                    &mut out,
2841                    &msg,
2842                    question.entry_name(),
2843                    service,
2844                    qtype,
2845                    intf_addrs,
2846                );
2847            }
2848        }
2849
2850        if !out.answers_count() > 0 {
2851            out.set_id(msg.id());
2852            send_dns_outgoing(&out, intf, &sock.pktinfo);
2853
2854            let if_name = intf.name.clone();
2855
2856            self.increase_counter(Counter::Respond, 1);
2857            self.notify_monitors(DaemonEvent::Respond(if_name));
2858        }
2859
2860        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2861    }
2862
2863    /// Increases the value of `counter` by `count`.
2864    fn increase_counter(&mut self, counter: Counter, count: i64) {
2865        let key = counter.to_string();
2866        match self.counters.get_mut(&key) {
2867            Some(v) => *v += count,
2868            None => {
2869                self.counters.insert(key, count);
2870            }
2871        }
2872    }
2873
2874    /// Sets the value of `counter` to `count`.
2875    fn set_counter(&mut self, counter: Counter, count: i64) {
2876        let key = counter.to_string();
2877        self.counters.insert(key, count);
2878    }
2879
2880    fn signal_sock_drain(&self) {
2881        let mut signal_buf = [0; 1024];
2882
2883        // This recv is non-blocking as the socket is non-blocking.
2884        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2885            trace!(
2886                "signal socket recvd: {}",
2887                String::from_utf8_lossy(&signal_buf[0..sz])
2888            );
2889        }
2890    }
2891
2892    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2893        self.retransmissions.push(ReRun { next_time, command });
2894        self.add_timer(next_time);
2895    }
2896
2897    /// Sends service removal event to listeners for expired service records.
2898    /// `expired`: map of service type domain to set of instance names.
2899    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2900        for (ty_domain, sender) in self.service_queriers.iter() {
2901            if let Some(instances) = expired.get(ty_domain) {
2902                for instance_name in instances {
2903                    let event = ServiceEvent::ServiceRemoved(
2904                        ty_domain.to_string(),
2905                        instance_name.to_string(),
2906                    );
2907                    match sender.send(event) {
2908                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2909                        Err(e) => debug!("Failed to send event: {}", e),
2910                    }
2911                }
2912            }
2913        }
2914    }
2915
2916    /// The entry point that executes all commands received by the daemon.
2917    ///
2918    /// `repeating`: whether this is a retransmission.
2919    fn exec_command(&mut self, command: Command, repeating: bool) {
2920        trace!("exec_command: {:?} repeating: {}", &command, repeating);
2921        match command {
2922            Command::Browse(ty, next_delay, listener) => {
2923                self.exec_command_browse(repeating, ty, next_delay, listener);
2924            }
2925
2926            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2927                self.exec_command_resolve_hostname(
2928                    repeating, hostname, next_delay, listener, timeout,
2929                );
2930            }
2931
2932            Command::Register(service_info) => {
2933                self.register_service(service_info);
2934                self.increase_counter(Counter::Register, 1);
2935            }
2936
2937            Command::RegisterResend(fullname, intf) => {
2938                trace!("register-resend service: {fullname} on {}", &intf);
2939                self.exec_command_register_resend(fullname, intf);
2940            }
2941
2942            Command::Unregister(fullname, resp_s) => {
2943                trace!("unregister service {} repeat {}", &fullname, &repeating);
2944                self.exec_command_unregister(repeating, fullname, resp_s);
2945            }
2946
2947            Command::UnregisterResend(packet, if_index, is_ipv4) => {
2948                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2949            }
2950
2951            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2952
2953            Command::StopResolveHostname(hostname) => {
2954                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2955            }
2956
2957            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2958
2959            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2960
2961            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2962                Ok(()) => trace!("Sent status to the client"),
2963                Err(e) => debug!("Failed to send status: {}", e),
2964            },
2965
2966            Command::Monitor(resp_s) => {
2967                self.monitors.push(resp_s);
2968            }
2969
2970            Command::SetOption(daemon_opt) => {
2971                self.process_set_option(daemon_opt);
2972            }
2973
2974            Command::GetOption(resp_s) => {
2975                let val = DaemonOptionVal {
2976                    _service_name_len_max: self.service_name_len_max,
2977                    ip_check_interval: self.ip_check_interval,
2978                };
2979                if let Err(e) = resp_s.send(val) {
2980                    debug!("Failed to send options: {}", e);
2981                }
2982            }
2983
2984            Command::Verify(instance_fullname, timeout) => {
2985                self.exec_command_verify(instance_fullname, timeout, repeating);
2986            }
2987
2988            _ => {
2989                debug!("unexpected command: {:?}", &command);
2990            }
2991        }
2992    }
2993
2994    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2995        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2996        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2997        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2998        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2999        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3000        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3001        self.set_counter(Counter::Timer, self.timers.len() as i64);
3002
3003        let dns_registry_probe_count: usize = self
3004            .dns_registry_map
3005            .values()
3006            .map(|r| r.probing.len())
3007            .sum();
3008        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3009
3010        let dns_registry_active_count: usize = self
3011            .dns_registry_map
3012            .values()
3013            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3014            .sum();
3015        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3016
3017        let dns_registry_timer_count: usize = self
3018            .dns_registry_map
3019            .values()
3020            .map(|r| r.new_timers.len())
3021            .sum();
3022        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3023
3024        let dns_registry_name_change_count: usize = self
3025            .dns_registry_map
3026            .values()
3027            .map(|r| r.name_changes.len())
3028            .sum();
3029        self.set_counter(
3030            Counter::DnsRegistryNameChange,
3031            dns_registry_name_change_count as i64,
3032        );
3033
3034        // Send the metrics to the client.
3035        if let Err(e) = resp_s.send(self.counters.clone()) {
3036            debug!("Failed to send metrics: {}", e);
3037        }
3038    }
3039
3040    fn exec_command_browse(
3041        &mut self,
3042        repeating: bool,
3043        ty: String,
3044        next_delay: u32,
3045        listener: Sender<ServiceEvent>,
3046    ) {
3047        let pretty_addrs: Vec<String> = self
3048            .my_intfs
3049            .iter()
3050            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3051            .collect();
3052
3053        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3054            "{ty} on {} interfaces [{}]",
3055            pretty_addrs.len(),
3056            pretty_addrs.join(", ")
3057        ))) {
3058            debug!(
3059                "Failed to send SearchStarted({})(repeating:{}): {}",
3060                &ty, repeating, e
3061            );
3062            return;
3063        }
3064
3065        let now = current_time_millis();
3066        if !repeating {
3067            // Binds a `listener` to querying mDNS domain type `ty`.
3068            //
3069            // If there is already a `listener`, it will be updated, i.e. overwritten.
3070            self.service_queriers.insert(ty.clone(), listener.clone());
3071
3072            // if we already have the records in our cache, just send them
3073            self.query_cache_for_service(&ty, &listener, now);
3074        }
3075
3076        self.send_query(&ty, RRType::PTR);
3077        self.increase_counter(Counter::Browse, 1);
3078
3079        let next_time = now + (next_delay * 1000) as u64;
3080        let max_delay = 60 * 60;
3081        let delay = cmp::min(next_delay * 2, max_delay);
3082        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
3083    }
3084
3085    fn exec_command_resolve_hostname(
3086        &mut self,
3087        repeating: bool,
3088        hostname: String,
3089        next_delay: u32,
3090        listener: Sender<HostnameResolutionEvent>,
3091        timeout: Option<u64>,
3092    ) {
3093        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3094        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3095            "{} on addrs {:?}",
3096            &hostname, &addr_list
3097        ))) {
3098            debug!(
3099                "Failed to send ResolveStarted({})(repeating:{}): {}",
3100                &hostname, repeating, e
3101            );
3102            return;
3103        }
3104        if !repeating {
3105            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3106            // if we already have the records in our cache, just send them
3107            self.query_cache_for_hostname(&hostname, listener.clone());
3108        }
3109
3110        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3111        self.increase_counter(Counter::ResolveHostname, 1);
3112
3113        let now = current_time_millis();
3114        let next_time = now + u64::from(next_delay) * 1000;
3115        let max_delay = 60 * 60;
3116        let delay = cmp::min(next_delay * 2, max_delay);
3117
3118        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3119        if self
3120            .hostname_resolvers
3121            .get(&hostname)
3122            .and_then(|(_sender, timeout)| *timeout)
3123            .map(|timeout| next_time < timeout)
3124            .unwrap_or(true)
3125        {
3126            self.add_retransmission(
3127                next_time,
3128                Command::ResolveHostname(hostname, delay, listener, None),
3129            );
3130        }
3131    }
3132
3133    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3134        let pending_query = self.query_unresolved(&instance);
3135        let max_try = 3;
3136        if pending_query && try_count < max_try {
3137            // Note that if the current try already succeeds, the next retransmission
3138            // will be no-op as the cache has been updated.
3139            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3140            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3141        }
3142    }
3143
3144    fn exec_command_unregister(
3145        &mut self,
3146        repeating: bool,
3147        fullname: String,
3148        resp_s: Sender<UnregisterStatus>,
3149    ) {
3150        let response = match self.my_services.remove_entry(&fullname) {
3151            None => {
3152                debug!("unregister: cannot find such service {}", &fullname);
3153                UnregisterStatus::NotFound
3154            }
3155            Some((_k, info)) => {
3156                let mut timers = Vec::new();
3157
3158                for (if_index, intf) in self.my_intfs.iter() {
3159                    let packet = self.unregister_service(&info, intf, &self.ipv4_sock.pktinfo);
3160                    // repeat for one time just in case some peers miss the message
3161                    if !repeating && !packet.is_empty() {
3162                        let next_time = current_time_millis() + 120;
3163                        self.retransmissions.push(ReRun {
3164                            next_time,
3165                            command: Command::UnregisterResend(packet, *if_index, true),
3166                        });
3167                        timers.push(next_time);
3168                    }
3169
3170                    // ipv6
3171
3172                    let packet = self.unregister_service(&info, intf, &self.ipv6_sock.pktinfo);
3173                    if !repeating && !packet.is_empty() {
3174                        let next_time = current_time_millis() + 120;
3175                        self.retransmissions.push(ReRun {
3176                            next_time,
3177                            command: Command::UnregisterResend(packet, *if_index, false),
3178                        });
3179                        timers.push(next_time);
3180                    }
3181                }
3182
3183                for t in timers {
3184                    self.add_timer(t);
3185                }
3186
3187                self.increase_counter(Counter::Unregister, 1);
3188                UnregisterStatus::OK
3189            }
3190        };
3191        if let Err(e) = resp_s.send(response) {
3192            debug!("unregister: failed to send response: {}", e);
3193        }
3194    }
3195
3196    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3197        let Some(intf) = self.my_intfs.get(&if_index) else {
3198            return;
3199        };
3200        let sock = if is_ipv4 {
3201            &self.ipv4_sock.pktinfo
3202        } else {
3203            &self.ipv6_sock.pktinfo
3204        };
3205
3206        let if_addr = if is_ipv4 {
3207            match intf.next_ifaddr_v4() {
3208                Some(addr) => addr,
3209                None => return,
3210            }
3211        } else {
3212            match intf.next_ifaddr_v6() {
3213                Some(addr) => addr,
3214                None => return,
3215            }
3216        };
3217
3218        debug!("UnregisterResend from {:?}", if_addr);
3219        multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, sock);
3220
3221        self.increase_counter(Counter::UnregisterResend, 1);
3222    }
3223
3224    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3225        match self.service_queriers.remove_entry(&ty_domain) {
3226            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3227            Some((ty, sender)) => {
3228                // Remove pending browse commands in the reruns.
3229                trace!("StopBrowse: removed queryer for {}", &ty);
3230                let mut i = 0;
3231                while i < self.retransmissions.len() {
3232                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
3233                        if t == &ty {
3234                            self.retransmissions.remove(i);
3235                            trace!("StopBrowse: removed retransmission for {}", &ty);
3236                            continue;
3237                        }
3238                    }
3239                    i += 1;
3240                }
3241
3242                // Remove cache entries.
3243                self.cache.remove_service_type(&ty_domain);
3244
3245                // Notify the client.
3246                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3247                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3248                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3249                }
3250            }
3251        }
3252    }
3253
3254    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3255        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3256            // Remove pending resolve commands in the reruns.
3257            trace!("StopResolve: removed queryer for {}", &host);
3258            let mut i = 0;
3259            while i < self.retransmissions.len() {
3260                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3261                    if t == &host {
3262                        self.retransmissions.remove(i);
3263                        trace!("StopResolve: removed retransmission for {}", &host);
3264                        continue;
3265                    }
3266                }
3267                i += 1;
3268            }
3269
3270            // Notify the client.
3271            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3272                Ok(()) => trace!("Sent SearchStopped to the listener"),
3273                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3274            }
3275        }
3276    }
3277
3278    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3279        let Some(info) = self.my_services.get_mut(&fullname) else {
3280            trace!("announce: cannot find such service {}", &fullname);
3281            return;
3282        };
3283
3284        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3285            return;
3286        };
3287
3288        let Some(intf) = self.my_intfs.get(&if_index) else {
3289            return;
3290        };
3291
3292        let announced_v4 =
3293            announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
3294        let announced_v6 =
3295            announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
3296
3297        if announced_v4 || announced_v6 {
3298            let mut hostname = info.get_hostname();
3299            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3300                hostname = new_name;
3301            }
3302            let service_name = match dns_registry.name_changes.get(&fullname) {
3303                Some(new_name) => new_name.to_string(),
3304                None => fullname,
3305            };
3306
3307            debug!("resend: announce service {service_name} on {}", intf.name);
3308
3309            notify_monitors(
3310                &mut self.monitors,
3311                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3312            );
3313            info.set_status(if_index, ServiceStatus::Announced);
3314        } else {
3315            debug!("register-resend should not fail");
3316        }
3317
3318        self.increase_counter(Counter::RegisterResend, 1);
3319    }
3320
3321    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3322        /*
3323        RFC 6762 section 10.4:
3324        ...
3325        When the cache receives this hint that it should reconfirm some
3326        record, it MUST issue two or more queries for the resource record in
3327        dispute.  If no response is received within ten seconds, then, even
3328        though its TTL may indicate that it is not yet due to expire, that
3329        record SHOULD be promptly flushed from the cache.
3330        */
3331        let now = current_time_millis();
3332        let expire_at = if repeating {
3333            None
3334        } else {
3335            Some(now + timeout.as_millis() as u64)
3336        };
3337
3338        // send query for the resource records.
3339        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3340
3341        if !record_vec.is_empty() {
3342            let query_vec: Vec<(&str, RRType)> = record_vec
3343                .iter()
3344                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3345                .collect();
3346            self.send_query_vec(&query_vec);
3347
3348            if let Some(new_expire) = expire_at {
3349                self.add_timer(new_expire); // ensure a check for the new expire time.
3350
3351                // schedule a resend 1 second later
3352                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3353            }
3354        }
3355    }
3356
3357    /// Refresh cached service records with active queriers
3358    fn refresh_active_services(&mut self) {
3359        let mut query_ptr_count = 0;
3360        let mut query_srv_count = 0;
3361        let mut new_timers = HashSet::new();
3362        let mut query_addr_count = 0;
3363
3364        for (ty_domain, _sender) in self.service_queriers.iter() {
3365            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3366            if !refreshed_timers.is_empty() {
3367                trace!("sending refresh query for PTR: {}", ty_domain);
3368                self.send_query(ty_domain, RRType::PTR);
3369                query_ptr_count += 1;
3370                new_timers.extend(refreshed_timers);
3371            }
3372
3373            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3374            for (instance, types) in instances {
3375                trace!("sending refresh query for: {}", &instance);
3376                let query_vec = types
3377                    .into_iter()
3378                    .map(|ty| (instance.as_str(), ty))
3379                    .collect::<Vec<_>>();
3380                self.send_query_vec(&query_vec);
3381                query_srv_count += 1;
3382            }
3383            new_timers.extend(timers);
3384            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3385            for hostname in hostnames.iter() {
3386                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3387                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3388                query_addr_count += 2;
3389            }
3390            new_timers.extend(timers);
3391        }
3392
3393        for timer in new_timers {
3394            self.add_timer(timer);
3395        }
3396
3397        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3398        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3399        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3400    }
3401}
3402
3403/// Adds one or more answers of a service for incoming msg and RR entry name.
3404fn add_answer_of_service(
3405    out: &mut DnsOutgoing,
3406    msg: &DnsIncoming,
3407    entry_name: &str,
3408    service: &ServiceInfo,
3409    qtype: RRType,
3410    intf_addrs: Vec<IpAddr>,
3411) {
3412    if qtype == RRType::SRV || qtype == RRType::ANY {
3413        out.add_answer(
3414            msg,
3415            DnsSrv::new(
3416                entry_name,
3417                CLASS_IN | CLASS_CACHE_FLUSH,
3418                service.get_host_ttl(),
3419                service.get_priority(),
3420                service.get_weight(),
3421                service.get_port(),
3422                service.get_hostname().to_string(),
3423            ),
3424        );
3425    }
3426
3427    if qtype == RRType::TXT || qtype == RRType::ANY {
3428        out.add_answer(
3429            msg,
3430            DnsTxt::new(
3431                entry_name,
3432                CLASS_IN | CLASS_CACHE_FLUSH,
3433                service.get_other_ttl(),
3434                service.generate_txt(),
3435            ),
3436        );
3437    }
3438
3439    if qtype == RRType::SRV {
3440        for address in intf_addrs {
3441            out.add_additional_answer(DnsAddress::new(
3442                service.get_hostname(),
3443                ip_address_rr_type(&address),
3444                CLASS_IN | CLASS_CACHE_FLUSH,
3445                service.get_host_ttl(),
3446                address,
3447                InterfaceId::default(),
3448            ));
3449        }
3450    }
3451}
3452
3453/// All possible events sent to the client from the daemon
3454/// regarding service discovery.
3455#[derive(Clone, Debug)]
3456#[non_exhaustive]
3457pub enum ServiceEvent {
3458    /// Started searching for a service type.
3459    SearchStarted(String),
3460
3461    /// Found a specific (service_type, fullname).
3462    ServiceFound(String, String),
3463
3464    /// Resolved a service instance with detailed info.
3465    ///
3466    /// Will be deprecated in the future by [ServiceEvent::ServiceData].
3467    ServiceResolved(ServiceInfo),
3468
3469    /// Resolved a service instance in a ResolvedService struct.
3470    /// Must call [ServiceDaemon::use_service_data] to receive this event.
3471    /// Since v0.14.0, this is preferred over [ServiceEvent::ServiceResolved].
3472    ServiceData(Box<ResolvedService>),
3473
3474    /// A service instance (service_type, fullname) was removed.
3475    ServiceRemoved(String, String),
3476
3477    /// Stopped searching for a service type.
3478    SearchStopped(String),
3479}
3480
3481/// All possible events sent to the client from the daemon
3482/// regarding host resolution.
3483#[derive(Clone, Debug)]
3484#[non_exhaustive]
3485pub enum HostnameResolutionEvent {
3486    /// Started searching for the ip address of a hostname.
3487    SearchStarted(String),
3488    /// One or more addresses for a hostname has been found.
3489    AddressesFound(String, HashSet<ScopedIp>),
3490    /// One or more addresses for a hostname has been removed.
3491    AddressesRemoved(String, HashSet<ScopedIp>),
3492    /// The search for the ip address of a hostname has timed out.
3493    SearchTimeout(String),
3494    /// Stopped searching for the ip address of a hostname.
3495    SearchStopped(String),
3496}
3497
3498/// Some notable events from the daemon besides [`ServiceEvent`].
3499/// These events are expected to happen infrequently.
3500#[derive(Clone, Debug)]
3501#[non_exhaustive]
3502pub enum DaemonEvent {
3503    /// Daemon unsolicitly announced a service from an interface.
3504    Announce(String, String),
3505
3506    /// Daemon encountered an error.
3507    Error(Error),
3508
3509    /// Daemon detected a new IP address from the host.
3510    IpAdd(IpAddr),
3511
3512    /// Daemon detected a IP address removed from the host.
3513    IpDel(IpAddr),
3514
3515    /// Daemon resolved a name conflict by changing one of its names.
3516    /// see [DnsNameChange] for more details.
3517    NameChange(DnsNameChange),
3518
3519    /// Send out a multicast response via an interface.
3520    Respond(String),
3521}
3522
3523/// Represents a name change due to a name conflict resolution.
3524/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3525#[derive(Clone, Debug)]
3526pub struct DnsNameChange {
3527    /// The original name set in `ServiceInfo` by the user.
3528    pub original: String,
3529
3530    /// A new name is created by appending a suffix after the original name.
3531    ///
3532    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3533    /// - for a host name, the suffix is `-N`, where N starts at 2.
3534    ///
3535    /// For example:
3536    ///
3537    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3538    /// - Host name `foo.local.` becomes `foo-2.local.`
3539    pub new_name: String,
3540
3541    /// The resource record type
3542    pub rr_type: RRType,
3543
3544    /// The interface where the name conflict and its change happened.
3545    pub intf_name: String,
3546}
3547
3548/// Commands supported by the daemon
3549#[derive(Debug)]
3550enum Command {
3551    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3552    Browse(String, u32, Sender<ServiceEvent>),
3553
3554    /// Resolve a hostname to IP addresses.
3555    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3556
3557    /// Register a service
3558    Register(ServiceInfo),
3559
3560    /// Unregister a service
3561    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3562
3563    /// Announce again a service to local network
3564    RegisterResend(String, u32), // (fullname)
3565
3566    /// Resend unregister packet.
3567    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)
3568
3569    /// Stop browsing a service type
3570    StopBrowse(String), // (ty_domain)
3571
3572    /// Stop resolving a hostname
3573    StopResolveHostname(String), // (hostname)
3574
3575    /// Send query to resolve a service instance.
3576    /// This is used when a PTR record exists but SRV & TXT records are missing.
3577    Resolve(String, u16), // (service_instance_fullname, try_count)
3578
3579    /// Read the current values of the counters
3580    GetMetrics(Sender<Metrics>),
3581
3582    /// Get the current status of the daemon.
3583    GetStatus(Sender<DaemonStatus>),
3584
3585    /// Monitor noticeable events in the daemon.
3586    Monitor(Sender<DaemonEvent>),
3587
3588    SetOption(DaemonOption),
3589
3590    GetOption(Sender<DaemonOptionVal>),
3591
3592    /// Proactively confirm a DNS resource record.
3593    ///
3594    /// The intention is to check if a service name or IP address still valid
3595    /// before its TTL expires.
3596    Verify(String, Duration),
3597
3598    Exit(Sender<DaemonStatus>),
3599}
3600
3601impl fmt::Display for Command {
3602    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3603        match self {
3604            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3605            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3606            Self::Exit(_) => write!(f, "Command Exit"),
3607            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3608            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3609            Self::Monitor(_) => write!(f, "Command Monitor"),
3610            Self::Register(_) => write!(f, "Command Register"),
3611            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3612            Self::SetOption(_) => write!(f, "Command SetOption"),
3613            Self::GetOption(_) => write!(f, "Command GetOption"),
3614            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3615            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3616            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3617            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3618            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3619            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3620        }
3621    }
3622}
3623
/// Option values sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    // NOTE(review): presumably mirrors `DaemonOption::ServiceNameLenMax`; the
    // leading underscore suggests it is not read anywhere yet — confirm.
    _service_name_len_max: u8,
    // NOTE(review): presumably mirrors `DaemonOption::IpCheckInterval`; the
    // unit is not visible from this definition — confirm.
    ip_check_interval: u64,
}
3628
3629#[derive(Debug)]
3630enum DaemonOption {
3631    ServiceNameLenMax(u8),
3632    IpCheckInterval(u64),
3633    EnableInterface(Vec<IfKind>),
3634    DisableInterface(Vec<IfKind>),
3635    MulticastLoopV4(bool),
3636    MulticastLoopV6(bool),
3637    UseServiceData(bool),
3638    #[cfg(test)]
3639    TestDownInterface(String),
3640    #[cfg(test)]
3641    TestUpInterface(String),
3642}
3643
/// Byte length of a supported service domain suffix.
///
/// Measured on `"._tcp.local."`; `"._udp.local."` has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3646
3647/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3648fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3649    if ty_domain.len() <= DOMAIN_LEN + 1 {
3650        // service name cannot be empty or only '_'.
3651        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3652    }
3653
3654    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3655    if service_name_len > limit as usize {
3656        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3657    }
3658    Ok(())
3659}
3660
3661/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3662fn check_domain_suffix(name: &str) -> Result<()> {
3663    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3664        return Err(e_fmt!(
3665            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3666            name
3667        ));
3668    }
3669
3670    Ok(())
3671}
3672
3673/// Validate the service name in a fully qualified name.
3674///
3675/// A Full Name = <Instance>.<Service>.<Domain>
3676/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3677///
3678/// Note: this function does not check for the length of the service name.
3679/// Instead, `register_service` method will check the length.
3680fn check_service_name(fullname: &str) -> Result<()> {
3681    check_domain_suffix(fullname)?;
3682
3683    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3684    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3685
3686    if &name[0..1] != "_" {
3687        return Err(e_fmt!("Service name must start with '_'"));
3688    }
3689
3690    let name = &name[1..];
3691
3692    if name.contains("--") {
3693        return Err(e_fmt!("Service name must not contain '--'"));
3694    }
3695
3696    if name.starts_with('-') || name.ends_with('-') {
3697        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3698    }
3699
3700    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3701    if ascii_count < 1 {
3702        return Err(e_fmt!(
3703            "Service name must contain at least one letter (eg: 'A-Za-z')"
3704        ));
3705    }
3706
3707    Ok(())
3708}
3709
3710/// Validate a hostname.
3711fn check_hostname(hostname: &str) -> Result<()> {
3712    if !hostname.ends_with(".local.") {
3713        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3714    }
3715
3716    if hostname == ".local." {
3717        return Err(e_fmt!(
3718            "The part of the hostname before '.local.' cannot be empty"
3719        ));
3720    }
3721
3722    if hostname.len() > 255 {
3723        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3724    }
3725
3726    Ok(())
3727}
3728
3729fn call_service_listener(
3730    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3731    ty_domain: &str,
3732    event: ServiceEvent,
3733) {
3734    if let Some(listener) = listeners_map.get(ty_domain) {
3735        match listener.send(event) {
3736            Ok(()) => trace!("Sent event to listener successfully"),
3737            Err(e) => debug!("Failed to send event: {}", e),
3738        }
3739    }
3740}
3741
3742fn call_hostname_resolution_listener(
3743    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3744    hostname: &str,
3745    event: HostnameResolutionEvent,
3746) {
3747    let hostname_lower = hostname.to_lowercase();
3748    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3749        match listener.send(event) {
3750            Ok(()) => trace!("Sent event to listener successfully"),
3751            Err(e) => debug!("Failed to send event: {}", e),
3752        }
3753    }
3754}
3755
3756/// Returns valid network interfaces in the host system.
3757/// Loopback interfaces are excluded. Operational down interfaces are excluded as well.
3758fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3759    if_addrs::get_if_addrs()
3760        .unwrap_or_default()
3761        .into_iter()
3762        .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3763        .collect()
3764}
3765
3766fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3767    let if_name = &my_intf.name;
3768
3769    let if_addr = if sock.domain() == Domain::IPV4 {
3770        match my_intf.next_ifaddr_v4() {
3771            Some(addr) => addr,
3772            None => return vec![],
3773        }
3774    } else {
3775        match my_intf.next_ifaddr_v6() {
3776            Some(addr) => addr,
3777            None => return vec![],
3778        }
3779    };
3780
3781    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3782}
3783
3784/// Send an outgoing mDNS query or response, and returns the packet bytes.
3785fn send_dns_outgoing_impl(
3786    out: &DnsOutgoing,
3787    if_name: &str,
3788    if_index: u32,
3789    if_addr: &IfAddr,
3790    sock: &PktInfoUdpSocket,
3791) -> Vec<Vec<u8>> {
3792    let qtype = if out.is_query() {
3793        "query"
3794    } else {
3795        if out.answers_count() == 0 && out.additionals().is_empty() {
3796            return vec![]; // no need to send empty response
3797        }
3798        "response"
3799    };
3800    trace!(
3801        "send {}: {} questions {} answers {} authorities {} additional",
3802        qtype,
3803        out.questions().len(),
3804        out.answers_count(),
3805        out.authorities().len(),
3806        out.additionals().len()
3807    );
3808
3809    match if_addr.ip() {
3810        IpAddr::V4(ipv4) => {
3811            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3812                debug!(
3813                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3814                    ipv4, e
3815                );
3816                return vec![]; // cannot send without a valid interface
3817            }
3818        }
3819        IpAddr::V6(ipv6) => {
3820            if let Err(e) = sock.set_multicast_if_v6(if_index) {
3821                debug!(
3822                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3823                    ipv6, e
3824                );
3825                return vec![]; // cannot send without a valid interface
3826            }
3827        }
3828    }
3829
3830    let packet_list = out.to_data_on_wire();
3831    for packet in packet_list.iter() {
3832        multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3833    }
3834    packet_list
3835}
3836
3837/// Sends a multicast packet, and returns the packet bytes.
3838fn multicast_on_intf(
3839    packet: &[u8],
3840    if_name: &str,
3841    if_index: u32,
3842    if_addr: &IfAddr,
3843    socket: &PktInfoUdpSocket,
3844) {
3845    if packet.len() > MAX_MSG_ABSOLUTE {
3846        debug!("Drop over-sized packet ({})", packet.len());
3847        return;
3848    }
3849
3850    let addr: SocketAddr = match if_addr {
3851        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3852        if_addrs::IfAddr::V6(_) => {
3853            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3854            sock.set_scope_id(if_index); // Choose iface for multicast
3855            sock.into()
3856        }
3857    };
3858
3859    // Sends out `packet` to `addr` on the socket.
3860    let sock_addr = addr.into();
3861    match socket.send_to(packet, &sock_addr) {
3862        Ok(sz) => trace!(
3863            "sent out {} bytes on interface {} (idx {}) addr {}",
3864            sz,
3865            if_name,
3866            if_index,
3867            if_addr.ip()
3868        ),
3869        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3870    }
3871}
3872
/// Returns true if `name` has the shape of a service instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of '.'-separated parts is checked: there must be at least
/// five, counting the empty part after the trailing dot.
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when the split yields five or more parts.
    name.split('.').nth(4).is_some()
}
3879
3880fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3881    monitors.retain(|sender| {
3882        if let Err(e) = sender.try_send(event.clone()) {
3883            debug!("notify_monitors: try_send: {}", &e);
3884            if matches!(e, TrySendError::Disconnected(_)) {
3885                return false; // This monitor is dropped.
3886            }
3887        }
3888        true
3889    });
3890}
3891
3892/// Check if all unique records passed "probing", and if yes, create a packet
3893/// to announce the service.
3894fn prepare_announce(
3895    info: &ServiceInfo,
3896    intf: &MyIntf,
3897    dns_registry: &mut DnsRegistry,
3898    is_ipv4: bool,
3899) -> Option<DnsOutgoing> {
3900    let intf_addrs = if is_ipv4 {
3901        info.get_addrs_on_my_intf_v4(intf)
3902    } else {
3903        info.get_addrs_on_my_intf_v6(intf)
3904    };
3905
3906    if intf_addrs.is_empty() {
3907        debug!(
3908            "prepare_announce: no valid addrs on interface {}",
3909            &intf.name
3910        );
3911        return None;
3912    }
3913
3914    // check if we changed our name due to conflicts.
3915    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3916        Some(new_name) => new_name,
3917        None => info.get_fullname(),
3918    };
3919
3920    debug!(
3921        "prepare to announce service {service_fullname} on {:?}",
3922        &intf_addrs
3923    );
3924
3925    let mut probing_count = 0;
3926    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3927    let create_time = current_time_millis() + fastrand::u64(0..250);
3928
3929    out.add_answer_at_time(
3930        DnsPointer::new(
3931            info.get_type(),
3932            RRType::PTR,
3933            CLASS_IN,
3934            info.get_other_ttl(),
3935            service_fullname.to_string(),
3936        ),
3937        0,
3938    );
3939
3940    if let Some(sub) = info.get_subtype() {
3941        trace!("Adding subdomain {}", sub);
3942        out.add_answer_at_time(
3943            DnsPointer::new(
3944                sub,
3945                RRType::PTR,
3946                CLASS_IN,
3947                info.get_other_ttl(),
3948                service_fullname.to_string(),
3949            ),
3950            0,
3951        );
3952    }
3953
3954    // SRV records.
3955    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3956        Some(new_name) => new_name.to_string(),
3957        None => info.get_hostname().to_string(),
3958    };
3959
3960    let mut srv = DnsSrv::new(
3961        info.get_fullname(),
3962        CLASS_IN | CLASS_CACHE_FLUSH,
3963        info.get_host_ttl(),
3964        info.get_priority(),
3965        info.get_weight(),
3966        info.get_port(),
3967        hostname,
3968    );
3969
3970    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3971        srv.get_record_mut().set_new_name(new_name.to_string());
3972    }
3973
3974    if !info.requires_probe()
3975        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3976    {
3977        out.add_answer_at_time(srv, 0);
3978    } else {
3979        probing_count += 1;
3980    }
3981
3982    // TXT records.
3983
3984    let mut txt = DnsTxt::new(
3985        info.get_fullname(),
3986        CLASS_IN | CLASS_CACHE_FLUSH,
3987        info.get_other_ttl(),
3988        info.generate_txt(),
3989    );
3990
3991    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3992        txt.get_record_mut().set_new_name(new_name.to_string());
3993    }
3994
3995    if !info.requires_probe()
3996        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3997    {
3998        out.add_answer_at_time(txt, 0);
3999    } else {
4000        probing_count += 1;
4001    }
4002
4003    // Address records. (A and AAAA)
4004
4005    let hostname = info.get_hostname();
4006    for address in intf_addrs {
4007        let mut dns_addr = DnsAddress::new(
4008            hostname,
4009            ip_address_rr_type(&address),
4010            CLASS_IN | CLASS_CACHE_FLUSH,
4011            info.get_host_ttl(),
4012            address,
4013            intf.into(),
4014        );
4015
4016        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4017            dns_addr.get_record_mut().set_new_name(new_name.to_string());
4018        }
4019
4020        if !info.requires_probe()
4021            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4022        {
4023            out.add_answer_at_time(dns_addr, 0);
4024        } else {
4025            probing_count += 1;
4026        }
4027    }
4028
4029    if probing_count > 0 {
4030        return None;
4031    }
4032
4033    Some(out)
4034}
4035
4036/// Send an unsolicited response for owned service via `intf` and `sock`.
4037/// Returns true if sent out successfully for IPv4 or IPv6.
4038fn announce_service_on_intf(
4039    dns_registry: &mut DnsRegistry,
4040    info: &ServiceInfo,
4041    intf: &MyIntf,
4042    sock: &PktInfoUdpSocket,
4043) -> bool {
4044    let is_ipv4 = sock.domain() == Domain::IPV4;
4045    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4046        send_dns_outgoing(&out, intf, sock);
4047        return true;
4048    }
4049
4050    false
4051}
4052
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first '.') is modified. If that
/// label ends with a ` (<num>)` suffix, the number is incremented; otherwise
/// ` (2)` is appended. A suffix whose number cannot be incremented without
/// overflowing `u32` is treated like ordinary text, i.e. ` (2)` is appended
/// instead of panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
/// - `foo (4294967295)` becomes `foo (4294967295) (2)`
fn name_change(original: &str) -> String {
    // Separate the first label from the remainder (which keeps its own dots).
    let (first_label, rest) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    // Check whether the label already ends with a ` (<num>)` suffix that can
    // be incremented.
    let incremented = first_label
        .strip_suffix(')')
        .and_then(|body| body.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        });

    let new_label = incremented.unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4088
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first '.') is modified. If that
/// label ends with a `-<num>` suffix, the number is incremented; otherwise
/// `-2` is appended. A suffix whose number cannot be incremented without
/// overflowing `u32` is treated like ordinary text, i.e. `-2` is appended
/// instead of panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
/// - `foo-4294967295` becomes `foo-4294967295-2`
fn hostname_change(original: &str) -> String {
    // Separate the first label from the remainder (which keeps its own dots).
    let (first_label, rest) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    // Check whether everything after the last hyphen is a number that can be
    // incremented.
    let incremented = first_label
        .rsplit_once('-')
        .and_then(|(base_name, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name}-{next}"))
        });

    let new_label = incremented.unwrap_or_else(|| format!("{first_label}-2"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4116
4117fn add_answer_with_additionals(
4118    out: &mut DnsOutgoing,
4119    msg: &DnsIncoming,
4120    service: &ServiceInfo,
4121    intf: &MyIntf,
4122    dns_registry: &DnsRegistry,
4123    is_ipv4: bool,
4124) {
4125    let intf_addrs = if is_ipv4 {
4126        service.get_addrs_on_my_intf_v4(intf)
4127    } else {
4128        service.get_addrs_on_my_intf_v6(intf)
4129    };
4130    if intf_addrs.is_empty() {
4131        trace!("No addrs on LAN of intf {:?}", intf);
4132        return;
4133    }
4134
4135    // check if we changed our name due to conflicts.
4136    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4137        Some(new_name) => new_name,
4138        None => service.get_fullname(),
4139    };
4140
4141    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4142        Some(new_name) => new_name,
4143        None => service.get_hostname(),
4144    };
4145
4146    let ptr_added = out.add_answer(
4147        msg,
4148        DnsPointer::new(
4149            service.get_type(),
4150            RRType::PTR,
4151            CLASS_IN,
4152            service.get_other_ttl(),
4153            service_fullname.to_string(),
4154        ),
4155    );
4156
4157    if !ptr_added {
4158        trace!("answer was not added for msg {:?}", msg);
4159        return;
4160    }
4161
4162    if let Some(sub) = service.get_subtype() {
4163        trace!("Adding subdomain {}", sub);
4164        out.add_additional_answer(DnsPointer::new(
4165            sub,
4166            RRType::PTR,
4167            CLASS_IN,
4168            service.get_other_ttl(),
4169            service_fullname.to_string(),
4170        ));
4171    }
4172
4173    // Add recommended additional answers according to
4174    // https://tools.ietf.org/html/rfc6763#section-12.1.
4175    out.add_additional_answer(DnsSrv::new(
4176        service_fullname,
4177        CLASS_IN | CLASS_CACHE_FLUSH,
4178        service.get_host_ttl(),
4179        service.get_priority(),
4180        service.get_weight(),
4181        service.get_port(),
4182        hostname.to_string(),
4183    ));
4184
4185    out.add_additional_answer(DnsTxt::new(
4186        service_fullname,
4187        CLASS_IN | CLASS_CACHE_FLUSH,
4188        service.get_other_ttl(),
4189        service.generate_txt(),
4190    ));
4191
4192    for address in intf_addrs {
4193        out.add_additional_answer(DnsAddress::new(
4194            hostname,
4195            ip_address_rr_type(&address),
4196            CLASS_IN | CLASS_CACHE_FLUSH,
4197            service.get_host_ttl(),
4198            address,
4199            intf.into(),
4200        ));
4201    }
4202}
4203
4204/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4205/// that are finished.
4206fn check_probing(
4207    dns_registry: &mut DnsRegistry,
4208    timers: &mut BinaryHeap<Reverse<u64>>,
4209    now: u64,
4210) -> (DnsOutgoing, Vec<String>) {
4211    let mut expired_probes = Vec::new();
4212    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4213
4214    for (name, probe) in dns_registry.probing.iter_mut() {
4215        if now >= probe.next_send {
4216            if probe.expired(now) {
4217                // move the record to active
4218                expired_probes.push(name.clone());
4219            } else {
4220                out.add_question(name, RRType::ANY);
4221
4222                /*
4223                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4224                ...
4225                for tiebreaking to work correctly in all
4226                cases, the Authority Section must contain *all* the records and
4227                proposed rdata being probed for uniqueness.
4228                    */
4229                for record in probe.records.iter() {
4230                    out.add_authority(record.clone());
4231                }
4232
4233                probe.update_next_send(now);
4234
4235                // add timer
4236                timers.push(Reverse(probe.next_send));
4237            }
4238        }
4239    }
4240
4241    (out, expired_probes)
4242}
4243
4244/// Process expired probes on an interface and return a list of services
4245/// that are waiting for the probe to finish.
4246///
4247/// `DnsNameChange` events are sent to the monitors.
4248fn handle_expired_probes(
4249    expired_probes: Vec<String>,
4250    intf_name: &str,
4251    dns_registry: &mut DnsRegistry,
4252    monitors: &mut Vec<Sender<DaemonEvent>>,
4253) -> HashSet<String> {
4254    let mut waiting_services = HashSet::new();
4255
4256    for name in expired_probes {
4257        let Some(probe) = dns_registry.probing.remove(&name) else {
4258            continue;
4259        };
4260
4261        // send notifications about name changes
4262        for record in probe.records.iter() {
4263            if let Some(new_name) = record.get_record().get_new_name() {
4264                dns_registry
4265                    .name_changes
4266                    .insert(name.clone(), new_name.to_string());
4267
4268                let event = DnsNameChange {
4269                    original: record.get_record().get_original_name().to_string(),
4270                    new_name: new_name.to_string(),
4271                    rr_type: record.get_type(),
4272                    intf_name: intf_name.to_string(),
4273                };
4274                debug!("Name change event: {:?}", &event);
4275                notify_monitors(monitors, DaemonEvent::NameChange(event));
4276            }
4277        }
4278
4279        // move RR from probe to active.
4280        debug!(
4281            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4282            probe.records.len(),
4283            probe.waiting_services.len(),
4284        );
4285
4286        // Move records to active and plan to wake up services if records are not empty.
4287        if !probe.records.is_empty() {
4288            match dns_registry.active.get_mut(&name) {
4289                Some(records) => {
4290                    records.extend(probe.records);
4291                }
4292                None => {
4293                    dns_registry.active.insert(name, probe.records);
4294                }
4295            }
4296
4297            waiting_services.extend(probe.waiting_services);
4298        }
4299    }
4300
4301    waiting_services
4302}
4303
4304#[cfg(test)]
4305mod tests {
4306    use super::{
4307        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4308        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4309        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4310        MDNS_PORT,
4311    };
4312    use crate::{
4313        dns_parser::{
4314            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4315            FLAGS_AA, FLAGS_QR_RESPONSE,
4316        },
4317        service_daemon::{add_answer_of_service, check_hostname},
4318    };
4319    use std::{
4320        net::{SocketAddr, SocketAddrV4},
4321        time::{Duration, SystemTime},
4322    };
4323    use test_log::test;
4324
4325    #[test]
4326    fn test_socketaddr_print() {
4327        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
4328        let print = format!("{}", addr);
4329        assert_eq!(print, "224.0.0.251:5353");
4330    }
4331
4332    #[test]
4333    fn test_instance_name() {
4334        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4335        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4336        assert!(!valid_instance_name("_printer._tcp.local."));
4337    }
4338
4339    #[test]
4340    fn test_check_service_name_length() {
4341        let result = check_service_name_length("_tcp", 100);
4342        assert!(result.is_err());
4343        if let Err(e) = result {
4344            println!("{}", e);
4345        }
4346    }
4347
4348    #[test]
4349    fn test_check_hostname() {
4350        // valid hostnames
4351        for hostname in &[
4352            "my_host.local.",
4353            &("A".repeat(255 - ".local.".len()) + ".local."),
4354        ] {
4355            let result = check_hostname(hostname);
4356            assert!(result.is_ok());
4357        }
4358
4359        // erroneous hostnames
4360        for hostname in &[
4361            "my_host.local",
4362            ".local.",
4363            &("A".repeat(256 - ".local.".len()) + ".local."),
4364        ] {
4365            let result = check_hostname(hostname);
4366            assert!(result.is_err());
4367            if let Err(e) = result {
4368                println!("{}", e);
4369            }
4370        }
4371    }
4372
4373    #[test]
4374    fn test_check_domain_suffix() {
4375        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4376        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4377        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4378        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4379        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4380        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4381    }
4382
4383    #[test]
4384    fn test_service_with_temporarily_invalidated_ptr() {
4385        // Create a daemon
4386        let d = ServiceDaemon::new().expect("Failed to create daemon");
4387
4388        let service = "_test_inval_ptr._udp.local.";
4389        let host_name = "my_host_tmp_invalidated_ptr.local.";
4390        let intfs: Vec<_> = my_ip_interfaces(false);
4391        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4392        let port = 5201;
4393        let my_service =
4394            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4395                .expect("invalid service info")
4396                .enable_addr_auto();
4397        let result = d.register(my_service.clone());
4398        assert!(result.is_ok());
4399
4400        // Browse for a service
4401        let browse_chan = d.browse(service).unwrap();
4402        let timeout = Duration::from_secs(2);
4403        let mut resolved = false;
4404
4405        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4406            match event {
4407                ServiceEvent::ServiceResolved(info) => {
4408                    resolved = true;
4409                    println!("Resolved a service of {}", &info.get_fullname());
4410                    break;
4411                }
4412                e => {
4413                    println!("Received event {:?}", e);
4414                }
4415            }
4416        }
4417
4418        assert!(resolved);
4419
4420        println!("Stopping browse of {}", service);
4421        // Pause browsing so restarting will cause a new immediate query.
4422        // Unregistering will not work here, it will invalidate all the records.
4423        d.stop_browse(service).unwrap();
4424
4425        // Ensure the search is stopped.
4426        // Reduces the chance of receiving an answer adding the ptr back to the
4427        // cache causing the later browse to return directly from the cache.
4428        // (which invalidates what this test is trying to test for.)
4429        let mut stopped = false;
4430        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4431            match event {
4432                ServiceEvent::SearchStopped(_) => {
4433                    stopped = true;
4434                    println!("Stopped browsing service");
4435                    break;
4436                }
4437                // Other `ServiceResolved` messages may be received
4438                // here as they come from different interfaces.
4439                // That's fine for this test.
4440                e => {
4441                    println!("Received event {:?}", e);
4442                }
4443            }
4444        }
4445
4446        assert!(stopped);
4447
4448        // Invalidate the ptr from the service to the host.
4449        let invalidate_ptr_packet = DnsPointer::new(
4450            my_service.get_type(),
4451            RRType::PTR,
4452            CLASS_IN,
4453            0,
4454            my_service.get_fullname().to_string(),
4455        );
4456
4457        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4458        packet_buffer.add_additional_answer(invalidate_ptr_packet);
4459
4460        for intf in intfs {
4461            let sock = _new_socket_bind(&intf, true).unwrap();
4462            send_dns_outgoing_impl(
4463                &packet_buffer,
4464                &intf.name,
4465                intf.index.unwrap_or(0),
4466                &intf.addr,
4467                &sock.pktinfo,
4468            );
4469        }
4470
4471        println!(
4472            "Sent PTR record invalidation. Starting second browse for {}",
4473            service
4474        );
4475
4476        // Restart the browse to force the sender to re-send the announcements.
4477        let browse_chan = d.browse(service).unwrap();
4478
4479        resolved = false;
4480        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4481            match event {
4482                ServiceEvent::ServiceResolved(info) => {
4483                    resolved = true;
4484                    println!("Resolved a service of {}", &info.get_fullname());
4485                    break;
4486                }
4487                e => {
4488                    println!("Received event {:?}", e);
4489                }
4490            }
4491        }
4492
4493        assert!(resolved);
4494        d.shutdown().unwrap();
4495    }
4496
4497    #[test]
4498    fn test_expired_srv() {
4499        // construct service info
4500        let service_type = "_expired-srv._udp.local.";
4501        let instance = "test_instance";
4502        let host_name = "expired_srv_host.local.";
4503        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4504            .unwrap()
4505            .enable_addr_auto();
4506        // let fullname = my_service.get_fullname().to_string();
4507
4508        // set SRV to expire soon.
4509        let new_ttl = 3; // for testing only.
4510        my_service._set_host_ttl(new_ttl);
4511
4512        // register my service
4513        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4514        let result = mdns_server.register(my_service);
4515        assert!(result.is_ok());
4516
4517        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4518        let browse_chan = mdns_client.browse(service_type).unwrap();
4519        let timeout = Duration::from_secs(2);
4520        let mut resolved = false;
4521
4522        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4523            match event {
4524                ServiceEvent::ServiceResolved(info) => {
4525                    resolved = true;
4526                    println!("Resolved a service of {}", &info.get_fullname());
4527                    break;
4528                }
4529                _ => {}
4530            }
4531        }
4532
4533        assert!(resolved);
4534
4535        // Exit the server so that no more responses.
4536        mdns_server.shutdown().unwrap();
4537
4538        // SRV record in the client cache will expire.
4539        let expire_timeout = Duration::from_secs(new_ttl as u64);
4540        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4541            match event {
4542                ServiceEvent::ServiceRemoved(service_type, full_name) => {
4543                    println!("Service removed: {}: {}", &service_type, &full_name);
4544                    break;
4545                }
4546                _ => {}
4547            }
4548        }
4549    }
4550
4551    #[test]
4552    fn test_hostname_resolution_address_removed() {
4553        // Create a mDNS server
4554        let server = ServiceDaemon::new().expect("Failed to create server");
4555        let hostname = "addr_remove_host._tcp.local.";
4556        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4557            .iter()
4558            .find(|iface| iface.ip().is_ipv4())
4559            .map(|iface| iface.ip().into())
4560            .unwrap();
4561
4562        let mut my_service = ServiceInfo::new(
4563            "_host_res_test._tcp.local.",
4564            "my_instance",
4565            hostname,
4566            &service_ip_addr.to_ip_addr(),
4567            1234,
4568            None,
4569        )
4570        .expect("invalid service info");
4571
4572        // Set a short TTL for addresses for testing.
4573        let addr_ttl = 2;
4574        my_service._set_host_ttl(addr_ttl); // Expire soon
4575
4576        server.register(my_service).unwrap();
4577
4578        // Create a mDNS client for resolving the hostname.
4579        let client = ServiceDaemon::new().expect("Failed to create client");
4580        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4581        let resolved = loop {
4582            match event_receiver.recv() {
4583                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4584                    assert!(found_hostname == hostname);
4585                    assert!(addresses.contains(&service_ip_addr));
4586                    println!("address found: {:?}", &addresses);
4587                    break true;
4588                }
4589                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4590                Ok(_event) => {}
4591                Err(_) => break false,
4592            }
4593        };
4594
4595        assert!(resolved);
4596
4597        // Shutdown the server so no more responses / refreshes for addresses.
4598        server.shutdown().unwrap();
4599
4600        // Wait till hostname address record expires, with 1 second grace period.
4601        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4602        let removed = loop {
4603            match event_receiver.recv_timeout(timeout) {
4604                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4605                    assert!(removed_host == hostname);
4606                    assert!(addresses.contains(&service_ip_addr));
4607
4608                    println!(
4609                        "address removed: hostname: {} addresses: {:?}",
4610                        &hostname, &addresses
4611                    );
4612                    break true;
4613                }
4614                Ok(_event) => {}
4615                Err(_) => {
4616                    break false;
4617                }
4618            }
4619        };
4620
4621        assert!(removed);
4622
4623        client.shutdown().unwrap();
4624    }
4625
4626    #[test]
4627    fn test_refresh_ptr() {
4628        // construct service info
4629        let service_type = "_refresh-ptr._udp.local.";
4630        let instance = "test_instance";
4631        let host_name = "refresh_ptr_host.local.";
4632        let service_ip_addr = my_ip_interfaces(false)
4633            .iter()
4634            .find(|iface| iface.ip().is_ipv4())
4635            .map(|iface| iface.ip())
4636            .unwrap();
4637
4638        let mut my_service = ServiceInfo::new(
4639            service_type,
4640            instance,
4641            host_name,
4642            &service_ip_addr,
4643            5023,
4644            None,
4645        )
4646        .unwrap();
4647
4648        let new_ttl = 3; // for testing only.
4649        my_service._set_other_ttl(new_ttl);
4650
4651        // register my service
4652        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4653        let result = mdns_server.register(my_service);
4654        assert!(result.is_ok());
4655
4656        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4657        let browse_chan = mdns_client.browse(service_type).unwrap();
4658        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4659        let mut resolved = false;
4660
4661        // resolve the service first.
4662        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4663            match event {
4664                ServiceEvent::ServiceResolved(info) => {
4665                    resolved = true;
4666                    println!("Resolved a service of {}", &info.get_fullname());
4667                    break;
4668                }
4669                _ => {}
4670            }
4671        }
4672
4673        assert!(resolved);
4674
4675        // wait over 80% of TTL, and refresh PTR should be sent out.
4676        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4677        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4678            println!("event: {:?}", &event);
4679        }
4680
4681        // verify refresh counter.
4682        let metrics_chan = mdns_client.get_metrics().unwrap();
4683        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4684        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4685        assert_eq!(ptr_refresh_counter, 1);
4686        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4687        assert_eq!(srvtxt_refresh_counter, 1);
4688
4689        // Exit the server so that no more responses.
4690        mdns_server.shutdown().unwrap();
4691        mdns_client.shutdown().unwrap();
4692    }
4693
4694    #[test]
4695    fn test_name_change() {
4696        assert_eq!(name_change("foo.local."), "foo (2).local.");
4697        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4698        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4699        assert_eq!(name_change("foo"), "foo (2)");
4700        assert_eq!(name_change("foo (2)"), "foo (3)");
4701        assert_eq!(name_change(""), " (2)");
4702
4703        // Additional edge cases
4704        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4705        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4706        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4707    }
4708
4709    #[test]
4710    fn test_hostname_change() {
4711        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4712        assert_eq!(hostname_change("foo"), "foo-2");
4713        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4714        assert_eq!(hostname_change("foo-9"), "foo-10");
4715        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4716    }
4717
4718    #[test]
4719    fn test_add_answer_txt_ttl() {
4720        // construct a simple service info
4721        let service_type = "_test_add_answer._udp.local.";
4722        let instance = "test_instance";
4723        let host_name = "add_answer_host.local.";
4724        let service_intf = my_ip_interfaces(false)
4725            .into_iter()
4726            .find(|iface| iface.ip().is_ipv4())
4727            .unwrap();
4728        let service_ip_addr = service_intf.ip();
4729        let my_service = ServiceInfo::new(
4730            service_type,
4731            instance,
4732            host_name,
4733            &service_ip_addr,
4734            5023,
4735            None,
4736        )
4737        .unwrap();
4738
4739        // construct a DnsOutgoing message
4740        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4741
4742        // Construct a dummy DnsIncoming message
4743        let mut dummy_data = out.to_data_on_wire();
4744        let interface_id = InterfaceId::from(&service_intf);
4745        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4746
4747        // Add an answer of TXT type for the service.
4748        let if_addrs = vec![service_intf.ip()];
4749        add_answer_of_service(
4750            &mut out,
4751            &incoming,
4752            instance,
4753            &my_service,
4754            RRType::TXT,
4755            if_addrs,
4756        );
4757
4758        // Check if the answer was added correctly
4759        assert!(
4760            out.answers_count() > 0,
4761            "No answers added to the outgoing message"
4762        );
4763
4764        // Check if the first answer is of type TXT
4765        let answer = out._answers().first().unwrap();
4766        assert_eq!(answer.0.get_type(), RRType::TXT);
4767
4768        // Check TTL is set properly for the TXT record
4769        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4770    }
4771
4772    #[test]
4773    fn test_interface_flip() {
4774        // start a server
4775        let ty_domain = "_intf-flip._udp.local.";
4776        let host_name = "intf_flip.local.";
4777        let now = SystemTime::now()
4778            .duration_since(SystemTime::UNIX_EPOCH)
4779            .unwrap();
4780        let instance_name = now.as_micros().to_string(); // Create a unique name.
4781        let port = 5200;
4782
4783        // Get a single IPv4 address
4784        let (ip_addr1, intf_name) = my_ip_interfaces(false)
4785            .iter()
4786            .find(|iface| iface.ip().is_ipv4())
4787            .map(|iface| (iface.ip(), iface.name.clone()))
4788            .unwrap();
4789
4790        println!("Using interface {} with IP {}", intf_name, ip_addr1);
4791
4792        // Register the service.
4793        let service1 =
4794            ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
4795                .expect("valid service info");
4796        let server1 = ServiceDaemon::new().expect("failed to start server");
4797        server1
4798            .register(service1)
4799            .expect("Failed to register service1");
4800
4801        // wait for the service announced.
4802        std::thread::sleep(Duration::from_secs(2));
4803
4804        // start a client
4805        let client = ServiceDaemon::new().expect("failed to start client");
4806        client.use_service_data(true).unwrap();
4807
4808        let receiver = client.browse(ty_domain).unwrap();
4809
4810        let timeout = Duration::from_secs(3);
4811        let mut got_data = false;
4812
4813        while let Ok(event) = receiver.recv_timeout(timeout) {
4814            match event {
4815                ServiceEvent::ServiceData(_) => {
4816                    println!("Received ServiceData event");
4817                    got_data = true;
4818                    break;
4819                }
4820                _ => {}
4821            }
4822        }
4823
4824        assert!(got_data, "Should receive ServiceData event");
4825
4826        // Set a short IP check interval to detect interface changes quickly.
4827        client.set_ip_check_interval(1).unwrap();
4828
4829        // Now shutdown the interface and expect the client to lose the service.
4830        println!("Shutting down interface {}", &intf_name);
4831        client.test_down_interface(&intf_name).unwrap();
4832
4833        let mut got_removed = false;
4834
4835        while let Ok(event) = receiver.recv_timeout(timeout) {
4836            match event {
4837                ServiceEvent::ServiceRemoved(ty_domain, instance) => {
4838                    got_removed = true;
4839                    println!("removed: {ty_domain} : {instance}");
4840                    break;
4841                }
4842                _ => {}
4843            }
4844        }
4845        assert!(got_removed, "Should receive ServiceRemoved event");
4846
4847        println!("Bringing up interface {}", &intf_name);
4848        client.test_up_interface(&intf_name).unwrap();
4849        let mut got_data = false;
4850        while let Ok(event) = receiver.recv_timeout(timeout) {
4851            match event {
4852                ServiceEvent::ServiceData(resolved) => {
4853                    got_data = true;
4854                    println!("Received ServiceData: {:?}", resolved);
4855                    break;
4856                }
4857                _ => {}
4858            }
4859        }
4860        assert!(
4861            got_data,
4862            "Should receive ServiceData event after interface is back up"
4863        );
4864
4865        server1.shutdown().unwrap();
4866        client.shutdown().unwrap();
4867    }
4868}