Skip to main content

mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
// `ty_domain` refers to a service type together with its domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself and either "_udp" or "_tcp".
// See RFC 6763 section 7 Service Names.
//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is the "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is the "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34    current_time_millis,
35    dns_cache::{DnsCache, IpType},
36    dns_parser::{
37        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
38        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
39        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
40    },
41    error::{e_fmt, Error, Result},
42    service_info::{valid_ip_on_intf, DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
43    Receiver, ResolvedService, TxtProperties,
44};
45use flume::{bounded, Sender, TrySendError};
46use if_addrs::{IfAddr, Interface};
47use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
48use socket2::Domain;
49use socket_pktinfo::PktInfoUdpSocket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
53    fmt, io,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
/// Default upper bound for the length of a service name, with the domain
/// excluded and the leading underscore (`_`) not counted. The value 15 comes from
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default number of seconds between two automatic checks for IP changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default timeout for [ServiceDaemon::verify]: 10 seconds, following
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_millis(10_000);

/// The port number used by mDNS, as defined in RFC 6762.
pub const MDNS_PORT: u16 = 5353;

// IPv4 mDNS multicast group (224.0.0.251).
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
// IPv6 mDNS multicast group (ff02::fb).
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
// IPv4 loopback address (127.0.0.1); the daemon's signal socket binds to it.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

// NOTE(review): presumably a wait time (in milliseconds) used when resolving;
// its use site is outside this excerpt, so confirm before relying on this.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
80
/// Response status code for the service `unregister` call.
///
/// Derives `PartialEq`, `Clone` and `Eq` in addition to `Debug`, matching
/// [`DaemonStatus`], so that a received status can be compared directly
/// (for example with `assert_eq!`) or cloned.
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
89
/// The state a service daemon can report about itself.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// Normal operation: the daemon is running.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
100
/// The kinds of counters reported in the metrics.
/// At present every counter tracks outgoing packets.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

/// Renders each counter as its kebab-case name.
impl fmt::Display for Counter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Look up the fixed label first, then emit it in one place.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };

        // `write_str` emits the label verbatim, without applying width or
        // padding flags.
        f.write_str(label)
    }
}
157
158#[derive(Debug)]
159enum InternalError {
160    IntfAddrInvalid(Interface),
161}
162
163impl fmt::Display for InternalError {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        match self {
166            InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
167        }
168    }
169}
170
171type MyResult<T> = core::result::Result<T, InternalError>;
172
173/// A wrapper around UDP socket used by the mDNS daemon.
174///
175/// We do this because `mio` does not support PKTINFO and
176/// does not provide a way to implement `Source` trait directly and safely.
177struct MyUdpSocket {
178    /// The underlying socket that supports control messages like
179    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
180    pktinfo: PktInfoUdpSocket,
181
182    /// The mio UDP socket that is a clone of `pktinfo` and
183    /// is used for event polling.
184    mio: MioUdpSocket,
185}
186
187impl MyUdpSocket {
188    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
189        let std_sock = pktinfo.try_clone_std()?;
190        let mio = MioUdpSocket::from_std(std_sock);
191
192        Ok(Self { pktinfo, mio })
193    }
194}
195
196/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
197impl Source for MyUdpSocket {
198    fn register(
199        &mut self,
200        registry: &Registry,
201        token: Token,
202        interests: Interest,
203    ) -> io::Result<()> {
204        self.mio.register(registry, token, interests)
205    }
206
207    fn reregister(
208        &mut self,
209        registry: &Registry,
210        token: Token,
211        interests: Interest,
212    ) -> io::Result<()> {
213        self.mio.reregister(registry, token, interests)
214    }
215
216    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
217        self.mio.deregister(registry)
218    }
219}
220
/// Metrics are stored as a `HashMap` from a name key to an `i64` value.
/// They mainly help with monitoring the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;

// Event key for the IPv4 socket; the value is arbitrary and only hints at IPv4.
const IPV4_SOCK_EVENT_KEY: usize = 4;
// Event key for the IPv6 socket; the value is arbitrary and only hints at IPv6.
const IPV6_SOCK_EVENT_KEY: usize = 6;
// Event key for the signal socket; chosen to avoid overlapping with zc.poll_ids.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;
228
229/// A daemon thread for mDNS
230///
231/// This struct provides a handle and an API to the daemon. It is cloneable.
232#[derive(Clone)]
233pub struct ServiceDaemon {
234    /// Sender handle of the channel to the daemon.
235    sender: Sender<Command>,
236
237    /// Send to this addr to signal that a `Command` is coming.
238    ///
239    /// The daemon listens on this addr together with other mDNS sockets,
240    /// to avoid busy polling the flume channel. If there is a way to poll
241    /// the channel and mDNS sockets together, then this can be removed.
242    signal_addr: SocketAddr,
243}
244
245impl ServiceDaemon {
246    /// Creates a new daemon and spawns a thread to run the daemon.
247    ///
248    /// Creates a new mDNS service daemon using the default port (5353).
249    ///
250    /// For development/testing with custom ports, use [`ServiceDaemon::new_with_port`].
251    ///
252    /// # Errors
253    ///
254    /// Returns [`Error::Msg`] if the daemon cannot be initialized. This wraps an
255    /// underlying OS-level failure.
256    ///
257    /// Note that this constructor does not open the mDNS multicast sockets — those
258    /// are opened lazily by the daemon thread once it starts, so platform issues
259    /// such as "multicast not permitted" are surfaced later via [`DaemonEvent`] from
260    /// [`monitor`](Self::monitor) rather than here.
261    pub fn new() -> Result<Self> {
262        Self::new_with_port(MDNS_PORT)
263    }
264
265    /// Creates a new mDNS service daemon using a custom port.
266    ///
267    /// # Arguments
268    ///
269    /// * `port` - The UDP port to bind for mDNS communication.
270    ///   - In production, this should be `MDNS_PORT` (5353) per RFC 6762.
271    ///   - For development/testing, you can use a non-standard port (e.g., 5454)
272    ///     to avoid conflicts with system mDNS services.
273    ///   - Both publisher and browser must use the same port to communicate.
274    ///
275    /// # Example
276    ///
277    /// ```no_run
278    /// use mdns_sd::ServiceDaemon;
279    ///
280    /// // Use standard mDNS port (production)
281    /// let daemon = ServiceDaemon::new_with_port(5353)?;
282    ///
283    /// // Use custom port for development (avoids macOS Bonjour conflict)
284    /// let daemon_dev = ServiceDaemon::new_with_port(5454)?;
285    /// # Ok::<(), mdns_sd::Error>(())
286    /// ```
287    ///
288    /// # Errors
289    ///
290    /// See [`new`](Self::new) for the set of OS-level failures that may surface
291    /// here. Note that `port` is *not* validated against the kernel until the
292    /// daemon thread tries to bind the mDNS sockets, so an unusable `port`
293    /// (e.g., already in use, requires elevated privileges) will not be
294    /// reported by this constructor — listen for such failures via
295    /// [`monitor`](Self::monitor).
296    pub fn new_with_port(port: u16) -> Result<Self> {
297        // Use port 0 to allow the system assign a random available port,
298        // no need for a pre-defined port number.
299        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
300
301        let signal_sock = UdpSocket::bind(signal_addr)
302            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
303
304        // Get the socket with the OS chosen port
305        let signal_addr = signal_sock
306            .local_addr()
307            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
308
309        // Must be nonblocking so we can listen to it together with mDNS sockets.
310        signal_sock
311            .set_nonblocking(true)
312            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
313
314        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
315
316        let (sender, receiver) = bounded(100);
317
318        // Spawn the daemon thread
319        let mio_sock = MioUdpSocket::from_std(signal_sock);
320        let cmd_sender = sender.clone();
321        thread::Builder::new()
322            .name("mDNS_daemon".to_string())
323            .spawn(move || {
324                Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
325            })
326            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
327
328        Ok(Self {
329            sender,
330            signal_addr,
331        })
332    }
333
334    /// Sends `cmd` to the daemon via its channel, and sends a signal
335    /// to its sock addr to notify.
336    fn send_cmd(&self, cmd: Command) -> Result<()> {
337        let cmd_name = cmd.to_string();
338
339        // First, send to the flume channel.
340        self.sender.try_send(cmd).map_err(|e| match e {
341            TrySendError::Full(_) => Error::Again,
342            TrySendError::Disconnected(_) => Error::DaemonShutdown,
343        })?;
344
345        // Second, send a signal to notify the daemon.
346        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
347        let socket = UdpSocket::bind(addr)
348            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
349        socket
350            .send_to(cmd_name.as_bytes(), self.signal_addr)
351            .map_err(|e| {
352                e_fmt!(
353                    "signal socket send_to {} ({}) failed: {}",
354                    self.signal_addr,
355                    cmd_name,
356                    e
357                )
358            })?;
359
360        Ok(())
361    }
362
363    /// Starts browsing for a specific service type.
364    ///
365    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
366    ///
367    /// Returns a channel `Receiver` to receive events about the service. The caller
368    /// can call `.recv_async().await` on this receiver to handle events in an
369    /// async environment or call `.recv()` in a sync environment.
370    ///
371    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
372    /// finding more details, i.e. SRV records and TXT records.
373    ///
374    /// # Errors
375    ///
376    /// Returns [`Error::Msg`] if `service_type` does not end with
377    /// `._tcp.local.` or `._udp.local.`.
378    ///
379    /// Returns [`Error::Again`] if the daemon's command queue is full.
380    ///
381    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
382    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
383        check_domain_suffix(service_type)?;
384
385        let (resp_s, resp_r) = bounded(10);
386        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
387        Ok(resp_r)
388    }
389
390    /// Preforms a "cache-only" browse.
391    ///
392    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
393    ///
394    /// The functionality is identical to 'browse', but the service events are based solely on the contents
395    /// of the daemon's cache. No actual mDNS query is sent to the network.
396    ///
397    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
398    ///
399    /// # Errors
400    ///
401    /// Same error conditions as [`browse`](Self::browse).
402    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
403        check_domain_suffix(service_type)?;
404
405        let (resp_s, resp_r) = bounded(10);
406        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
407        Ok(resp_r)
408    }
409
410    /// Stops searching for a specific service type.
411    ///
412    /// # Errors
413    ///
414    /// Returns [`Error::Again`] if the daemon's command queue is full.
415    ///
416    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
417    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
418        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
419    }
420
421    /// Starts querying for the ip addresses of a hostname.
422    ///
423    /// Returns a channel `Receiver` to receive events about the hostname.
424    /// The caller can call `.recv_async().await` on this receiver to handle events in an
425    /// async environment or call `.recv()` in a sync environment.
426    ///
427    /// The `timeout` is specified in milliseconds.
428    ///
429    /// # Errors
430    ///
431    /// Returns [`Error::Msg`] if:
432    ///
433    /// - `hostname` does not end with `.local.`;
434    /// - `hostname` is exactly `.local.` (the label before `.local.` is empty);
435    /// - `hostname` is longer than 255 bytes.
436    ///
437    /// Returns [`Error::Again`] if the daemon's command queue is full.
438    ///
439    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
440    pub fn resolve_hostname(
441        &self,
442        hostname: &str,
443        timeout: Option<u64>,
444    ) -> Result<Receiver<HostnameResolutionEvent>> {
445        check_hostname(hostname)?;
446        let (resp_s, resp_r) = bounded(10);
447        self.send_cmd(Command::ResolveHostname(
448            hostname.to_string(),
449            1,
450            resp_s,
451            timeout,
452        ))?;
453        Ok(resp_r)
454    }
455
456    /// Stops querying for the ip addresses of a hostname.
457    ///
458    /// # Errors
459    ///
460    /// Same error conditions as [`stop_browse`](Self::stop_browse).
461    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
462        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
463    }
464
465    /// Registers a service provided by this host.
466    ///
467    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
468    /// this method will automatically fill in addresses from the host.
469    ///
470    /// To re-announce a service with an updated `service_info`, just call
471    /// this `register` function again. No need to call `unregister` first.
472    ///
473    /// # Errors
474    ///
475    /// Returns [`Error::Msg`] if the [`ServiceInfo`] is malformed, for example:
476    ///
477    /// - the fullname does not end with `._tcp.local.` or `._udp.local.`;
478    /// - the hostname does not end with `.local.`, is exactly `.local.`, or
479    ///   is longer than 255 bytes.
480    ///
481    /// Returns [`Error::Again`] if the daemon's command queue is full.
482    ///
483    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
484    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
485        check_service_name(service_info.get_fullname())?;
486        check_hostname(service_info.get_hostname())?;
487
488        self.send_cmd(Command::Register(service_info.into()))
489    }
490
491    /// Unregisters a service. This is a graceful shutdown of a service.
492    ///
493    /// Returns a channel receiver that is used to receive the status code
494    /// of the unregister.
495    ///
496    /// # Errors
497    ///
498    /// Returns [`Error::Again`] if the daemon's command queue is full.
499    ///
500    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
501    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
502        let (resp_s, resp_r) = bounded(1);
503        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
504        Ok(resp_r)
505    }
506
507    /// Starts to monitor events from the daemon.
508    ///
509    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
510    ///
511    /// # Errors
512    ///
513    /// Returns [`Error::Again`] if the daemon's command queue is full.
514    ///
515    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
516    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
517        let (resp_s, resp_r) = bounded(100);
518        self.send_cmd(Command::Monitor(resp_s))?;
519        Ok(resp_r)
520    }
521
522    /// Shuts down the daemon thread and returns a channel to receive the status.
523    ///
524    /// # Errors
525    ///
526    /// Returns [`Error::Again`] if the daemon's command queue is full.
527    ///
528    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
529    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
530        let (resp_s, resp_r) = bounded(1);
531        self.send_cmd(Command::Exit(resp_s))?;
532        Ok(resp_r)
533    }
534
535    /// Returns the status of the daemon.
536    ///
537    /// # Errors
538    ///
539    /// Returns [`Error::Again`] if the daemon's command queue is full.
540    ///
541    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
542    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
543        let (resp_s, resp_r) = bounded(1);
544
545        if self.sender.is_disconnected() {
546            resp_s
547                .send(DaemonStatus::Shutdown)
548                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
549        } else {
550            self.send_cmd(Command::GetStatus(resp_s))?;
551        }
552
553        Ok(resp_r)
554    }
555
556    /// Returns a channel receiver for the metrics, e.g. input/output counters.
557    ///
558    /// The metrics returned is a snapshot. Hence the caller should call
559    /// this method repeatedly if they want to monitor the metrics continuously.
560    ///
561    /// # Errors
562    ///
563    /// Returns [`Error::Again`] if the daemon's command queue is full.
564    ///
565    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
566    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
567        let (resp_s, resp_r) = bounded(1);
568        self.send_cmd(Command::GetMetrics(resp_s))?;
569        Ok(resp_r)
570    }
571
572    /// Change the max length allowed for a service name.
573    ///
574    /// As RFC 6763 defines a length max for a service name, a user should not call
575    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
576    ///
577    /// `len_max` is capped at an internal limit, which is currently 30.
578    ///
579    /// # Errors
580    ///
581    /// Returns [`Error::Msg`] if `len_max` exceeds the internal cap (30).
582    ///
583    /// Returns [`Error::Again`] if the daemon's command queue is full.
584    ///
585    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
586    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
587        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
588
589        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
590            return Err(Error::Msg(format!(
591                "service name length max {len_max} is too large"
592            )));
593        }
594
595        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
596    }
597
598    /// Change the interval for checking IP changes automatically.
599    ///
600    /// Setting the interval to 0 disables the IP check.
601    ///
602    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
603    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
604        let interval_in_millis = interval_in_secs as u64 * 1000;
605        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
606            interval_in_millis,
607        )))
608    }
609
610    /// Get the current interval in seconds for checking IP changes automatically.
611    pub fn get_ip_check_interval(&self) -> Result<u32> {
612        let (resp_s, resp_r) = bounded(1);
613        self.send_cmd(Command::GetOption(resp_s))?;
614
615        let option = resp_r
616            .recv_timeout(Duration::from_secs(10))
617            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
618        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
619        Ok(ip_check_interval_in_secs as u32)
620    }
621
622    /// Include interfaces that match `if_kind` for this service daemon.
623    ///
624    /// For example:
625    /// ```ignore
626    ///     daemon.enable_interface("en0")?;
627    /// ```
628    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
629        let if_kind_vec = if_kind.into_vec();
630        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
631            if_kind_vec.kinds,
632        )))
633    }
634
635    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
636    ///
637    /// For example:
638    /// ```ignore
639    ///     daemon.disable_interface(IfKind::IPv6)?;
640    /// ```
641    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
642        let if_kind_vec = if_kind.into_vec();
643        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
644            if_kind_vec.kinds,
645        )))
646    }
647
648    /// If `accept` is true, accept and cache all responses, even if there is no active querier
649    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
650    /// [browse_cache](Self::browse_cache).
651    ///
652    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
653    ///
654    /// For example:
655    /// ```ignore
656    ///     daemon.accept_unsolicited(true)?;
657    /// ```
658    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
659        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
660    }
661
662    /// Include or exclude Apple P2P interfaces, e.g. "awdl0", "llw0".
663    /// By default, they are excluded.
664    pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
665        self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
666    }
667
668    #[cfg(test)]
669    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
670        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
671            ifname.to_string(),
672        )))
673    }
674
675    #[cfg(test)]
676    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
677        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
678            ifname.to_string(),
679        )))
680    }
681
682    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
683    ///
684    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
685    /// receive announcements from a responder on the same host.
686    ///
687    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
688    ///
689    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
690    /// the UNIX version of the IP_MULTICAST_LOOP option:
691    ///
692    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
693    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
694    ///
695    /// Which means, in order NOT to receive localhost announcements, you want to call
696    /// this API on the querier side on Windows, but on the responder side on Unix.
697    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
698        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
699    }
700
701    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
702    ///
703    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
704    /// receive announcements from a responder on the same host.
705    ///
706    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
707    ///
708    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
709    /// the UNIX version of the IP_MULTICAST_LOOP option:
710    ///
711    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
712    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
713    ///
714    /// Which means, in order NOT to receive localhost announcements, you want to call
715    /// this API on the querier side on Windows, but on the responder side on Unix.
716    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
717        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
718    }
719
720    /// Proactively confirms whether a service instance still valid.
721    ///
722    /// This call will issue queries for a service instance's SRV record and Address records.
723    ///
724    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
725    /// unless there is a reason not to follow RFC.
726    ///
727    /// If no response is received within `timeout`, the current resource
728    /// records will be flushed, and if needed, `ServiceRemoved` event will be
729    /// sent to active queriers.
730    ///
731    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
732    ///
733    /// # Errors
734    ///
735    /// Returns [`Error::Again`] if the daemon's command queue is full.
736    ///
737    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
738    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
739        self.send_cmd(Command::Verify(instance_fullname, timeout))
740    }
741
742    fn daemon_thread(
743        signal_sock: MioUdpSocket,
744        poller: Poll,
745        receiver: Receiver<Command>,
746        port: u16,
747        cmd_sender: Sender<Command>,
748        signal_addr: SocketAddr,
749    ) {
750        let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
751
752        if let Some(cmd) = zc.run(receiver) {
753            match cmd {
754                Command::Exit(resp_s) => {
755                    // It is guaranteed that the receiver already dropped,
756                    // i.e. the daemon command channel closed.
757                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
758                        debug!("exit: failed to send response of shutdown: {}", e);
759                    }
760                }
761                _ => {
762                    debug!("Unexpected command: {:?}", cmd);
763                }
764            }
765        }
766    }
767}
768
769/// Creates a new UDP socket that uses `intf` to send and recv multicast.
770fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
771    // Use the same socket for receiving and sending multicast packets.
772    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
773    let intf_ip = &intf.ip();
774    match intf_ip {
775        IpAddr::V4(ip) => {
776            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
777            let sock = new_socket(addr.into(), true)?;
778
779            // Join mDNS group to receive packets.
780            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
781                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
782
783            // Set IP_MULTICAST_IF to send packets.
784            sock.set_multicast_if_v4(ip)
785                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
786
787            // Per RFC 6762 section 11:
788            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
789            // be sent with IP TTL set to 255."
790            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
791            sock.set_multicast_ttl_v4(255)
792                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
793
794            if !should_loop {
795                sock.set_multicast_loop_v4(false)
796                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
797            }
798
799            // Test if we can send packets successfully.
800            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
801            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
802            for packet in test_packets {
803                sock.send_to(&packet, &multicast_addr)
804                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
805            }
806            MyUdpSocket::new(sock)
807                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
808        }
809        IpAddr::V6(ip) => {
810            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
811            let sock = new_socket(addr.into(), true)?;
812
813            let if_index = intf.index.unwrap_or(0);
814
815            // Join mDNS group to receive packets.
816            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
817                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
818
819            // Set IPV6_MULTICAST_IF to send packets.
820            sock.set_multicast_if_v6(if_index)
821                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
822
823            // We are not sending multicast packets to test this socket as there might
824            // be many IPv6 interfaces on a host and could cause such send error:
825            // "No buffer space available (os error 55)".
826
827            MyUdpSocket::new(sock)
828                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
829        }
830    }
831}
832
833/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
834/// `non_block` indicates whether to set O_NONBLOCK for the socket.
835fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
836    let domain = match addr {
837        SocketAddr::V4(_) => socket2::Domain::IPV4,
838        SocketAddr::V6(_) => socket2::Domain::IPV6,
839    };
840
841    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
842
843    fd.set_reuse_address(true)
844        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
845    #[cfg(unix)]
846    if let Err(e) = fd.set_reuse_port(true) {
847        debug!(
848            "SO_REUSEPORT is not supported, continuing without it: {}",
849            e
850        );
851    }
852
853    if non_block {
854        fd.set_nonblocking(true)
855            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
856    }
857
858    fd.bind(&addr.into())
859        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
860
861    trace!("new socket bind to {}", &addr);
862    Ok(fd)
863}
864
/// Specify a UNIX timestamp in millis to run `command` for the next time.
///
/// The daemon's run loop executes `command` once the current time reaches `next_time`.
struct ReRun {
    /// UNIX timestamp in millis.
    next_time: u64,
    /// The command to execute when `next_time` is reached.
    command: Command,
}
871
/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>`, `From<&String>` and `From<IpAddr>`
/// are implemented: strings convert to [`IfKind::Name`], IP addresses to [`IfKind::Addr`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    /// This is used to look up the interface. The semantics is to identify an interface of
    /// IPv4 or IPv6, not a specific address on the interface.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), enabled by default.
    ///
    /// Loopback interfaces are required by some use cases (e.g., OSCQuery) for publishing.
    LoopbackV4,

    /// ::1/128, enabled by default.
    LoopbackV6,

    /// By interface index, IPv4 only.
    IndexV4(u32),

    /// By interface index, IPv6 only.
    IndexV6(u32),
}
909
910impl IfKind {
911    /// Checks if `intf` matches with this interface kind.
912    fn matches(&self, intf: &Interface) -> bool {
913        match self {
914            Self::All => true,
915            Self::IPv4 => intf.ip().is_ipv4(),
916            Self::IPv6 => intf.ip().is_ipv6(),
917            Self::Name(ifname) => ifname == &intf.name,
918            Self::Addr(addr) => addr == &intf.ip(),
919            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
920            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
921            Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
922            Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
923        }
924    }
925}
926
927/// The first use case of specifying an interface was to
928/// use an interface name. Hence adding this for ergonomic reasons.
929impl From<&str> for IfKind {
930    fn from(val: &str) -> Self {
931        Self::Name(val.to_string())
932    }
933}
934
935impl From<&String> for IfKind {
936    fn from(val: &String) -> Self {
937        Self::Name(val.to_string())
938    }
939}
940
941/// Still for ergonomic reasons.
942impl From<IpAddr> for IfKind {
943    fn from(val: IpAddr) -> Self {
944        Self::Addr(val)
945    }
946}
947
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values are produced through the [`IntoIfKindVec`] trait.
pub struct IfKindVec {
    /// The interface kinds held by this list.
    kinds: Vec<IfKind>,
}
952
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value convertible into `IfKind` and for a
/// `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the resulting [`IfKindVec`].
    fn into_vec(self) -> IfKindVec;
}
957
958impl<T: Into<IfKind>> IntoIfKindVec for T {
959    fn into_vec(self) -> IfKindVec {
960        let if_kind: IfKind = self.into();
961        IfKindVec {
962            kinds: vec![if_kind],
963        }
964    }
965}
966
967impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
968    fn into_vec(self) -> IfKindVec {
969        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
970        IfKindVec { kinds }
971    }
972}
973
/// Selection of interfaces.
///
/// One entry is recorded for every interface kind passed to the daemon's
/// enable / disable interface options.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
982
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
struct Zeroconf {
    /// The mDNS port number to use for socket binding.
    /// Typically MDNS_PORT (5353), but can be customized for development/testing.
    port: u16,

    /// Local interfaces keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
    ipv4_sock: Option<MyUdpSocket>,

    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
    ipv6_sock: Option<MyUdpSocket>,

    /// Local registered services, keyed by service full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Daemon counters, bumped via `increase_counter`
    /// (for example `Counter::CacheRefreshAddr` in the run loop).
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Option set via `DaemonOption::ServiceNameLenMax`.
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Daemon status: `Running` after construction, set to `Shutdown` when the
    /// run loop handles an exit command.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// Multicast loop setting for the IPv4 socket, updated by
    /// `set_multicast_loop_v4`; starts as `true`.
    multicast_loop_v4: bool,

    /// Multicast loop setting for the IPv6 socket, updated by
    /// `set_multicast_loop_v6`; starts as `true`.
    multicast_loop_v6: bool,

    /// Set via `DaemonOption::AcceptUnsolicited`; starts as `false`.
    accept_unsolicited: bool,

    /// Passed to `my_ip_interfaces_inner` when listing interfaces.
    /// Set via `DaemonOption::IncludeAppleP2P`; starts as `false`.
    include_apple_p2p: bool,

    /// Sender for the daemon's command channel, used by `send_cmd_to_self`.
    cmd_sender: Sender<Command>,

    /// Address that `send_cmd_to_self` sends a datagram to in order to wake the poll loop.
    signal_addr: SocketAddr,

    /// Test only: interface names added by `DaemonOption::TestDownInterface`
    /// and removed by `DaemonOption::TestUpInterface`.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
1069
1070/// Join the multicast group for the given interface.
1071fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
1072    let intf_ip = &intf.ip();
1073    match intf_ip {
1074        IpAddr::V4(ip) => {
1075            // Join mDNS group to receive packets.
1076            debug!("join multicast group V4 on {} addr {ip}", intf.name);
1077            my_sock
1078                .join_multicast_v4(&GROUP_ADDR_V4, ip)
1079                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
1080        }
1081        IpAddr::V6(ip) => {
1082            let if_index = intf.index.unwrap_or(0);
1083            // Join mDNS group to receive packets.
1084            debug!(
1085                "join multicast group V6 on {} addr {ip} with index {if_index}",
1086                intf.name
1087            );
1088            my_sock
1089                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
1090                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
1091        }
1092    }
1093    Ok(())
1094}
1095
1096impl Zeroconf {
1097    fn new(
1098        signal_sock: MioUdpSocket,
1099        poller: Poll,
1100        port: u16,
1101        cmd_sender: Sender<Command>,
1102        signal_addr: SocketAddr,
1103    ) -> Self {
1104        // Get interfaces.
1105        let my_ifaddrs = my_ip_interfaces(true);
1106
1107        // Create a socket for every IP addr.
1108        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
1109        // or the same interface name with different IP addrs.
1110        let mut my_intfs = HashMap::new();
1111        let mut dns_registry_map = HashMap::new();
1112
1113        // Use the same socket for receiving and sending multicast packets.
1114        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
1115        let mut ipv4_sock = None;
1116        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1117        match new_socket(addr.into(), true) {
1118            Ok(sock) => {
1119                // Per RFC 6762 section 11:
1120                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1121                // be sent with IP TTL set to 255."
1122                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
1123                sock.set_multicast_ttl_v4(255)
1124                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1125                    .ok();
1126
1127                // This clones a socket.
1128                ipv4_sock = match MyUdpSocket::new(sock) {
1129                    Ok(s) => Some(s),
1130                    Err(e) => {
1131                        debug!("failed to create IPv4 MyUdpSocket: {e}");
1132                        None
1133                    }
1134                };
1135            }
1136            // Per RFC 6762 section 11:}
1137            Err(e) => debug!("failed to create IPv4 socket: {e}"),
1138        }
1139
1140        let mut ipv6_sock = None;
1141        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1142        match new_socket(addr.into(), true) {
1143            Ok(sock) => {
1144                // Per RFC 6762 section 11:
1145                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1146                // be sent with IP TTL set to 255."
1147                sock.set_multicast_hops_v6(255)
1148                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1149                    .ok();
1150
1151                // This clones the ipv6 socket.
1152                ipv6_sock = match MyUdpSocket::new(sock) {
1153                    Ok(s) => Some(s),
1154                    Err(e) => {
1155                        debug!("failed to create IPv6 MyUdpSocket: {e}");
1156                        None
1157                    }
1158                };
1159            }
1160            Err(e) => debug!("failed to create IPv6 socket: {e}"),
1161        }
1162
1163        // Configure sockets to join multicast groups.
1164        for intf in my_ifaddrs {
1165            let sock_opt = if intf.ip().is_ipv4() {
1166                &ipv4_sock
1167            } else {
1168                &ipv6_sock
1169            };
1170            let Some(sock) = sock_opt else {
1171                debug!(
1172                    "no socket available for interface {} with addr {}. Skipped.",
1173                    intf.name,
1174                    intf.ip()
1175                );
1176                continue;
1177            };
1178
1179            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1180                debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1181            }
1182
1183            let if_index = intf.index.unwrap_or(0);
1184
1185            // Add this interface address if not already present.
1186            dns_registry_map
1187                .entry(if_index)
1188                .or_insert_with(DnsRegistry::new);
1189
1190            my_intfs
1191                .entry(if_index)
1192                .and_modify(|v: &mut MyIntf| {
1193                    v.addrs.insert(intf.addr.clone());
1194                })
1195                .or_insert(MyIntf {
1196                    name: intf.name.clone(),
1197                    index: if_index,
1198                    addrs: HashSet::from([intf.addr]),
1199                });
1200        }
1201
1202        let monitors = Vec::new();
1203        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1204        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1205
1206        let timers = BinaryHeap::new();
1207
1208        // Enable everything, including loopback interfaces.
1209        let if_selections = vec![];
1210
1211        let status = DaemonStatus::Running;
1212
1213        Self {
1214            port,
1215            my_intfs,
1216            ipv4_sock,
1217            ipv6_sock,
1218            my_services: HashMap::new(),
1219            cache: DnsCache::new(),
1220            dns_registry_map,
1221            hostname_resolvers: HashMap::new(),
1222            service_queriers: HashMap::new(),
1223            retransmissions: Vec::new(),
1224            counters: HashMap::new(),
1225            poller,
1226            monitors,
1227            service_name_len_max,
1228            ip_check_interval,
1229            if_selections,
1230            signal_sock,
1231            timers,
1232            status,
1233            pending_resolves: HashSet::new(),
1234            resolved: HashSet::new(),
1235            multicast_loop_v4: true,
1236            multicast_loop_v6: true,
1237            accept_unsolicited: false,
1238            include_apple_p2p: false,
1239            cmd_sender,
1240            signal_addr,
1241
1242            #[cfg(test)]
1243            test_down_interfaces: HashSet::new(),
1244        }
1245    }
1246
1247    /// Send a Command into the daemon channel and poke the signal socket to wake the poll loop.
1248    fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1249        let cmd_name = cmd.to_string();
1250
1251        self.cmd_sender.try_send(cmd).map_err(|e| match e {
1252            TrySendError::Full(_) => Error::Again,
1253            TrySendError::Disconnected(_) => Error::DaemonShutdown,
1254        })?;
1255
1256        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1257        let socket = UdpSocket::bind(addr)
1258            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1259        socket
1260            .send_to(cmd_name.as_bytes(), self.signal_addr)
1261            .map_err(|e| {
1262                e_fmt!(
1263                    "signal socket send_to {} ({}) failed: {}",
1264                    self.signal_addr,
1265                    cmd_name,
1266                    e
1267                )
1268            })?;
1269
1270        Ok(())
1271    }
1272
1273    /// Clean up all resources before shutdown.
1274    ///
1275    /// This method:
1276    /// 1. Unregisters all registered services (sends goodbye packets)
1277    /// 2. Stops all active browse operations
1278    /// 3. Stops all active hostname resolution operations
1279    /// 4. Clears all retransmissions
1280    fn cleanup(&mut self) {
1281        debug!("Starting cleanup for shutdown");
1282
1283        // 1. Unregister all services - send goodbye packets
1284        let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1285        for fullname in service_names {
1286            if let Some(info) = self.my_services.get(&fullname) {
1287                debug!("Unregistering service during shutdown: {}", &fullname);
1288
1289                for intf in self.my_intfs.values() {
1290                    if let Some(sock) = self.ipv4_sock.as_ref() {
1291                        self.unregister_service(info, intf, &sock.pktinfo);
1292                    }
1293
1294                    if let Some(sock) = self.ipv6_sock.as_ref() {
1295                        self.unregister_service(info, intf, &sock.pktinfo);
1296                    }
1297                }
1298            }
1299        }
1300        self.my_services.clear();
1301
1302        // 2. Stop all browse operations
1303        let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1304        for ty_domain in browse_types {
1305            debug!("Stopping browse during shutdown: {}", &ty_domain);
1306            if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1307                // Notify the client
1308                if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1309                    debug!("Failed to send SearchStopped during shutdown: {}", e);
1310                }
1311            }
1312        }
1313
1314        // 3. Stop all hostname resolution operations
1315        let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1316        for hostname in hostnames {
1317            debug!(
1318                "Stopping hostname resolution during shutdown: {}",
1319                &hostname
1320            );
1321            if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1322                // Notify the client
1323                if let Err(e) =
1324                    sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1325                {
1326                    debug!(
1327                        "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1328                        e
1329                    );
1330                }
1331            }
1332        }
1333
1334        // 4. Clear all retransmissions
1335        self.retransmissions.clear();
1336
1337        debug!("Cleanup completed");
1338    }
1339
1340    /// The main event loop of the daemon thread
1341    ///
1342    /// In each round, it will:
1343    /// 1. select the listening sockets with a timeout.
1344    /// 2. process the incoming packets if any.
1345    /// 3. try_recv on its channel and execute commands.
1346    /// 4. announce its registered services.
1347    /// 5. process retransmissions if any.
1348    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1349        // Add the daemon's signal socket to the poller.
1350        if let Err(e) = self.poller.registry().register(
1351            &mut self.signal_sock,
1352            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1353            mio::Interest::READABLE,
1354        ) {
1355            debug!("failed to add signal socket to the poller: {}", e);
1356            return None;
1357        }
1358
1359        if let Some(sock) = self.ipv4_sock.as_mut() {
1360            if let Err(e) = self.poller.registry().register(
1361                sock,
1362                mio::Token(IPV4_SOCK_EVENT_KEY),
1363                mio::Interest::READABLE,
1364            ) {
1365                debug!("failed to register ipv4 socket: {}", e);
1366                return None;
1367            }
1368        }
1369
1370        if let Some(sock) = self.ipv6_sock.as_mut() {
1371            if let Err(e) = self.poller.registry().register(
1372                sock,
1373                mio::Token(IPV6_SOCK_EVENT_KEY),
1374                mio::Interest::READABLE,
1375            ) {
1376                debug!("failed to register ipv6 socket: {}", e);
1377                return None;
1378            }
1379        }
1380
1381        // Setup timer for IP checks.
1382        let mut next_ip_check = if self.ip_check_interval > 0 {
1383            current_time_millis() + self.ip_check_interval
1384        } else {
1385            0
1386        };
1387
1388        if next_ip_check > 0 {
1389            self.add_timer(next_ip_check);
1390        }
1391
1392        // Start the run loop.
1393
1394        let mut events = mio::Events::with_capacity(1024);
1395        loop {
1396            let now = current_time_millis();
1397
1398            let earliest_timer = self.peek_earliest_timer();
1399            let timeout = earliest_timer.map(|timer| {
1400                // If `timer` already passed, set `timeout` to be 1ms.
1401                let millis = if timer > now { timer - now } else { 1 };
1402                Duration::from_millis(millis)
1403            });
1404
1405            // Process incoming packets, command events and optional timeout.
1406            events.clear();
1407            match self.poller.poll(&mut events, timeout) {
1408                Ok(_) => self.handle_poller_events(&events),
1409                Err(e) => debug!("failed to select from sockets: {}", e),
1410            }
1411
1412            let now = current_time_millis();
1413
1414            // Remove the timers if already passed.
1415            self.pop_timers_till(now);
1416
1417            // Remove hostname resolvers with expired timeouts.
1418            for hostname in self
1419                .hostname_resolvers
1420                .clone()
1421                .into_iter()
1422                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1423                .map(|(hostname, _)| hostname)
1424            {
1425                trace!("hostname resolver timeout for {}", &hostname);
1426                call_hostname_resolution_listener(
1427                    &self.hostname_resolvers,
1428                    &hostname,
1429                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1430                );
1431                call_hostname_resolution_listener(
1432                    &self.hostname_resolvers,
1433                    &hostname,
1434                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1435                );
1436                self.hostname_resolvers.remove(&hostname);
1437            }
1438
1439            // process commands from the command channel
1440            while let Ok(command) = receiver.try_recv() {
1441                if matches!(command, Command::Exit(_)) {
1442                    debug!("Exit command received, performing cleanup");
1443                    self.cleanup();
1444                    self.status = DaemonStatus::Shutdown;
1445                    return Some(command);
1446                }
1447                self.exec_command(command, false);
1448            }
1449
1450            // check for repeated commands and run them if their time is up.
1451            let mut i = 0;
1452            while i < self.retransmissions.len() {
1453                if now >= self.retransmissions[i].next_time {
1454                    let rerun = self.retransmissions.remove(i);
1455                    self.exec_command(rerun.command, true);
1456                } else {
1457                    i += 1;
1458                }
1459            }
1460
1461            // Refresh cached service records with active queriers
1462            self.refresh_active_services();
1463
1464            // Refresh cached A/AAAA records with active queriers
1465            let mut query_count = 0;
1466            for (hostname, _sender) in self.hostname_resolvers.iter() {
1467                for (hostname, ip_addr) in
1468                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1469                {
1470                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1471                    query_count += 1;
1472                }
1473            }
1474
1475            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1476
1477            // check and evict expired records in our cache
1478            let now = current_time_millis();
1479
1480            // Notify service listeners about the expired records.
1481            let expired_services = self.cache.evict_expired_services(now);
1482            if !expired_services.is_empty() {
1483                debug!(
1484                    "run: send {} service removal to listeners",
1485                    expired_services.len()
1486                );
1487                self.notify_service_removal(expired_services);
1488            }
1489
1490            // Notify hostname listeners about the expired records.
1491            let expired_addrs = self.cache.evict_expired_addr(now);
1492            for (hostname, addrs) in expired_addrs {
1493                call_hostname_resolution_listener(
1494                    &self.hostname_resolvers,
1495                    &hostname,
1496                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1497                );
1498                let instances = self.cache.get_instances_on_host(&hostname);
1499                let instance_set: HashSet<String> = instances.into_iter().collect();
1500                self.resolve_updated_instances(&instance_set);
1501            }
1502
1503            // Send out probing queries.
1504            self.probing_handler();
1505
1506            // check IP changes if next_ip_check is reached.
1507            if now >= next_ip_check && next_ip_check > 0 {
1508                next_ip_check = now + self.ip_check_interval;
1509                self.add_timer(next_ip_check);
1510
1511                self.check_ip_changes();
1512            }
1513        }
1514    }
1515
1516    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1517        match daemon_opt {
1518            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1519            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1520            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1521            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1522            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1523            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1524            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1525            DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1526            #[cfg(test)]
1527            DaemonOption::TestDownInterface(ifname) => {
1528                self.test_down_interfaces.insert(ifname);
1529            }
1530            #[cfg(test)]
1531            DaemonOption::TestUpInterface(ifname) => {
1532                self.test_down_interfaces.remove(&ifname);
1533            }
1534        }
1535    }
1536
1537    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1538        debug!("enable_interface: {:?}", kinds);
1539        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1540
1541        for if_kind in kinds {
1542            self.if_selections.push(IfSelection {
1543                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1544                selected: true,
1545            });
1546        }
1547
1548        self.apply_intf_selections(interfaces);
1549    }
1550
1551    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1552        debug!("disable_interface: {:?}", kinds);
1553        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1554
1555        for if_kind in kinds {
1556            self.if_selections.push(IfSelection {
1557                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1558                selected: false,
1559            });
1560        }
1561
1562        self.apply_intf_selections(interfaces);
1563    }
1564
1565    fn set_multicast_loop_v4(&mut self, on: bool) {
1566        let Some(sock) = self.ipv4_sock.as_mut() else {
1567            return;
1568        };
1569        self.multicast_loop_v4 = on;
1570        sock.pktinfo
1571            .set_multicast_loop_v4(on)
1572            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1573            .unwrap();
1574    }
1575
1576    fn set_multicast_loop_v6(&mut self, on: bool) {
1577        let Some(sock) = self.ipv6_sock.as_mut() else {
1578            return;
1579        };
1580        self.multicast_loop_v6 = on;
1581        sock.pktinfo
1582            .set_multicast_loop_v6(on)
1583            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1584            .unwrap();
1585    }
1586
    /// Stores the `accept_unsolicited` flag (set via `DaemonOption::AcceptUnsolicited`).
    fn set_accept_unsolicited(&mut self, accept: bool) {
        self.accept_unsolicited = accept;
    }
1590
1591    fn set_apple_p2p(&mut self, include: bool) {
1592        if self.include_apple_p2p != include {
1593            self.include_apple_p2p = include;
1594            self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1595        }
1596    }
1597
1598    fn notify_monitors(&mut self, event: DaemonEvent) {
1599        // Only retain the monitors that are still connected.
1600        self.monitors.retain(|sender| {
1601            if let Err(e) = sender.try_send(event.clone()) {
1602                debug!("notify_monitors: try_send: {}", &e);
1603                if matches!(e, TrySendError::Disconnected(_)) {
1604                    return false; // This monitor is dropped.
1605                }
1606            }
1607            true
1608        });
1609    }
1610
1611    /// Remove `addr` in my services that enabled `addr_auto`.
1612    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1613        for (_, service_info) in self.my_services.iter_mut() {
1614            if service_info.is_addr_auto() {
1615                service_info.remove_ipaddr(addr);
1616            }
1617        }
1618    }
1619
1620    fn add_timer(&mut self, next_time: u64) {
1621        self.timers.push(Reverse(next_time));
1622    }
1623
1624    fn peek_earliest_timer(&self) -> Option<u64> {
1625        self.timers.peek().map(|Reverse(v)| *v)
1626    }
1627
1628    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1629        self.timers.pop().map(|Reverse(v)| v)
1630    }
1631
1632    /// Pop all timers that are already passed till `now`.
1633    fn pop_timers_till(&mut self, now: u64) {
1634        while let Some(Reverse(v)) = self.timers.peek() {
1635            if *v > now {
1636                break;
1637            }
1638            self.timers.pop();
1639        }
1640    }
1641
1642    /// Apply all selections to `interfaces` and return the selected addresses.
1643    fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1644        let intf_count = interfaces.len();
1645        let mut intf_selections = vec![true; intf_count];
1646
1647        // apply if_selections
1648        for selection in self.if_selections.iter() {
1649            // Mark the interfaces for this selection.
1650            for i in 0..intf_count {
1651                if selection.if_kind.matches(&interfaces[i]) {
1652                    intf_selections[i] = selection.selected;
1653                }
1654            }
1655        }
1656
1657        let mut selected_addrs = HashSet::new();
1658        for i in 0..intf_count {
1659            if intf_selections[i] {
1660                selected_addrs.insert(interfaces[i].clone());
1661            }
1662        }
1663
1664        selected_addrs
1665    }
1666
1667    /// Apply all selections to `interfaces`.
1668    ///
1669    /// For any interface, add it if selected but not bound yet,
1670    /// delete it if not selected but still bound.
1671    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1672        // By default, we enable all interfaces.
1673        let intf_count = interfaces.len();
1674        let mut intf_selections = vec![true; intf_count];
1675
1676        // apply if_selections
1677        for selection in self.if_selections.iter() {
1678            // Mark the interfaces for this selection.
1679            for i in 0..intf_count {
1680                if selection.if_kind.matches(&interfaces[i]) {
1681                    intf_selections[i] = selection.selected;
1682                }
1683            }
1684        }
1685
1686        // Update `my_intfs` based on the selections.
1687        for (idx, intf) in interfaces.into_iter().enumerate() {
1688            if intf_selections[idx] {
1689                // Add the interface
1690                self.add_interface(intf);
1691            } else {
1692                // Remove the interface
1693                self.del_interface_addr(&intf);
1694            }
1695        }
1696    }
1697
1698    fn del_ip(&mut self, ip: IpAddr) {
1699        self.del_addr_in_my_services(&ip);
1700        self.notify_monitors(DaemonEvent::IpDel(ip));
1701    }
1702
1703    /// Check for IP changes and update [my_intfs] as needed.
1704    fn check_ip_changes(&mut self) {
1705        // Get the current interfaces.
1706        let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1707
1708        #[cfg(test)]
1709        let my_ifaddrs: Vec<_> = my_ifaddrs
1710            .into_iter()
1711            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1712            .collect();
1713
1714        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1715            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1716                let if_index = intf.index.unwrap_or(0);
1717                acc.entry(if_index).or_default().push(&intf.addr);
1718                acc
1719            });
1720
1721        let mut deleted_intfs = Vec::new();
1722        let mut deleted_ips = Vec::new();
1723
1724        for (if_index, my_intf) in self.my_intfs.iter_mut() {
1725            let mut last_ipv4 = None;
1726            let mut last_ipv6 = None;
1727
1728            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1729                my_intf.addrs.retain(|addr| {
1730                    if current_addrs.contains(&addr) {
1731                        true
1732                    } else {
1733                        match addr.ip() {
1734                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1735                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1736                        }
1737                        deleted_ips.push(addr.ip());
1738                        false
1739                    }
1740                });
1741                if my_intf.addrs.is_empty() {
1742                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1743                }
1744            } else {
1745                // If it does not exist, remove the interface.
1746                debug!(
1747                    "check_ip_changes: interface {} ({}) no longer exists, removing",
1748                    my_intf.name, if_index
1749                );
1750                for addr in my_intf.addrs.iter() {
1751                    match addr.ip() {
1752                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1753                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1754                    }
1755                    deleted_ips.push(addr.ip())
1756                }
1757                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1758            }
1759        }
1760
1761        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1762            debug!(
1763                "check_ip_changes: {} deleted ips {} deleted intfs",
1764                deleted_ips.len(),
1765                deleted_intfs.len()
1766            );
1767        }
1768
1769        for ip in deleted_ips {
1770            self.del_ip(ip);
1771        }
1772
1773        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1774            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1775                continue;
1776            };
1777
1778            if let Some(ipv4) = last_ipv4 {
1779                debug!("leave multicast for {ipv4}");
1780                if let Some(sock) = self.ipv4_sock.as_mut() {
1781                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1782                        debug!("leave multicast group for addr {ipv4}: {e}");
1783                    }
1784                }
1785            }
1786
1787            if let Some(ipv6) = last_ipv6 {
1788                debug!("leave multicast for {ipv6}");
1789                if let Some(sock) = self.ipv6_sock.as_mut() {
1790                    if let Err(e) = sock
1791                        .pktinfo
1792                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1793                    {
1794                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
1795                    }
1796                }
1797            }
1798
1799            // Remove cache records for this interface.
1800            let intf_id = InterfaceId {
1801                name: my_intf.name.to_string(),
1802                index: my_intf.index,
1803            };
1804            let result = self.cache.remove_records_on_intf(intf_id);
1805            self.notify_service_removal(result.removed_instances);
1806            self.resolve_updated_instances(&result.modified_instances);
1807        }
1808
1809        // Add newly found interfaces only if in our selections.
1810        self.apply_intf_selections(my_ifaddrs);
1811    }
1812
1813    /// Remove an interface address when it was down, disabled or removed from the system.
1814    /// If no more addresses on the interface, remove the interface as well.
1815    fn del_interface_addr(&mut self, intf: &Interface) {
1816        let if_index = intf.index.unwrap_or(0);
1817        debug!(
1818            "del_interface_addr: {} ({if_index}) addr {}",
1819            intf.name,
1820            intf.ip()
1821        );
1822
1823        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1824            debug!("del_interface_addr: interface {} not found", intf.name);
1825            return;
1826        };
1827
1828        let mut ip_removed = false;
1829
1830        if my_intf.addrs.remove(&intf.addr) {
1831            ip_removed = true;
1832
1833            match intf.addr.ip() {
1834                IpAddr::V4(ipv4) => {
1835                    if my_intf.next_ifaddr_v4().is_none() {
1836                        if let Some(sock) = self.ipv4_sock.as_mut() {
1837                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1838                                debug!("leave multicast group for addr {ipv4}: {e}");
1839                            } else {
1840                                debug!("leave multicast for {ipv4}");
1841                            }
1842                        }
1843                    }
1844                }
1845
1846                IpAddr::V6(ipv6) => {
1847                    if my_intf.next_ifaddr_v6().is_none() {
1848                        if let Some(sock) = self.ipv6_sock.as_mut() {
1849                            if let Err(e) =
1850                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1851                            {
1852                                debug!("leave multicast group for addr {ipv6}: {e}");
1853                            }
1854                        }
1855                    }
1856                }
1857            }
1858
1859            if my_intf.addrs.is_empty() {
1860                // If no more addresses, remove the interface.
1861                debug!("del_interface_addr: removing interface {}", intf.name);
1862                self.my_intfs.remove(&if_index);
1863                self.dns_registry_map.remove(&if_index);
1864                self.cache
1865                    .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1866            } else {
1867                // Interface still has addresses of the other IP version.
1868                // Remove cached address records for the disabled IP version
1869                // only if no more addresses of that version remain.
1870                let is_v4 = intf.addr.ip().is_ipv4();
1871                let version_gone = if is_v4 {
1872                    my_intf.next_ifaddr_v4().is_none()
1873                } else {
1874                    my_intf.next_ifaddr_v6().is_none()
1875                };
1876                if version_gone {
1877                    let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1878                    self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1879                }
1880            }
1881        }
1882
1883        if ip_removed {
1884            // Notify the monitors.
1885            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1886            // Remove the interface from my services that enabled `addr_auto`.
1887            self.del_addr_in_my_services(&intf.ip());
1888        }
1889    }
1890
1891    fn add_interface(&mut self, intf: Interface) {
1892        let sock_opt = if intf.ip().is_ipv4() {
1893            &self.ipv4_sock
1894        } else {
1895            &self.ipv6_sock
1896        };
1897
1898        let Some(sock) = sock_opt else {
1899            debug!(
1900                "add_interface: no socket available for interface {} with addr {}. Skipped.",
1901                intf.name,
1902                intf.ip()
1903            );
1904            return;
1905        };
1906
1907        let if_index = intf.index.unwrap_or(0);
1908        let mut new_addr = false;
1909
1910        match self.my_intfs.entry(if_index) {
1911            Entry::Occupied(mut entry) => {
1912                // If intf has a new address, add it to the existing interface.
1913                let my_intf = entry.get_mut();
1914                if !my_intf.addrs.contains(&intf.addr) {
1915                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1916                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1917                    }
1918                    my_intf.addrs.insert(intf.addr.clone());
1919                    new_addr = true;
1920                }
1921            }
1922            Entry::Vacant(entry) => {
1923                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1924                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1925                    return;
1926                }
1927
1928                new_addr = true;
1929                let new_intf = MyIntf {
1930                    name: intf.name.clone(),
1931                    index: if_index,
1932                    addrs: HashSet::from([intf.addr.clone()]),
1933                };
1934                entry.insert(new_intf);
1935            }
1936        }
1937
1938        if !new_addr {
1939            trace!("add_interface: interface {} already exists", &intf.name);
1940            return;
1941        }
1942
1943        debug!("add new interface {}: {}", intf.name, intf.ip());
1944
1945        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1946            debug!("add_interface: cannot find if_index {if_index}");
1947            return;
1948        };
1949
1950        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1951            Some(registry) => registry,
1952            None => self
1953                .dns_registry_map
1954                .entry(if_index)
1955                .or_insert_with(DnsRegistry::new),
1956        };
1957
1958        for (_, service_info) in self.my_services.iter_mut() {
1959            if service_info.is_addr_auto() {
1960                service_info.insert_ipaddr(&intf);
1961
1962                if let Ok(true) = announce_service_on_intf(
1963                    dns_registry,
1964                    service_info,
1965                    my_intf,
1966                    &sock.pktinfo,
1967                    self.port,
1968                ) {
1969                    debug!(
1970                        "Announce service {} on {}",
1971                        service_info.get_fullname(),
1972                        intf.ip()
1973                    );
1974                    service_info.set_status(if_index, ServiceStatus::Announced);
1975                } else {
1976                    for timer in dns_registry.new_timers.drain(..) {
1977                        self.timers.push(Reverse(timer));
1978                    }
1979                    service_info.set_status(if_index, ServiceStatus::Probing);
1980                }
1981            }
1982        }
1983
1984        // Send browse queries on the new interface without known answers.
1985        // This avoids known-answer suppression (RFC 6762 Section 7.1) that
1986        // would cause the responder to suppress its response, preventing
1987        // address records from being attributed to the new interface.
1988        if let Some(my_intf) = self.my_intfs.get(&if_index) {
1989            for ty in self.service_queriers.keys() {
1990                self.send_query_on_intf(ty, RRType::PTR, my_intf);
1991            }
1992        }
1993
1994        // Notify the monitors.
1995        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1996    }
1997
1998    /// Registers a service.
1999    ///
2000    /// RFC 6762 section 8.3.
2001    /// ...the Multicast DNS responder MUST send
2002    ///    an unsolicited Multicast DNS response containing, in the Answer
2003    ///    Section, all of its newly registered resource records
2004    ///
2005    /// Zeroconf will then respond to requests for information about this service.
2006    fn register_service(&mut self, mut info: ServiceInfo) {
2007        // Check the service name length.
2008        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
2009            error!("check_service_name_length: {}", &e);
2010            self.notify_monitors(DaemonEvent::Error(e));
2011            return;
2012        }
2013
2014        if info.is_addr_auto() {
2015            let selected_intfs =
2016                self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
2017            for intf in selected_intfs {
2018                info.insert_ipaddr(&intf);
2019            }
2020        }
2021
2022        debug!("register service {:?}", &info);
2023
2024        let outgoing_addrs = self.send_unsolicited_response(&mut info);
2025        if !outgoing_addrs.is_empty() {
2026            self.notify_monitors(DaemonEvent::Announce(
2027                info.get_fullname().to_string(),
2028                format!("{:?}", &outgoing_addrs),
2029            ));
2030        }
2031
2032        // The key has to be lower case letter as DNS record name is case insensitive.
2033        // The info will have the original name.
2034        let service_fullname = info.get_fullname().to_lowercase();
2035        self.my_services.insert(service_fullname, info);
2036    }
2037
2038    /// Sends out announcement of `info` on every valid interface.
2039    /// Returns the list of interface IPs that sent out the announcement.
2040    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
2041        let mut outgoing_addrs = Vec::new();
2042        let mut outgoing_intfs = HashSet::new();
2043
2044        let mut invalid_intf_addrs = HashSet::new();
2045
2046        for (if_index, intf) in self.my_intfs.iter() {
2047            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
2048                Some(registry) => registry,
2049                None => self
2050                    .dns_registry_map
2051                    .entry(*if_index)
2052                    .or_insert_with(DnsRegistry::new),
2053            };
2054
2055            let mut announced = false;
2056
2057            // IPv4
2058            if let Some(sock) = self.ipv4_sock.as_mut() {
2059                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
2060                    Ok(true) => {
2061                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
2062                            outgoing_addrs.push(addr.ip());
2063                        }
2064                        outgoing_intfs.insert(intf.index);
2065
2066                        debug!(
2067                            "Announce service IPv4 {} on {}",
2068                            info.get_fullname(),
2069                            intf.name
2070                        );
2071                        announced = true;
2072                    }
2073                    Ok(false) => {}
2074                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2075                        invalid_intf_addrs.insert(intf_addr);
2076                    }
2077                }
2078            }
2079
2080            if let Some(sock) = self.ipv6_sock.as_mut() {
2081                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
2082                    Ok(true) => {
2083                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
2084                            outgoing_addrs.push(addr.ip());
2085                        }
2086                        outgoing_intfs.insert(intf.index);
2087
2088                        debug!(
2089                            "Announce service IPv6 {} on {}",
2090                            info.get_fullname(),
2091                            intf.name
2092                        );
2093                        announced = true;
2094                    }
2095                    Ok(false) => {}
2096                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2097                        invalid_intf_addrs.insert(intf_addr);
2098                    }
2099                }
2100            }
2101
2102            if announced {
2103                info.set_status(intf.index, ServiceStatus::Announced);
2104            } else {
2105                for timer in dns_registry.new_timers.drain(..) {
2106                    self.timers.push(Reverse(timer));
2107                }
2108                info.set_status(*if_index, ServiceStatus::Probing);
2109            }
2110        }
2111
2112        if !invalid_intf_addrs.is_empty() {
2113            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2114        }
2115
2116        // RFC 6762 section 8.3.
2117        // ..The Multicast DNS responder MUST send at least two unsolicited
2118        //    responses, one second apart.
2119        let next_time = current_time_millis() + 1000;
2120        for if_index in outgoing_intfs {
2121            self.add_retransmission(
2122                next_time,
2123                Command::RegisterResend(info.get_fullname().to_string(), if_index),
2124            );
2125        }
2126
2127        outgoing_addrs
2128    }
2129
2130    /// Send probings or finish them if expired. Notify waiting services.
2131    fn probing_handler(&mut self) {
2132        let now = current_time_millis();
2133        let mut invalid_intf_addrs = HashSet::new();
2134
2135        for (if_index, intf) in self.my_intfs.iter() {
2136            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2137                continue;
2138            };
2139
2140            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2141
2142            // send probing.
2143            if !out.questions().is_empty() {
2144                trace!("sending out probing of questions: {:?}", out.questions());
2145                if let Some(sock) = self.ipv4_sock.as_mut() {
2146                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2147                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2148                    {
2149                        invalid_intf_addrs.insert(intf_addr);
2150                    }
2151                }
2152                if let Some(sock) = self.ipv6_sock.as_mut() {
2153                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2154                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2155                    {
2156                        invalid_intf_addrs.insert(intf_addr);
2157                    }
2158                }
2159            }
2160
2161            // For finished probes, wake up services that are waiting for the probes.
2162            let waiting_services =
2163                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2164
2165            for service_name in waiting_services {
2166                // service names are lowercase
2167                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2168                    if info.get_status(*if_index) == ServiceStatus::Announced {
2169                        debug!("service {} already announced", info.get_fullname());
2170                        continue;
2171                    }
2172
2173                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2174                        match announce_service_on_intf(
2175                            dns_registry,
2176                            info,
2177                            intf,
2178                            &sock.pktinfo,
2179                            self.port,
2180                        ) {
2181                            Ok(announced) => announced,
2182                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2183                                invalid_intf_addrs.insert(intf_addr);
2184                                false
2185                            }
2186                        }
2187                    } else {
2188                        false
2189                    };
2190                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2191                        match announce_service_on_intf(
2192                            dns_registry,
2193                            info,
2194                            intf,
2195                            &sock.pktinfo,
2196                            self.port,
2197                        ) {
2198                            Ok(announced) => announced,
2199                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2200                                invalid_intf_addrs.insert(intf_addr);
2201                                false
2202                            }
2203                        }
2204                    } else {
2205                        false
2206                    };
2207
2208                    if announced_v4 || announced_v6 {
2209                        let next_time = now + 1000;
2210                        let command =
2211                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2212                        self.retransmissions.push(ReRun { next_time, command });
2213                        self.timers.push(Reverse(next_time));
2214
2215                        let fullname = dns_registry.resolve_name(&service_name).to_string();
2216
2217                        let hostname = dns_registry.resolve_name(info.get_hostname());
2218
2219                        debug!("wake up: announce service {} on {}", fullname, intf.name);
2220                        notify_monitors(
2221                            &mut self.monitors,
2222                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2223                        );
2224
2225                        info.set_status(*if_index, ServiceStatus::Announced);
2226                    }
2227                }
2228            }
2229        }
2230
2231        if !invalid_intf_addrs.is_empty() {
2232            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2233        }
2234    }
2235
2236    fn unregister_service(
2237        &self,
2238        info: &ServiceInfo,
2239        intf: &MyIntf,
2240        sock: &PktInfoUdpSocket,
2241    ) -> Vec<u8> {
2242        let is_ipv4 = sock.domain() == Domain::IPV4;
2243
2244        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2245        out.add_answer_at_time(
2246            DnsPointer::new(
2247                info.get_type(),
2248                RRType::PTR,
2249                CLASS_IN,
2250                0,
2251                info.get_fullname().to_string(),
2252            ),
2253            0,
2254        );
2255
2256        if let Some(sub) = info.get_subtype() {
2257            trace!("Adding subdomain {}", sub);
2258            out.add_answer_at_time(
2259                DnsPointer::new(
2260                    sub,
2261                    RRType::PTR,
2262                    CLASS_IN,
2263                    0,
2264                    info.get_fullname().to_string(),
2265                ),
2266                0,
2267            );
2268        }
2269
2270        out.add_answer_at_time(
2271            DnsSrv::new(
2272                info.get_fullname(),
2273                CLASS_IN | CLASS_CACHE_FLUSH,
2274                0,
2275                info.get_priority(),
2276                info.get_weight(),
2277                info.get_port(),
2278                info.get_hostname().to_string(),
2279            ),
2280            0,
2281        );
2282        out.add_answer_at_time(
2283            DnsTxt::new(
2284                info.get_fullname(),
2285                CLASS_IN | CLASS_CACHE_FLUSH,
2286                0,
2287                info.generate_txt(),
2288            ),
2289            0,
2290        );
2291
2292        let if_addrs = if is_ipv4 {
2293            info.get_addrs_on_my_intf_v4(intf)
2294        } else {
2295            info.get_addrs_on_my_intf_v6(intf)
2296        };
2297
2298        if if_addrs.is_empty() {
2299            return vec![];
2300        }
2301
2302        for address in if_addrs {
2303            out.add_answer_at_time(
2304                DnsAddress::new(
2305                    info.get_hostname(),
2306                    ip_address_rr_type(&address),
2307                    CLASS_IN | CLASS_CACHE_FLUSH,
2308                    0,
2309                    address,
2310                    intf.into(),
2311                ),
2312                0,
2313            );
2314        }
2315
2316        // Only (at most) one packet is expected to be sent out.
2317        let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port, None, None) {
2318            Ok(sent_vec) => sent_vec,
2319            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2320                let invalid_intf_addrs = HashSet::from([intf_addr]);
2321                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2322                vec![]
2323            }
2324        };
2325        sent_vec.into_iter().next().unwrap_or_default()
2326    }
2327
2328    /// Binds a channel `listener` to querying mDNS hostnames.
2329    ///
2330    /// If there is already a `listener`, it will be updated, i.e. overwritten.
2331    fn add_hostname_resolver(
2332        &mut self,
2333        hostname: String,
2334        listener: Sender<HostnameResolutionEvent>,
2335        timeout: Option<u64>,
2336    ) {
2337        let real_timeout = timeout.map(|t| current_time_millis() + t);
2338        self.hostname_resolvers
2339            .insert(hostname.to_lowercase(), (listener, real_timeout));
2340        if let Some(t) = real_timeout {
2341            self.add_timer(t);
2342        }
2343    }
2344
2345    /// Sends a multicast query for `name` with `qtype`.
2346    fn send_query(&self, name: &str, qtype: RRType) {
2347        self.send_query_vec(&[(name, qtype)]);
2348    }
2349
2350    /// Sends a query on a specific interface without known answers.
2351    ///
2352    /// Used when a new interface is added so the responder won't suppress
2353    /// its response due to known-answer suppression (RFC 6762 Section 7.1).
2354    fn send_query_on_intf(&self, name: &str, qtype: RRType, intf: &MyIntf) {
2355        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2356        out.add_question(name, qtype);
2357
2358        let mut invalid_intf_addrs = HashSet::new();
2359        if let Some(sock) = self.ipv4_sock.as_ref() {
2360            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2361                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2362            {
2363                invalid_intf_addrs.insert(intf_addr);
2364            }
2365        }
2366        if let Some(sock) = self.ipv6_sock.as_ref() {
2367            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2368                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2369            {
2370                invalid_intf_addrs.insert(intf_addr);
2371            }
2372        }
2373        if !invalid_intf_addrs.is_empty() {
2374            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2375        }
2376    }
2377
2378    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
2379    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2380        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2381        let now = current_time_millis();
2382
2383        for (name, qtype) in questions {
2384            out.add_question(name, *qtype);
2385
2386            for record in self.cache.get_known_answers(name, *qtype, now) {
2387                /*
2388                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
2389                ...
2390                    When a Multicast DNS querier sends a query to which it already knows
2391                    some answers, it populates the Answer Section of the DNS query
2392                    message with those answers.
2393                 */
2394                trace!("add known answer: {:?}", record.record);
2395                let mut new_record = record.record.clone();
2396                new_record.get_record_mut().update_ttl(now);
2397                out.add_answer_box(new_record);
2398            }
2399        }
2400
2401        let mut invalid_intf_addrs = HashSet::new();
2402        for (_, intf) in self.my_intfs.iter() {
2403            if let Some(sock) = self.ipv4_sock.as_ref() {
2404                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2405                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2406                {
2407                    invalid_intf_addrs.insert(intf_addr);
2408                }
2409            }
2410            if let Some(sock) = self.ipv6_sock.as_ref() {
2411                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2412                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2413                {
2414                    invalid_intf_addrs.insert(intf_addr);
2415                }
2416            }
2417        }
2418
2419        if !invalid_intf_addrs.is_empty() {
2420            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2421        }
2422    }
2423
2424    /// Reads one UDP datagram from the socket of `intf`.
2425    ///
2426    /// Returns false if failed to receive a packet,
2427    /// otherwise returns true.
2428    fn handle_read(&mut self, event_key: usize) -> bool {
2429        let sock_opt = match event_key {
2430            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2431            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2432            _ => {
2433                debug!("handle_read: unknown token {}", event_key);
2434                return false;
2435            }
2436        };
2437        let Some(sock) = sock_opt.as_mut() else {
2438            debug!("handle_read: socket not available for token {}", event_key);
2439            return false;
2440        };
2441        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2442
2443        // Read the next mDNS UDP datagram.
2444        //
2445        // If the datagram is larger than `buf`, excess bytes may or may not
2446        // be truncated by the socket layer depending on the platform's libc.
2447        // In any case, such large datagram will not be decoded properly and
2448        // this function should return false but should not crash.
2449        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2450            Ok(sz) => sz,
2451            Err(e) => {
2452                if e.kind() != std::io::ErrorKind::WouldBlock {
2453                    debug!("listening socket read failed: {}", e);
2454                }
2455                return false;
2456            }
2457        };
2458
2459        // Find the interface that received the packet.
2460        let pkt_if_index = pktinfo.if_index as u32;
2461        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2462            debug!(
2463                "handle_read: no interface found for pktinfo if_index: {}",
2464                pktinfo.if_index
2465            );
2466            return true; // We still return true to indicate that we read something.
2467        };
2468
2469        // Drop packets for an IP version that has been disabled on this interface.
2470        // This is needed because some times the socket layer may still receive packets
2471        // for an IP version even after we left the multicast group for that IP version.
2472        // We want to drop such packets to avoid unnecessary processing.
2473        let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2474        if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2475            || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2476        {
2477            debug!(
2478                "handle_read: dropping {} packet on intf {} (disabled)",
2479                if is_ipv4 { "IPv4" } else { "IPv6" },
2480                my_intf.name
2481            );
2482            return true;
2483        }
2484
2485        buf.truncate(sz); // reduce potential processing errors
2486
2487        match DnsIncoming::new(buf, my_intf.into()) {
2488            Ok(msg) => {
2489                if msg.is_query() {
2490                    let querier_addr = pktinfo.addr_src;
2491                    self.handle_query(msg, pkt_if_index, querier_addr);
2492                } else if msg.is_response() {
2493                    self.handle_response(msg, pkt_if_index);
2494                } else {
2495                    debug!("Invalid message: not query and not response");
2496                }
2497            }
2498            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2499        }
2500
2501        true
2502    }
2503
2504    /// Returns true, if sent query. Returns false if SRV already exists.
2505    fn query_unresolved(&mut self, instance: &str) -> bool {
2506        if !valid_instance_name(instance) {
2507            trace!("instance name {} not valid", instance);
2508            return false;
2509        }
2510
2511        if let Some(records) = self.cache.get_srv(instance) {
2512            for record in records {
2513                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2514                    if self.cache.get_addr(srv.host()).is_none() {
2515                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2516                        return true;
2517                    }
2518                }
2519            }
2520        } else {
2521            self.send_query(instance, RRType::ANY);
2522            return true;
2523        }
2524
2525        false
2526    }
2527
2528    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2529    /// cached records via `sender`.
2530    fn query_cache_for_service(
2531        &mut self,
2532        ty_domain: &str,
2533        sender: &Sender<ServiceEvent>,
2534        now: u64,
2535    ) {
2536        let mut resolved: HashSet<String> = HashSet::new();
2537        let mut unresolved: HashSet<String> = HashSet::new();
2538
2539        if let Some(records) = self.cache.get_ptr(ty_domain) {
2540            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2541                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2542                    let mut new_event = None;
2543                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2544                        Ok(resolved_service) => {
2545                            if resolved_service.is_valid() {
2546                                debug!("Resolved service from cache: {}", ptr.alias());
2547                                new_event =
2548                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2549                            } else {
2550                                debug!("Resolved service is not valid: {}", ptr.alias());
2551                            }
2552                        }
2553                        Err(err) => {
2554                            debug!("Error while resolving service from cache: {}", err);
2555                            continue;
2556                        }
2557                    }
2558
2559                    match sender.send(ServiceEvent::ServiceFound(
2560                        ty_domain.to_string(),
2561                        ptr.alias().to_string(),
2562                    )) {
2563                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2564                        Err(e) => {
2565                            debug!("failed to send service found: {}", e);
2566                            continue;
2567                        }
2568                    }
2569
2570                    if let Some(event) = new_event {
2571                        resolved.insert(ptr.alias().to_string());
2572                        match sender.send(event) {
2573                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2574                            Err(e) => debug!("failed to send service resolved: {}", e),
2575                        }
2576                    } else {
2577                        unresolved.insert(ptr.alias().to_string());
2578                    }
2579                }
2580            }
2581        }
2582
2583        for instance in resolved.drain() {
2584            self.pending_resolves.remove(&instance);
2585            self.resolved.insert(instance);
2586        }
2587
2588        for instance in unresolved.drain() {
2589            self.add_pending_resolve(instance);
2590        }
2591    }
2592
2593    /// Checks if `hostname` has records in the cache. If yes, sends the
2594    /// cached records via `sender`.
2595    fn query_cache_for_hostname(
2596        &mut self,
2597        hostname: &str,
2598        sender: Sender<HostnameResolutionEvent>,
2599    ) {
2600        let addresses_map = self.cache.get_addresses_for_host(hostname);
2601        for (name, addresses) in addresses_map {
2602            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2603                Ok(()) => trace!("sent hostname addresses found"),
2604                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2605            }
2606        }
2607    }
2608
2609    fn add_pending_resolve(&mut self, instance: String) {
2610        if !self.pending_resolves.contains(&instance) {
2611            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2612            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2613            self.pending_resolves.insert(instance);
2614        }
2615    }
2616
2617    /// Creates a `ResolvedService` from the cache.
2618    fn resolve_service_from_cache(
2619        &self,
2620        ty_domain: &str,
2621        fullname: &str,
2622    ) -> Result<ResolvedService> {
2623        let now = current_time_millis();
2624        let mut resolved_service = ResolvedService {
2625            ty_domain: ty_domain.to_string(),
2626            sub_ty_domain: None,
2627            fullname: fullname.to_string(),
2628            host: String::new(),
2629            port: 0,
2630            addresses: HashSet::new(),
2631            txt_properties: TxtProperties::new(),
2632        };
2633
2634        // Be sure setting `subtype` if available even when querying for the parent domain.
2635        if let Some(subtype) = self.cache.get_subtype(fullname) {
2636            trace!(
2637                "ty_domain: {} found subtype {} for instance: {}",
2638                ty_domain,
2639                subtype,
2640                fullname
2641            );
2642            if resolved_service.sub_ty_domain.is_none() {
2643                resolved_service.sub_ty_domain = Some(subtype.to_string());
2644            }
2645        }
2646
2647        // resolve SRV record
2648        if let Some(records) = self.cache.get_srv(fullname) {
2649            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2650                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2651                    resolved_service.host = dns_srv.host().to_string();
2652                    resolved_service.port = dns_srv.port();
2653                }
2654            }
2655        }
2656
2657        // resolve TXT record
2658        if let Some(records) = self.cache.get_txt(fullname) {
2659            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2660                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2661                    resolved_service.txt_properties = dns_txt.text().into();
2662                }
2663            }
2664        }
2665
2666        // resolve A and AAAA records
2667        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2668            for answer in records.iter() {
2669                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2670                    if dns_a.expires_soon(now) {
2671                        trace!(
2672                            "Addr expired or expires soon: {}",
2673                            dns_a.address().to_ip_addr()
2674                        );
2675                    } else {
2676                        let scoped = dns_a.address();
2677                        if let ScopedIp::V4(v4) = &scoped {
2678                            // Merge interface_ids if this V4 addr already exists.
2679                            // Linear scan by IP since Eq/Hash include interface_ids.
2680                            let existing = resolved_service
2681                                .addresses
2682                                .iter()
2683                                .find(|a| a.to_ip_addr() == IpAddr::V4(*v4.addr()))
2684                                .cloned();
2685                            if let Some(mut existing) = existing {
2686                                resolved_service.addresses.remove(&existing);
2687                                if let ScopedIp::V4(existing_v4) = &mut existing {
2688                                    for id in v4.interface_ids() {
2689                                        existing_v4.add_interface_id(id.clone());
2690                                    }
2691                                }
2692                                resolved_service.addresses.insert(existing);
2693                            } else {
2694                                resolved_service.addresses.insert(scoped);
2695                            }
2696                        } else {
2697                            resolved_service.addresses.insert(scoped);
2698                        }
2699                    }
2700                }
2701            }
2702        }
2703
2704        Ok(resolved_service)
2705    }
2706
2707    fn handle_poller_events(&mut self, events: &mio::Events) {
2708        for ev in events.iter() {
2709            trace!("event received with key {:?}", ev.token());
2710            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2711                // Drain signals as we will drain commands as well.
2712                self.signal_sock_drain();
2713
2714                if let Err(e) = self.poller.registry().reregister(
2715                    &mut self.signal_sock,
2716                    ev.token(),
2717                    mio::Interest::READABLE,
2718                ) {
2719                    debug!("failed to modify poller for signal socket: {}", e);
2720                }
2721                continue; // Next event.
2722            }
2723
2724            // Read until no more packets available.
2725            while self.handle_read(ev.token().0) {}
2726
2727            // we continue to monitor this socket.
2728            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2729                // Re-register the IPv4 socket for reading.
2730                if let Some(sock) = self.ipv4_sock.as_mut() {
2731                    if let Err(e) =
2732                        self.poller
2733                            .registry()
2734                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2735                    {
2736                        debug!("modify poller for IPv4 socket: {}", e);
2737                    }
2738                }
2739            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2740                // Re-register the IPv6 socket for reading.
2741                if let Some(sock) = self.ipv6_sock.as_mut() {
2742                    if let Err(e) =
2743                        self.poller
2744                            .registry()
2745                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2746                    {
2747                        debug!("modify poller for IPv6 socket: {}", e);
2748                    }
2749                }
2750            }
2751        }
2752    }
2753
    /// Handles an incoming mDNS response `msg` received on interface `if_index`.
    ///
    /// In order, this:
    /// 1. drops already-expired records from `msg` and removes them from the
    ///    cache (a PTR removed this way triggers `ServiceRemoved`),
    /// 2. runs the conflict handler on the remaining answers,
    /// 3. adds or updates every remaining record in the cache and collects
    ///    expire/refresh timers for them, sending `ServiceFound` for PTRs,
    /// 4. notifies hostname resolvers about changed address records, and
    /// 5. re-resolves the service instances touched by the changes.
    ///
    /// Steps 3-5 are skipped if `if_index` is not a known interface.
    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
        let now = current_time_millis();

        // Remove records that are expired. The closure's return value is the
        // `retain` verdict: `true` keeps the record in the message.
        let mut record_predicate = |record: &DnsRecordBox| {
            if !record.get_record().is_expired(now) {
                return true;
            }

            debug!("record is expired, removing it from cache.");
            if self.cache.remove(record) {
                // for PTR records, send event to listeners
                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                    call_service_listener(
                        &self.service_queriers,
                        dns_ptr.get_name(),
                        ServiceEvent::ServiceRemoved(
                            dns_ptr.get_name().to_string(),
                            dns_ptr.alias().to_string(),
                        ),
                    );
                }
            }
            false
        };
        // Apply the same filter to all three record sections of the message.
        msg.answers_mut().retain(&mut record_predicate);
        msg.authorities_mut().retain(&mut record_predicate);
        msg.additionals_mut().retain(&mut record_predicate);

        // check possible conflicts and handle them.
        self.conflict_handler(&msg, if_index);

        // check if the message is for us.
        let mut is_for_us = true; // assume it is for us.

        // If there are any PTR records in the answers, there should be
        // at least one PTR for us. Otherwise, the message is not for us.
        // If there are no PTR records at all, assume this message is for us.
        for answer in msg.answers() {
            if answer.get_type() == RRType::PTR {
                if self.service_queriers.contains_key(answer.get_name()) {
                    is_for_us = true;
                    break; // OK to break: at least one PTR for us.
                } else {
                    is_for_us = false;
                }
            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                // If there is a hostname querier for this address, then it is for us.
                // Hostname resolvers are keyed by lowercased name.
                let answer_lowercase = answer.get_name().to_lowercase();
                if self.hostname_resolvers.contains_key(&answer_lowercase) {
                    is_for_us = true;
                    break; // OK to break: at least one hostname for us.
                }
            }
        }

        // If we explicitly want to accept unsolicited responses, we should consider all messages as for us.
        if self.accept_unsolicited {
            is_for_us = true;
        }

        /// Represents a DNS record change that involves one service instance.
        struct InstanceChange {
            ty: RRType,   // The type of DNS record for the instance.
            name: String, // The name of the record.
        }

        // Go through all answers to get the new and updated records.
        // For new PTR records, send out ServiceFound immediately. For others,
        // collect them into `changes`.
        //
        // Note: we don't try to identify the update instances based on
        // each record immediately as the answers are likely related to each
        // other.
        let mut changes = Vec::new();
        let mut timers = Vec::new();
        // Without a known interface for `if_index` nothing gets cached.
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            return;
        };
        for record in msg.all_records() {
            // `is_for_us` is handed to the cache; how the cache uses it is not
            // visible from here.
            match self
                .cache
                .add_or_update(my_intf, record, &mut timers, is_for_us)
            {
                // NOTE(review): the `true` flag presumably marks a new or
                // changed record — confirm against `DnsCache::add_or_update`.
                Some((dns_record, true)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());

                    let ty = dns_record.record.get_type();
                    let name = dns_record.record.get_name();

                    // Only process PTR that does not expire soon (i.e. TTL > 1).
                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
                        if self.service_queriers.contains_key(name) {
                            timers.push(dns_record.record.get_record().get_refresh_time());
                        }

                        // send ServiceFound
                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
                        {
                            debug!("calling listener with service found: {name}");
                            call_service_listener(
                                &self.service_queriers,
                                name,
                                ServiceEvent::ServiceFound(
                                    name.to_string(),
                                    dns_ptr.alias().to_string(),
                                ),
                            );
                            // For a PTR the change is recorded under the
                            // instance name it points to, not the PTR's own name.
                            changes.push(InstanceChange {
                                ty,
                                name: dns_ptr.alias().to_string(),
                            });
                        }
                    } else {
                        changes.push(InstanceChange {
                            ty,
                            name: name.to_string(),
                        });
                    }
                }
                // Flag is `false`: only the timers are scheduled, no change
                // is recorded.
                Some((dns_record, false)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());
                }
                _ => {}
            }
        }

        // Add timers for the new records.
        for t in timers {
            self.add_timer(t);
        }

        // Go through remaining changes to see if any hostname resolutions were found or updated.
        for change in changes
            .iter()
            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
        {
            let addr_map = self.cache.get_addresses_for_host(&change.name);
            for (name, addresses) in addr_map {
                call_hostname_resolution_listener(
                    &self.hostname_resolvers,
                    &change.name,
                    HostnameResolutionEvent::AddressesFound(name, addresses),
                )
            }
        }

        // Identify the instances that need to be "resolved".
        let mut updated_instances = HashSet::new();
        for update in changes {
            match update.ty {
                // For these types `update.name` is already an instance name.
                RRType::PTR | RRType::SRV | RRType::TXT => {
                    updated_instances.insert(update.name);
                }
                // For addresses `update.name` is a host name: every instance
                // on that host is affected.
                RRType::A | RRType::AAAA => {
                    let instances = self.cache.get_instances_on_host(&update.name);
                    updated_instances.extend(instances);
                }
                _ => {}
            }
        }

        self.resolve_updated_instances(&updated_instances);
    }
2922
    /// Detects conflicts between the answers in `msg` and the records that
    /// are currently being probed on interface `if_index`.
    ///
    /// A probing record conflicts with an answer when both share name, type
    /// and class but their rdata differ. Each conflicting record is dropped
    /// from its probe, given a new name, and inserted into a probe for that
    /// new name so that probing starts again.
    ///
    /// Does nothing if `if_index` has no interface or no DNS registry.
    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            // Note: the log prefix below names `handle_response`, not this function.
            debug!("handle_response: no intf found for index {if_index}");
            return;
        };

        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
            return;
        };

        for answer in msg.answers().iter() {
            // Renamed copies of the records that conflict with this answer.
            let mut new_records = Vec::new();

            // Only names that are currently being probed can conflict.
            let name = answer.get_name();
            let Some(probe) = dns_registry.probing.get_mut(name) else {
                continue;
            };

            // check against possible multicast forwarding
            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                    // Skip address answers tagged with a different interface index.
                    if answer_addr.interface_id.index != if_index {
                        debug!(
                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
                            answer_addr, my_intf.name
                        );
                        continue;
                    }
                }

                // double check if any other address record matches rrdata,
                // as there could be multiple addresses for the same name.
                let any_match = probe.records.iter().any(|r| {
                    r.get_type() == answer.get_type()
                        && r.get_class() == answer.get_class()
                        && r.rrdata_match(answer.as_ref())
                });
                if any_match {
                    continue; // no conflict for this answer.
                }
            }

            // Keep the non-conflicting records in the probe; move a renamed
            // clone of every conflicting one into `new_records`.
            probe.records.retain(|record| {
                if record.get_type() == answer.get_type()
                    && record.get_class() == answer.get_class()
                    && !record.rrdata_match(answer.as_ref())
                {
                    debug!(
                        "found conflict name: '{name}' record: {}: {} PEER: {}",
                        record.get_type(),
                        record.rdata_print(),
                        answer.rdata_print()
                    );

                    // create a new name for this record
                    // then remove the old record in probing.
                    let mut new_record = record.clone();
                    // Address records are renamed with `hostname_change`,
                    // all other record types with `name_change`.
                    let new_name = match record.get_type() {
                        RRType::A => hostname_change(name),
                        RRType::AAAA => hostname_change(name),
                        _ => name_change(name),
                    };
                    new_record.get_record_mut().set_new_name(new_name);
                    new_records.push(new_record);
                    return false; // old record is dropped from the probe.
                }

                true
            });

            // NOTE(review): unclear whether a probe left without records
            // should be removed here; the removal below is left disabled.
            // if probe.records.is_empty() {
            //     dns_registry.probing.remove(name);
            // }

            // Probing again with the new names.
            // The start time is offset by a random 0..250 ms.
            let create_time = current_time_millis() + fastrand::u64(0..250);

            // Copied so the services waiting on the old probe can be attached
            // to the probes for the new names below.
            let waiting_services = probe.waiting_services.clone();

            for record in new_records {
                if dns_registry.update_hostname(name, record.get_name(), create_time) {
                    self.timers.push(Reverse(create_time));
                }

                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
                dns_registry.name_changes.insert(
                    record.get_record().get_original_name().to_string(),
                    record.get_name().to_string(),
                );

                // Reuse the probe for the new name if one exists; otherwise
                // create it and schedule a timer for its next send time.
                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                    Some(p) => p,
                    None => {
                        let new_probe = dns_registry
                            .probing
                            .entry(record.get_name().to_string())
                            .or_insert_with(|| {
                                debug!("conflict handler: new probe of {}", record.get_name());
                                Probe::new(create_time)
                            });
                        self.timers.push(Reverse(new_probe.next_send));
                        new_probe
                    }
                };

                debug!(
                    "insert record with new name '{}' {} into probe",
                    record.get_name(),
                    record.get_type()
                );
                new_probe.insert_record(record);

                new_probe.waiting_services.extend(waiting_services.clone());
            }
        }
    }
3040
3041    /// Resolve the updated (including new) instances.
3042    ///
3043    /// Note: it is possible that more than 1 PTR pointing to the same
3044    /// instance. For example, a regular service type PTR and a sub-type
3045    /// service type PTR can both point to the same service instance.
3046    /// This loop automatically handles the sub-type PTRs.
3047    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
3048        if updated_instances.is_empty() {
3049            return;
3050        }
3051
3052        let mut resolved: HashSet<String> = HashSet::new();
3053        let mut unresolved: HashSet<String> = HashSet::new();
3054        let mut removed_instances = HashMap::new();
3055
3056        let now = current_time_millis();
3057
3058        for (ty_domain, records) in self.cache.all_ptr().iter() {
3059            if !self.service_queriers.contains_key(ty_domain) {
3060                // No need to resolve if not in our queries.
3061                continue;
3062            }
3063
3064            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
3065                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
3066                    continue;
3067                };
3068
3069                let instance = dns_ptr.alias();
3070                if !updated_instances.contains(instance) {
3071                    continue;
3072                }
3073
3074                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
3075                else {
3076                    continue;
3077                };
3078
3079                debug!("resolve_updated_instances: from cache: {instance}");
3080                if resolved_service.is_valid() {
3081                    debug!("call queriers to resolve {instance}");
3082                    resolved.insert(instance.to_string());
3083                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
3084                    call_service_listener(&self.service_queriers, ty_domain, event);
3085                } else {
3086                    debug!("Resolved service is not valid: {instance}");
3087                    if self.resolved.remove(dns_ptr.alias()) {
3088                        removed_instances
3089                            .entry(ty_domain.to_string())
3090                            .or_insert_with(HashSet::new)
3091                            .insert(instance.to_string());
3092                    }
3093                    unresolved.insert(instance.to_string());
3094                }
3095            }
3096        }
3097
3098        for instance in resolved.drain() {
3099            self.pending_resolves.remove(&instance);
3100            self.resolved.insert(instance);
3101        }
3102
3103        for instance in unresolved.drain() {
3104            self.add_pending_resolve(instance);
3105        }
3106
3107        if !removed_instances.is_empty() {
3108            debug!(
3109                "resolve_updated_instances: removed {}",
3110                &removed_instances.len()
3111            );
3112            self.notify_service_removal(removed_instances);
3113        }
3114    }
3115
3116    /// Handle incoming query packets, figure out whether and what to respond.
3117    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, querier_addr: SocketAddr) {
3118        let querier_ip = querier_addr.ip();
3119        let is_ipv4 = querier_ip.is_ipv4();
3120        let sock_opt = if is_ipv4 {
3121            &self.ipv4_sock
3122        } else {
3123            &self.ipv6_sock
3124        };
3125        let Some(sock) = sock_opt.as_ref() else {
3126            debug!("handle_query: socket not available for intf {}", if_index);
3127            return;
3128        };
3129        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3130
3131        // Special meta-query "_services._dns-sd._udp.<Domain>".
3132        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
3133        const META_QUERY: &str = "_services._dns-sd._udp.local.";
3134
3135        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3136            debug!("missing dns registry for intf {}", if_index);
3137            return;
3138        };
3139
3140        let Some(intf) = self.my_intfs.get(&if_index) else {
3141            debug!("handle_query: no intf found for index {if_index}");
3142            return;
3143        };
3144
3145        for question in msg.questions().iter() {
3146            let qtype = question.entry_type();
3147            let q_name = question.entry_name();
3148
3149            if qtype == RRType::PTR {
3150                for service in self.my_services.values() {
3151                    if service.get_status(if_index) != ServiceStatus::Announced {
3152                        continue;
3153                    }
3154
3155                    if service.matches_type_or_subtype(q_name) {
3156                        out.add_answer_with_additionals(&msg, service, intf, dns_registry, is_ipv4);
3157                    } else if q_name == META_QUERY {
3158                        let ttl = service.get_other_ttl();
3159                        let alias = service.get_type().to_string();
3160                        let ptr = DnsPointer::new(q_name, RRType::PTR, CLASS_IN, ttl, alias);
3161                        if !out.add_answer(&msg, ptr) {
3162                            trace!("answer was not added for meta-query {:?}", &question);
3163                        }
3164                    }
3165                }
3166            } else {
3167                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
3168                if qtype == RRType::ANY && msg.num_authorities() > 0 {
3169                    if let Some(probe) = dns_registry.probing.get_mut(q_name) {
3170                        probe.tiebreaking(&msg, q_name);
3171                    }
3172                }
3173
3174                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3175                    for service in self.my_services.values() {
3176                        if service.get_status(if_index) != ServiceStatus::Announced {
3177                            continue;
3178                        }
3179
3180                        let service_hostname = dns_registry.resolve_name(service.get_hostname());
3181
3182                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3183                            // Pick addresses based on the question type, not the
3184                            // socket family. RFC 6762 doesn't require A queries
3185                            // to come over IPv4 transport — Android's getaddrinfo
3186                            // routinely sends both A and AAAA queries over its
3187                            // preferred IPv6 mDNS socket and expects A records
3188                            // to be answered with v4 addresses.
3189                            let mut intf_addrs: Vec<IpAddr> = Vec::new();
3190                            if qtype == RRType::A || qtype == RRType::ANY {
3191                                intf_addrs.extend(service.get_addrs_on_my_intf_v4(intf));
3192                            }
3193                            if qtype == RRType::AAAA || qtype == RRType::ANY {
3194                                intf_addrs.extend(service.get_addrs_on_my_intf_v6(intf));
3195                            }
3196                            if intf_addrs.is_empty()
3197                                && (qtype == RRType::A || qtype == RRType::AAAA)
3198                            {
3199                                let t = match qtype {
3200                                    RRType::A => "TYPE_A",
3201                                    RRType::AAAA => "TYPE_AAAA",
3202                                    _ => "invalid_type",
3203                                };
3204                                trace!(
3205                                    "Cannot find valid addrs for {} response on intf {:?}",
3206                                    t,
3207                                    &intf
3208                                );
3209                                continue;
3210                            }
3211                            for address in intf_addrs {
3212                                out.add_answer(
3213                                    &msg,
3214                                    DnsAddress::new(
3215                                        service_hostname,
3216                                        ip_address_rr_type(&address),
3217                                        CLASS_IN | CLASS_CACHE_FLUSH,
3218                                        service.get_host_ttl(),
3219                                        address,
3220                                        intf.into(),
3221                                    ),
3222                                );
3223                            }
3224                        }
3225                    }
3226                }
3227
3228                let query_name = q_name.to_lowercase();
3229                let service_opt = self
3230                    .my_services
3231                    .iter()
3232                    .find(|(k, _v)| dns_registry.resolve_name(k.as_str()) == query_name)
3233                    .map(|(_, v)| v);
3234
3235                let Some(service) = service_opt else {
3236                    continue;
3237                };
3238
3239                if service.get_status(if_index) != ServiceStatus::Announced {
3240                    continue;
3241                }
3242
3243                let intf_addrs = if is_ipv4 {
3244                    service.get_addrs_on_my_intf_v4(intf)
3245                } else {
3246                    service.get_addrs_on_my_intf_v6(intf)
3247                };
3248                if intf_addrs.is_empty() {
3249                    debug!(
3250                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3251                        &intf
3252                    );
3253                    continue;
3254                }
3255
3256                add_answer_of_service(
3257                    &mut out,
3258                    &msg,
3259                    question.entry_name(),
3260                    service,
3261                    qtype,
3262                    intf_addrs,
3263                );
3264            }
3265        }
3266
3267        if out.answers_count() > 0 {
3268            debug!("sending response on intf {}", &intf.name);
3269            out.set_id(msg.id());
3270
3271            // Pick a source IfAddr on `intf` whose subnet contains the querier's IP.
3272            // It's OK if it's None, `send_dns_outgoing` will then pick one address.
3273            let matched_source = intf
3274                .addrs
3275                .iter()
3276                .find(|if_addr| valid_ip_on_intf(&querier_ip, if_addr));
3277
3278            // RFC 6762 §6.7 (Legacy Unicast Responses): if the querier's source
3279            // port is not 5353, it's a one-shot legacy querier (e.g. Android's
3280            // getaddrinfo, iOS resolver fallback). The response MUST be unicast
3281            // back to the querier's source IP and port; multicast replies will
3282            // never reach the querier's ephemeral socket. Legacy unicast
3283            // responses must also echo the question section and clear the
3284            // cache-flush bit, since legacy resolvers don't understand it.
3285            let unicast_dest = if querier_addr.port() != MDNS_PORT {
3286                Some(querier_addr)
3287            } else {
3288                None
3289            };
3290
3291            if unicast_dest.is_some() {
3292                for q in msg.questions() {
3293                    out.add_question(q.entry_name(), q.entry_type());
3294                }
3295                out.clear_cache_flush_bits();
3296            }
3297
3298            if let Err(InternalError::IntfAddrInvalid(intf_addr)) = send_dns_outgoing(
3299                &out,
3300                intf,
3301                &sock.pktinfo,
3302                self.port,
3303                matched_source,
3304                unicast_dest,
3305            ) {
3306                let invalid_intf_addr = HashSet::from([intf_addr]);
3307                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3308            }
3309
3310            let if_name = intf.name.clone();
3311
3312            self.increase_counter(Counter::Respond, 1);
3313            self.notify_monitors(DaemonEvent::Respond(if_name));
3314        }
3315
3316        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3317    }
3318
3319    /// Increases the value of `counter` by `count`.
3320    fn increase_counter(&mut self, counter: Counter, count: i64) {
3321        let key = counter.to_string();
3322        match self.counters.get_mut(&key) {
3323            Some(v) => *v += count,
3324            None => {
3325                self.counters.insert(key, count);
3326            }
3327        }
3328    }
3329
3330    /// Sets the value of `counter` to `count`.
3331    fn set_counter(&mut self, counter: Counter, count: i64) {
3332        let key = counter.to_string();
3333        self.counters.insert(key, count);
3334    }
3335
3336    fn signal_sock_drain(&self) {
3337        let mut signal_buf = [0; 1024];
3338
3339        // This recv is non-blocking as the socket is non-blocking.
3340        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3341            trace!(
3342                "signal socket recvd: {}",
3343                String::from_utf8_lossy(&signal_buf[0..sz])
3344            );
3345        }
3346    }
3347
3348    fn add_retransmission(&mut self, next_time: u64, command: Command) {
3349        self.retransmissions.push(ReRun { next_time, command });
3350        self.add_timer(next_time);
3351    }
3352
3353    /// Sends service removal event to listeners for expired service records.
3354    /// `expired`: map of service type domain to set of instance names.
3355    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3356        for (ty_domain, sender) in self.service_queriers.iter() {
3357            if let Some(instances) = expired.get(ty_domain) {
3358                for instance_name in instances {
3359                    let event = ServiceEvent::ServiceRemoved(
3360                        ty_domain.to_string(),
3361                        instance_name.to_string(),
3362                    );
3363                    match sender.send(event) {
3364                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3365                        Err(e) => debug!("Failed to send event: {}", e),
3366                    }
3367                }
3368            }
3369        }
3370    }
3371
3372    /// The entry point that executes all commands received by the daemon.
3373    ///
3374    /// `repeating`: whether this is a retransmission.
3375    fn exec_command(&mut self, command: Command, repeating: bool) {
3376        trace!("exec_command: {:?} repeating: {}", &command, repeating);
3377        match command {
3378            Command::Browse(ty, next_delay, cache_only, listener) => {
3379                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3380            }
3381
3382            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3383                self.exec_command_resolve_hostname(
3384                    repeating, hostname, next_delay, listener, timeout,
3385                );
3386            }
3387
3388            Command::Register(service_info) => {
3389                self.register_service(*service_info);
3390                self.increase_counter(Counter::Register, 1);
3391            }
3392
3393            Command::RegisterResend(fullname, intf) => {
3394                trace!("register-resend service: {fullname} on {}", &intf);
3395                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3396                    self.exec_command_register_resend(fullname, intf)
3397                {
3398                    let invalid_intf_addr = HashSet::from([intf_addr]);
3399                    let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3400                }
3401            }
3402
3403            Command::Unregister(fullname, resp_s) => {
3404                trace!("unregister service {} repeat {}", &fullname, &repeating);
3405                self.exec_command_unregister(repeating, fullname, resp_s);
3406            }
3407
3408            Command::UnregisterResend(packet, if_index, is_ipv4) => {
3409                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3410            }
3411
3412            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3413
3414            Command::StopResolveHostname(hostname) => {
3415                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3416            }
3417
3418            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3419
3420            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3421
3422            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3423                Ok(()) => trace!("Sent status to the client"),
3424                Err(e) => debug!("Failed to send status: {}", e),
3425            },
3426
3427            Command::Monitor(resp_s) => {
3428                self.monitors.push(resp_s);
3429            }
3430
3431            Command::SetOption(daemon_opt) => {
3432                self.process_set_option(daemon_opt);
3433            }
3434
3435            Command::GetOption(resp_s) => {
3436                let val = DaemonOptionVal {
3437                    _service_name_len_max: self.service_name_len_max,
3438                    ip_check_interval: self.ip_check_interval,
3439                };
3440                if let Err(e) = resp_s.send(val) {
3441                    debug!("Failed to send options: {}", e);
3442                }
3443            }
3444
3445            Command::Verify(instance_fullname, timeout) => {
3446                self.exec_command_verify(instance_fullname, timeout, repeating);
3447            }
3448
3449            Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3450                for intf_addr in invalid_intf_addrs {
3451                    self.del_interface_addr(&intf_addr);
3452                }
3453
3454                self.check_ip_changes();
3455            }
3456
3457            _ => {
3458                debug!("unexpected command: {:?}", &command);
3459            }
3460        }
3461    }
3462
3463    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3464        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3465        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3466        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3467        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3468        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3469        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3470        self.set_counter(Counter::Timer, self.timers.len() as i64);
3471
3472        let dns_registry_probe_count: usize = self
3473            .dns_registry_map
3474            .values()
3475            .map(|r| r.probing.len())
3476            .sum();
3477        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3478
3479        let dns_registry_active_count: usize = self
3480            .dns_registry_map
3481            .values()
3482            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3483            .sum();
3484        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3485
3486        let dns_registry_timer_count: usize = self
3487            .dns_registry_map
3488            .values()
3489            .map(|r| r.new_timers.len())
3490            .sum();
3491        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3492
3493        let dns_registry_name_change_count: usize = self
3494            .dns_registry_map
3495            .values()
3496            .map(|r| r.name_changes.len())
3497            .sum();
3498        self.set_counter(
3499            Counter::DnsRegistryNameChange,
3500            dns_registry_name_change_count as i64,
3501        );
3502
3503        // Send the metrics to the client.
3504        if let Err(e) = resp_s.send(self.counters.clone()) {
3505            debug!("Failed to send metrics: {}", e);
3506        }
3507    }
3508
3509    fn exec_command_browse(
3510        &mut self,
3511        repeating: bool,
3512        ty: String,
3513        next_delay: u32,
3514        cache_only: bool,
3515        listener: Sender<ServiceEvent>,
3516    ) {
3517        let pretty_addrs: Vec<String> = self
3518            .my_intfs
3519            .iter()
3520            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3521            .collect();
3522
3523        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3524            "{ty} on {} interfaces [{}]",
3525            pretty_addrs.len(),
3526            pretty_addrs.join(", ")
3527        ))) {
3528            debug!(
3529                "Failed to send SearchStarted({})(repeating:{}): {}",
3530                &ty, repeating, e
3531            );
3532            return;
3533        }
3534
3535        let now = current_time_millis();
3536        if !repeating {
3537            // Binds a `listener` to querying mDNS domain type `ty`.
3538            //
3539            // If there is already a `listener`, it will be updated, i.e. overwritten.
3540            self.service_queriers.insert(ty.clone(), listener.clone());
3541
3542            // if we already have the records in our cache, just send them
3543            self.query_cache_for_service(&ty, &listener, now);
3544        }
3545
3546        if cache_only {
3547            // If cache_only is true, we do not send a query.
3548            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3549                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3550                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3551            }
3552            return;
3553        }
3554
3555        self.send_query(&ty, RRType::PTR);
3556        self.increase_counter(Counter::Browse, 1);
3557
3558        let next_time = now + (next_delay * 1000) as u64;
3559        let max_delay = 60 * 60;
3560        let delay = cmp::min(next_delay * 2, max_delay);
3561        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3562    }
3563
3564    fn exec_command_resolve_hostname(
3565        &mut self,
3566        repeating: bool,
3567        hostname: String,
3568        next_delay: u32,
3569        listener: Sender<HostnameResolutionEvent>,
3570        timeout: Option<u64>,
3571    ) {
3572        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3573        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3574            "{} on addrs {:?}",
3575            &hostname, &addr_list
3576        ))) {
3577            debug!(
3578                "Failed to send ResolveStarted({})(repeating:{}): {}",
3579                &hostname, repeating, e
3580            );
3581            return;
3582        }
3583        if !repeating {
3584            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3585            // if we already have the records in our cache, just send them
3586            self.query_cache_for_hostname(&hostname, listener.clone());
3587        }
3588
3589        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3590        self.increase_counter(Counter::ResolveHostname, 1);
3591
3592        let now = current_time_millis();
3593        let next_time = now + u64::from(next_delay) * 1000;
3594        let max_delay = 60 * 60;
3595        let delay = cmp::min(next_delay * 2, max_delay);
3596
3597        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3598        if self
3599            .hostname_resolvers
3600            .get(&hostname)
3601            .and_then(|(_sender, timeout)| *timeout)
3602            .map(|timeout| next_time < timeout)
3603            .unwrap_or(true)
3604        {
3605            self.add_retransmission(
3606                next_time,
3607                Command::ResolveHostname(hostname, delay, listener, None),
3608            );
3609        }
3610    }
3611
3612    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3613        let pending_query = self.query_unresolved(&instance);
3614        let max_try = 3;
3615        if pending_query && try_count < max_try {
3616            // Note that if the current try already succeeds, the next retransmission
3617            // will be no-op as the cache has been updated.
3618            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3619            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3620        }
3621    }
3622
3623    fn exec_command_unregister(
3624        &mut self,
3625        repeating: bool,
3626        fullname: String,
3627        resp_s: Sender<UnregisterStatus>,
3628    ) {
3629        let response = match self.my_services.remove_entry(&fullname) {
3630            None => {
3631                debug!("unregister: cannot find such service {}", &fullname);
3632                UnregisterStatus::NotFound
3633            }
3634            Some((_k, info)) => {
3635                let mut timers = Vec::new();
3636
3637                for (if_index, intf) in self.my_intfs.iter() {
3638                    if let Some(sock) = self.ipv4_sock.as_ref() {
3639                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3640                        // repeat for one time just in case some peers miss the message
3641                        if !repeating && !packet.is_empty() {
3642                            let next_time = current_time_millis() + 120;
3643                            self.retransmissions.push(ReRun {
3644                                next_time,
3645                                command: Command::UnregisterResend(packet, *if_index, true),
3646                            });
3647                            timers.push(next_time);
3648                        }
3649                    }
3650
3651                    // ipv6
3652                    if let Some(sock) = self.ipv6_sock.as_ref() {
3653                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3654                        if !repeating && !packet.is_empty() {
3655                            let next_time = current_time_millis() + 120;
3656                            self.retransmissions.push(ReRun {
3657                                next_time,
3658                                command: Command::UnregisterResend(packet, *if_index, false),
3659                            });
3660                            timers.push(next_time);
3661                        }
3662                    }
3663                }
3664
3665                for t in timers {
3666                    self.add_timer(t);
3667                }
3668
3669                self.increase_counter(Counter::Unregister, 1);
3670                UnregisterStatus::OK
3671            }
3672        };
3673        if let Err(e) = resp_s.send(response) {
3674            debug!("unregister: failed to send response: {}", e);
3675        }
3676    }
3677
3678    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3679        let Some(intf) = self.my_intfs.get(&if_index) else {
3680            return;
3681        };
3682        let sock_opt = if is_ipv4 {
3683            &self.ipv4_sock
3684        } else {
3685            &self.ipv6_sock
3686        };
3687        let Some(sock) = sock_opt else {
3688            return;
3689        };
3690
3691        let if_addr = if is_ipv4 {
3692            match intf.next_ifaddr_v4() {
3693                Some(addr) => addr,
3694                None => return,
3695            }
3696        } else {
3697            match intf.next_ifaddr_v6() {
3698                Some(addr) => addr,
3699                None => return,
3700            }
3701        };
3702
3703        debug!("UnregisterResend from {:?}", if_addr);
3704        multicast_on_intf(
3705            &packet[..],
3706            &intf.name,
3707            intf.index,
3708            if_addr,
3709            &sock.pktinfo,
3710            self.port,
3711        );
3712
3713        self.increase_counter(Counter::UnregisterResend, 1);
3714    }
3715
3716    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3717        match self.service_queriers.remove_entry(&ty_domain) {
3718            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3719            Some((ty, sender)) => {
3720                // Remove pending browse commands in the reruns.
3721                trace!("StopBrowse: removed queryer for {}", &ty);
3722                let mut i = 0;
3723                while i < self.retransmissions.len() {
3724                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3725                        if t == &ty {
3726                            self.retransmissions.remove(i);
3727                            trace!("StopBrowse: removed retransmission for {}", &ty);
3728                            continue;
3729                        }
3730                    }
3731                    i += 1;
3732                }
3733
3734                // Remove cache entries.
3735                self.cache.remove_service_type(&ty_domain);
3736
3737                // Notify the client.
3738                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3739                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3740                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3741                }
3742            }
3743        }
3744    }
3745
3746    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3747        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3748            // Remove pending resolve commands in the reruns.
3749            trace!("StopResolve: removed queryer for {}", &host);
3750            let mut i = 0;
3751            while i < self.retransmissions.len() {
3752                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3753                    if t == &host {
3754                        self.retransmissions.remove(i);
3755                        trace!("StopResolve: removed retransmission for {}", &host);
3756                        continue;
3757                    }
3758                }
3759                i += 1;
3760            }
3761
3762            // Notify the client.
3763            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3764                Ok(()) => trace!("Sent SearchStopped to the listener"),
3765                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3766            }
3767        }
3768    }
3769
3770    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3771        let Some(info) = self.my_services.get_mut(&fullname) else {
3772            trace!("announce: cannot find such service {}", &fullname);
3773            return Ok(());
3774        };
3775
3776        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3777            return Ok(());
3778        };
3779
3780        let Some(intf) = self.my_intfs.get(&if_index) else {
3781            return Ok(());
3782        };
3783
3784        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3785            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3786        } else {
3787            false
3788        };
3789        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3790            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3791        } else {
3792            false
3793        };
3794
3795        if announced_v4 || announced_v6 {
3796            let hostname = dns_registry.resolve_name(info.get_hostname());
3797            let service_name = dns_registry.resolve_name(&fullname).to_string();
3798
3799            debug!("resend: announce service {service_name} on {}", intf.name);
3800
3801            notify_monitors(
3802                &mut self.monitors,
3803                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3804            );
3805            info.set_status(if_index, ServiceStatus::Announced);
3806        } else {
3807            debug!("register-resend should not fail");
3808        }
3809
3810        self.increase_counter(Counter::RegisterResend, 1);
3811        Ok(())
3812    }
3813
3814    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3815        /*
3816        RFC 6762 section 10.4:
3817        ...
3818        When the cache receives this hint that it should reconfirm some
3819        record, it MUST issue two or more queries for the resource record in
3820        dispute.  If no response is received within ten seconds, then, even
3821        though its TTL may indicate that it is not yet due to expire, that
3822        record SHOULD be promptly flushed from the cache.
3823        */
3824        let now = current_time_millis();
3825        let expire_at = if repeating {
3826            None
3827        } else {
3828            Some(now + timeout.as_millis() as u64)
3829        };
3830
3831        // send query for the resource records.
3832        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3833
3834        if !record_vec.is_empty() {
3835            let query_vec: Vec<(&str, RRType)> = record_vec
3836                .iter()
3837                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3838                .collect();
3839            self.send_query_vec(&query_vec);
3840
3841            if let Some(new_expire) = expire_at {
3842                self.add_timer(new_expire); // ensure a check for the new expire time.
3843
3844                // schedule a resend 1 second later
3845                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3846            }
3847        }
3848    }
3849
3850    /// Refresh cached service records with active queriers
3851    fn refresh_active_services(&mut self) {
3852        let mut query_ptr_count = 0;
3853        let mut query_srv_count = 0;
3854        let mut new_timers = HashSet::new();
3855        let mut query_addr_count = 0;
3856
3857        for (ty_domain, _sender) in self.service_queriers.iter() {
3858            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3859            if !refreshed_timers.is_empty() {
3860                trace!("sending refresh query for PTR: {}", ty_domain);
3861                self.send_query(ty_domain, RRType::PTR);
3862                query_ptr_count += 1;
3863                new_timers.extend(refreshed_timers);
3864            }
3865
3866            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3867            for (instance, types) in instances {
3868                trace!("sending refresh query for: {}", &instance);
3869                let query_vec = types
3870                    .into_iter()
3871                    .map(|ty| (instance.as_str(), ty))
3872                    .collect::<Vec<_>>();
3873                self.send_query_vec(&query_vec);
3874                query_srv_count += 1;
3875            }
3876            new_timers.extend(timers);
3877            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3878            for hostname in hostnames.iter() {
3879                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3880                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3881                query_addr_count += 2;
3882            }
3883            new_timers.extend(timers);
3884        }
3885
3886        for timer in new_timers {
3887            self.add_timer(timer);
3888        }
3889
3890        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3891        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3892        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3893    }
3894}
3895
3896/// Adds one or more answers of a service for incoming msg and RR entry name.
3897fn add_answer_of_service(
3898    out: &mut DnsOutgoing,
3899    msg: &DnsIncoming,
3900    entry_name: &str,
3901    service: &ServiceInfo,
3902    qtype: RRType,
3903    intf_addrs: Vec<IpAddr>,
3904) {
3905    if qtype == RRType::SRV || qtype == RRType::ANY {
3906        out.add_answer(
3907            msg,
3908            DnsSrv::new(
3909                entry_name,
3910                CLASS_IN | CLASS_CACHE_FLUSH,
3911                service.get_host_ttl(),
3912                service.get_priority(),
3913                service.get_weight(),
3914                service.get_port(),
3915                service.get_hostname().to_string(),
3916            ),
3917        );
3918    }
3919
3920    if qtype == RRType::TXT || qtype == RRType::ANY {
3921        out.add_answer(
3922            msg,
3923            DnsTxt::new(
3924                entry_name,
3925                CLASS_IN | CLASS_CACHE_FLUSH,
3926                service.get_other_ttl(),
3927                service.generate_txt(),
3928            ),
3929        );
3930    }
3931
3932    if qtype == RRType::SRV {
3933        for address in intf_addrs {
3934            out.add_additional_answer(DnsAddress::new(
3935                service.get_hostname(),
3936                ip_address_rr_type(&address),
3937                CLASS_IN | CLASS_CACHE_FLUSH,
3938                service.get_host_ttl(),
3939                address,
3940                InterfaceId::default(),
3941            ));
3942        }
3943    }
3944}
3945
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// The enum is `#[non_exhaustive]`: client code matching on it needs a
/// wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// Started searching for a service type.
    // NOTE(review): the payload is presumably the service type (`ty_domain`) — confirm.
    SearchStarted(String),

    /// Found a specific (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance in a ResolvedService struct.
    ServiceResolved(Box<ResolvedService>),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type.
    // NOTE(review): the payload is presumably the service type (`ty_domain`) — confirm.
    SearchStopped(String),
}
3966
/// All possible events sent to the client from the daemon
/// regarding host resolution.
///
/// The `String` in each variant is the hostname the event refers to.
/// The enum is `#[non_exhaustive]`: client code matching on it needs a
/// wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found.
    AddressesFound(String, HashSet<ScopedIp>),
    /// One or more addresses for a hostname have been removed.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname.
    SearchStopped(String),
}
3983
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
///
/// The enum is `#[non_exhaustive]`: client code matching on it needs a
/// wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon sent an unsolicited announcement of a service from an interface.
    ///
    /// The fields are the service name and a string of the form
    /// `"<hostname>:<interface name>"`.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address from the host.
    IpAdd(IpAddr),

    /// Daemon detected an IP address removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// See [DnsNameChange] for more details.
    NameChange(DnsNameChange),

    /// Sent out a multicast response via an interface.
    // NOTE(review): the payload is presumably the interface name — confirm.
    Respond(String),
}
4008
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
///
/// Delivered to monitors as the payload of [`DaemonEvent::NameChange`].
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// A new name is created by appending a suffix after the original name.
    ///
    /// - for a service instance name, the suffix is ` (N)`, where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The type of the resource record whose name was changed.
    pub rr_type: RRType,

    /// The interface where the name conflict and its change happened.
    pub intf_name: String,
}
4033
/// Commands supported by the daemon.
///
/// Variants that carry a `Sender` use it to deliver results or events back
/// to whoever issued the command.
#[derive(Debug)]
enum Command {
    /// Browsing for a service type
    /// (ty_domain, next_time_delay_in_seconds, bool flag, channel::sender)
    // NOTE(review): the meaning of the `bool` field is not visible in this part of the file — TODO document it.
    Browse(String, u32, bool, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)

    /// Register a service
    Register(Box<ServiceInfo>),

    /// Unregister a service
    Unregister(String, Sender<UnregisterStatus>), // (fullname, sender for the unregister status)

    /// Announce again a service to local network
    RegisterResend(String, u32), // (fullname, if_index)

    /// Resend unregister packet.
    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)

    /// Stop browsing a service type
    StopBrowse(String), // (ty_domain)

    /// Stop resolving a hostname
    StopResolveHostname(String), // (hostname)

    /// Send query to resolve a service instance.
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    Resolve(String, u16), // (service_instance_fullname, try_count)

    /// Read the current values of the counters
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Change a daemon option; see [`DaemonOption`] for the available options.
    SetOption(DaemonOption),

    /// Read daemon option values; the reply is a [`DaemonOptionVal`].
    GetOption(Sender<DaemonOptionVal>),

    /// Proactively confirm a DNS resource record.
    ///
    /// The intention is to check if a service name or IP address is still
    /// valid before its TTL expires.
    Verify(String, Duration), // (service instance name, timeout)

    /// Invalidate some interface addresses.
    InvalidIntfAddrs(HashSet<Interface>),

    /// Exit command carrying a sender of `DaemonStatus`.
    // NOTE(review): presumably shuts the daemon down and reports the final status on the sender — confirm.
    Exit(Sender<DaemonStatus>),
}
4089
4090impl fmt::Display for Command {
4091    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4092        match self {
4093            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
4094            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
4095            Self::Exit(_) => write!(f, "Command Exit"),
4096            Self::GetStatus(_) => write!(f, "Command GetStatus"),
4097            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
4098            Self::Monitor(_) => write!(f, "Command Monitor"),
4099            Self::Register(_) => write!(f, "Command Register"),
4100            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
4101            Self::SetOption(_) => write!(f, "Command SetOption"),
4102            Self::GetOption(_) => write!(f, "Command GetOption"),
4103            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
4104            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
4105            Self::Unregister(_, _) => write!(f, "Command Unregister"),
4106            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
4107            Self::Resolve(_, _) => write!(f, "Command Resolve"),
4108            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
4109            Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
4110        }
4111    }
4112}
4113
/// Values of daemon options, as carried by the sender of `Command::GetOption`.
struct DaemonOptionVal {
    /// Maximum length allowed for a service name.
    // NOTE(review): the leading underscore suggests this field is currently not read — confirm.
    _service_name_len_max: u8,
    /// Interval between checks of the host's IP addresses.
    // NOTE(review): the unit is not visible here — confirm.
    ip_check_interval: u64,
}
4118
/// Options of the daemon that can be changed via `Command::SetOption`.
// NOTE(review): the per-variant descriptions below are inferred from the variant names — confirm against the handlers.
#[derive(Debug)]
enum DaemonOption {
    /// Maximum length of a service name.
    ServiceNameLenMax(u8),
    /// Interval of the IP address check.
    IpCheckInterval(u64),
    /// Interface kinds to enable.
    EnableInterface(Vec<IfKind>),
    /// Interface kinds to disable.
    DisableInterface(Vec<IfKind>),
    /// Multicast loopback setting for IPv4.
    MulticastLoopV4(bool),
    /// Multicast loopback setting for IPv6.
    MulticastLoopV6(bool),
    /// Whether unsolicited messages are accepted.
    AcceptUnsolicited(bool),
    /// Whether Apple peer-to-peer interfaces are included.
    IncludeAppleP2P(bool),
    /// Test-only: names an interface for a simulated "down" transition.
    #[cfg(test)]
    TestDownInterface(String),
    /// Test-only: names an interface for a simulated "up" transition.
    #[cfg(test)]
    TestUpInterface(String),
}
4134
/// The length in bytes of the service domain suffix supported in this lib.
///
/// It is computed from `"._tcp.local."`; the other accepted suffix,
/// `"._udp.local."`, has the same length, so one constant serves both.
const DOMAIN_LEN: usize = "._tcp.local.".len();
4137
4138/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
4139fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4140    if ty_domain.len() <= DOMAIN_LEN + 1 {
4141        // service name cannot be empty or only '_'.
4142        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4143    }
4144
4145    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
4146    if service_name_len > limit as usize {
4147        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4148    }
4149    Ok(())
4150}
4151
4152/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
4153fn check_domain_suffix(name: &str) -> Result<()> {
4154    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4155        return Err(e_fmt!(
4156            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4157            name
4158        ));
4159    }
4160
4161    Ok(())
4162}
4163
4164/// Validate the service name in a fully qualified name.
4165///
4166/// A Full Name = <Instance>.<Service>.<Domain>
4167/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
4168///
4169/// Note: this function does not check for the length of the service name.
4170/// Instead, `register_service` method will check the length.
4171fn check_service_name(fullname: &str) -> Result<()> {
4172    check_domain_suffix(fullname)?;
4173
4174    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4175    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4176
4177    if &name[0..1] != "_" {
4178        return Err(e_fmt!("Service name must start with '_'"));
4179    }
4180
4181    let name = &name[1..];
4182
4183    if name.contains("--") {
4184        return Err(e_fmt!("Service name must not contain '--'"));
4185    }
4186
4187    if name.starts_with('-') || name.ends_with('-') {
4188        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4189    }
4190
4191    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4192    if ascii_count < 1 {
4193        return Err(e_fmt!(
4194            "Service name must contain at least one letter (eg: 'A-Za-z')"
4195        ));
4196    }
4197
4198    Ok(())
4199}
4200
4201/// Validate a hostname.
4202fn check_hostname(hostname: &str) -> Result<()> {
4203    if !hostname.ends_with(".local.") {
4204        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4205    }
4206
4207    if hostname == ".local." {
4208        return Err(e_fmt!(
4209            "The part of the hostname before '.local.' cannot be empty"
4210        ));
4211    }
4212
4213    if hostname.len() > 255 {
4214        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4215    }
4216
4217    Ok(())
4218}
4219
4220fn call_service_listener(
4221    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4222    ty_domain: &str,
4223    event: ServiceEvent,
4224) {
4225    if let Some(listener) = listeners_map.get(ty_domain) {
4226        match listener.send(event) {
4227            Ok(()) => trace!("Sent event to listener successfully"),
4228            Err(e) => debug!("Failed to send event: {}", e),
4229        }
4230    }
4231}
4232
4233fn call_hostname_resolution_listener(
4234    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4235    hostname: &str,
4236    event: HostnameResolutionEvent,
4237) {
4238    let hostname_lower = hostname.to_lowercase();
4239    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4240        match listener.send(event) {
4241            Ok(()) => trace!("Sent event to listener successfully"),
4242            Err(e) => debug!("Failed to send event: {}", e),
4243        }
4244    }
4245}
4246
/// Returns valid network interfaces in the host system.
/// Operational down interfaces are excluded.
/// Loopback interfaces are excluded if `with_loopback` is false.
///
/// This is `my_ip_interfaces_inner` with `with_apple_p2p` set to `false`,
/// so interfaces recognized as Apple peer-to-peer by name are excluded too.
fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
    my_ip_interfaces_inner(with_loopback, false)
}
4253
4254fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4255    if_addrs::get_if_addrs()
4256        .unwrap_or_default()
4257        .into_iter()
4258        .filter(|i| {
4259            i.is_oper_up()
4260                && !i.is_p2p()
4261                && (!i.is_loopback() || with_loopback)
4262                && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4263        })
4264        .collect()
4265}
4266
/// Tells whether `name` looks like an Apple peer-to-peer interface, i.e.
/// whether it starts with `awdl` or `llw`. Such interfaces are ignored by
/// default.
fn is_apple_p2p_by_name(name: &str) -> bool {
    name.starts_with("awdl") || name.starts_with("llw")
}
4273
4274/// Send an outgoing mDNS query or response, and returns the packet bytes.
4275/// Returns empty vec if no valid interface address is found.
4276fn send_dns_outgoing(
4277    out: &DnsOutgoing,
4278    my_intf: &MyIntf,
4279    sock: &PktInfoUdpSocket,
4280    port: u16,
4281    source: Option<&IfAddr>,
4282    unicast_dest: Option<SocketAddr>,
4283) -> MyResult<Vec<Vec<u8>>> {
4284    let if_name = &my_intf.name;
4285
4286    let if_addr = match source {
4287        Some(addr) => addr,
4288        None => {
4289            if sock.domain() == Domain::IPV4 {
4290                match my_intf.next_ifaddr_v4() {
4291                    Some(addr) => addr,
4292                    None => return Ok(vec![]),
4293                }
4294            } else {
4295                match my_intf.next_ifaddr_v6() {
4296                    Some(addr) => addr,
4297                    None => return Ok(vec![]),
4298                }
4299            }
4300        }
4301    };
4302
4303    send_dns_outgoing_impl(
4304        out,
4305        if_name,
4306        my_intf.index,
4307        if_addr,
4308        sock,
4309        port,
4310        unicast_dest,
4311    )
4312}
4313
4314/// Send an outgoing mDNS query or response, and returns the packet bytes.
4315fn send_dns_outgoing_impl(
4316    out: &DnsOutgoing,
4317    if_name: &str,
4318    if_index: u32,
4319    if_addr: &IfAddr,
4320    sock: &PktInfoUdpSocket,
4321    port: u16,
4322    unicast_dest: Option<SocketAddr>,
4323) -> MyResult<Vec<Vec<u8>>> {
4324    let qtype = if out.is_query() {
4325        "query"
4326    } else {
4327        if out.answers_count() == 0 && out.additionals().is_empty() {
4328            return Ok(vec![]); // no need to send empty response
4329        }
4330        "response"
4331    };
4332    trace!(
4333        "send {}: {} questions {} answers {} authorities {} additional",
4334        qtype,
4335        out.questions().len(),
4336        out.answers_count(),
4337        out.authorities().len(),
4338        out.additionals().len()
4339    );
4340
4341    match if_addr.ip() {
4342        IpAddr::V4(ipv4) => {
4343            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4344                debug!(
4345                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4346                    ipv4, e
4347                );
4348                // cannot send without a valid interface
4349                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4350                    let intf_addr = Interface {
4351                        name: if_name.to_string(),
4352                        addr: if_addr.clone(),
4353                        index: Some(if_index),
4354                        oper_status: if_addrs::IfOperStatus::Down,
4355                        is_p2p: false,
4356                        #[cfg(windows)]
4357                        adapter_name: String::new(),
4358                    };
4359                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4360                }
4361                return Ok(vec![]); // non-fatal other failure
4362            }
4363        }
4364        IpAddr::V6(ipv6) => {
4365            if let Err(e) = sock.set_multicast_if_v6(if_index) {
4366                debug!(
4367                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4368                    ipv6, e
4369                );
4370                // cannot send without a valid interface
4371                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4372                    let intf_addr = Interface {
4373                        name: if_name.to_string(),
4374                        addr: if_addr.clone(),
4375                        index: Some(if_index),
4376                        oper_status: if_addrs::IfOperStatus::Down,
4377                        is_p2p: false,
4378                        #[cfg(windows)]
4379                        adapter_name: String::new(),
4380                    };
4381                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4382                }
4383                return Ok(vec![]); // non-fatal other failure
4384            }
4385        }
4386    }
4387
4388    let packet_list = out.to_data_on_wire();
4389    for packet in packet_list.iter() {
4390        match unicast_dest {
4391            Some(dest) => unicast_on_intf(packet, if_name, dest, sock),
4392            None => multicast_on_intf(packet, if_name, if_index, if_addr, sock, port),
4393        }
4394    }
4395    Ok(packet_list)
4396}
4397
4398/// Sends a unicast packet directly to `dest` (used for RFC 6762 §6.7
4399/// legacy unicast responses).
4400fn unicast_on_intf(packet: &[u8], if_name: &str, dest: SocketAddr, socket: &PktInfoUdpSocket) {
4401    if packet.len() > MAX_MSG_ABSOLUTE {
4402        debug!("Drop over-sized packet ({})", packet.len());
4403        return;
4404    }
4405
4406    let sock_addr = dest.into();
4407    match socket.send_to(packet, &sock_addr) {
4408        Ok(sz) => trace!(
4409            "sent unicast {} bytes on interface {} to {}",
4410            sz,
4411            if_name,
4412            dest
4413        ),
4414        Err(e) => trace!(
4415            "Failed to send unicast to {} via {:?}: {}",
4416            dest,
4417            &if_name,
4418            e
4419        ),
4420    }
4421}
4422
4423/// Sends a multicast packet, and returns the packet bytes.
4424fn multicast_on_intf(
4425    packet: &[u8],
4426    if_name: &str,
4427    if_index: u32,
4428    if_addr: &IfAddr,
4429    socket: &PktInfoUdpSocket,
4430    port: u16,
4431) {
4432    if packet.len() > MAX_MSG_ABSOLUTE {
4433        debug!("Drop over-sized packet ({})", packet.len());
4434        return;
4435    }
4436
4437    let addr: SocketAddr = match if_addr {
4438        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4439        if_addrs::IfAddr::V6(_) => {
4440            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4441            sock.set_scope_id(if_index); // Choose iface for multicast
4442            sock.into()
4443        }
4444    };
4445
4446    // Sends out `packet` to `addr` on the socket.
4447    let sock_addr = addr.into();
4448    match socket.send_to(packet, &sock_addr) {
4449        Ok(sz) => trace!(
4450            "sent out {} bytes on interface {} (idx {}) addr {}",
4451            sz,
4452            if_name,
4453            if_index,
4454            if_addr.ip()
4455        ),
4456        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4457    }
4458}
4459
/// Tells whether `name` has the shape of a service instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of dot-separated parts is checked: there must be at
/// least five (the trailing dot yields a final empty part).
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // A fifth part (index 4) exists exactly when there are five or more.
    name.split('.').nth(4).is_some()
}
4466
4467fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4468    monitors.retain(|sender| {
4469        if let Err(e) = sender.try_send(event.clone()) {
4470            debug!("notify_monitors: try_send: {}", &e);
4471            if matches!(e, TrySendError::Disconnected(_)) {
4472                return false; // This monitor is dropped.
4473            }
4474        }
4475        true
4476    });
4477}
4478
/// Check if all unique records passed "probing", and if yes, create a packet
/// to announce the service.
///
/// The packet is a response (`FLAGS_QR_RESPONSE | FLAGS_AA`) that contains:
/// the PTR record of the service type, a PTR record of the subtype if there
/// is one, the SRV record, the TXT record, and one address record for each
/// address of the service on `intf` (IPv4 addresses if `is_ipv4`, otherwise
/// IPv6 addresses).
///
/// Returns `None` when the service has no address of the requested family on
/// `intf`, or when at least one SRV/TXT/address record is not done probing.
///
/// `dns_registry` is taken mutably because it is handed to
/// `is_probing_done`.
// NOTE(review): `is_probing_done` presumably registers a record for probing when it is not known yet — confirm.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    // Only the addresses of the requested IP family are announced.
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = dns_registry.resolve_name(info.get_fullname());

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Number of records that are still being probed.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // NOTE(review): the random 0..250 ms offset is presumably the probe start jitter of RFC 6762 section 8.1 — confirm.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR records use plain CLASS_IN, unlike the SRV/TXT/address records
    // below, which also set CLASS_CACHE_FLUSH.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV records.
    // The SRV target is the hostname as resolved by the registry.
    let hostname = dns_registry.resolve_name(info.get_hostname()).to_string();

    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    // The record is created under the original name; a registered name
    // change is attached to it as its new name.
    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // `is_probing_done` is consulted only for services that require a probe.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT records.

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records. (A and AAAA)

    // Address records are created under the original hostname; a registered
    // name change for the hostname is attached below.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Announce only once every unique record is done probing.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
4616
4617/// Send an unsolicited response for owned service via `intf` and `sock`.
4618/// Returns true if sent out successfully for IPv4 or IPv6.
4619fn announce_service_on_intf(
4620    dns_registry: &mut DnsRegistry,
4621    info: &ServiceInfo,
4622    intf: &MyIntf,
4623    sock: &PktInfoUdpSocket,
4624    port: u16,
4625) -> MyResult<bool> {
4626    let is_ipv4 = sock.domain() == Domain::IPV4;
4627    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4628        let _ = send_dns_outgoing(&out, intf, sock, port, None, None)?;
4629        return Ok(true);
4630    }
4631
4632    Ok(false)
4633}
4634
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is changed. If that label ends with a
/// number in parentheses, i.e. ` (<N>)`, the number is incremented;
/// otherwise ` (2)` is appended. A suffix whose number cannot be
/// incremented (not a valid `u32`, or already `u32::MAX`) is treated as part
/// of the name, and ` (2)` is appended after it.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
fn name_change(original: &str) -> String {
    // Split off the first label; `rest` keeps everything after the first dot.
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Check if the label already has a ` (<num>)` suffix that can be bumped.
    let bumped = first_label
        .strip_suffix(')')
        .and_then(|head| head.rsplit_once(" ("))
        .and_then(|(base, digits)| {
            // `checked_add` avoids an overflow on `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base} ({next})"))
        });

    let new_label = bumped.unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4670
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is changed. If that label ends with a
/// hyphenated number, i.e. `-<N>`, the number is incremented; otherwise `-2`
/// is appended. A suffix whose number cannot be incremented (not a valid
/// `u32`, or already `u32::MAX`) is treated as part of the name, and `-2` is
/// appended after it.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
fn hostname_change(original: &str) -> String {
    // Split off the first label; `rest` keeps everything after the first dot.
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Check if the label already has a `-<num>` suffix that can be bumped.
    let bumped = first_label.rsplit_once('-').and_then(|(base, digits)| {
        // `checked_add` avoids an overflow on `u32::MAX`.
        let next = digits.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base}-{next}"))
    });

    let new_label = bumped.unwrap_or_else(|| format!("{first_label}-2"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4698
4699/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4700/// that are finished.
4701fn check_probing(
4702    dns_registry: &mut DnsRegistry,
4703    timers: &mut BinaryHeap<Reverse<u64>>,
4704    now: u64,
4705) -> (DnsOutgoing, Vec<String>) {
4706    let mut expired_probes = Vec::new();
4707    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4708
4709    for (name, probe) in dns_registry.probing.iter_mut() {
4710        if now >= probe.next_send {
4711            if probe.expired(now) {
4712                // move the record to active
4713                expired_probes.push(name.clone());
4714            } else {
4715                out.add_question(name, RRType::ANY);
4716
4717                /*
4718                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4719                ...
4720                for tiebreaking to work correctly in all
4721                cases, the Authority Section must contain *all* the records and
4722                proposed rdata being probed for uniqueness.
4723                    */
4724                for record in probe.records.iter() {
4725                    out.add_authority(record.clone());
4726                }
4727
4728                probe.update_next_send(now);
4729
4730                // add timer
4731                timers.push(Reverse(probe.next_send));
4732            }
4733        }
4734    }
4735
4736    (out, expired_probes)
4737}
4738
4739/// Process expired probes on an interface and return a list of services
4740/// that are waiting for the probe to finish.
4741///
4742/// `DnsNameChange` events are sent to the monitors.
4743fn handle_expired_probes(
4744    expired_probes: Vec<String>,
4745    intf_name: &str,
4746    dns_registry: &mut DnsRegistry,
4747    monitors: &mut Vec<Sender<DaemonEvent>>,
4748) -> HashSet<String> {
4749    let mut waiting_services = HashSet::new();
4750
4751    for name in expired_probes {
4752        let Some(probe) = dns_registry.probing.remove(&name) else {
4753            continue;
4754        };
4755
4756        // send notifications about name changes
4757        for record in probe.records.iter() {
4758            if let Some(new_name) = record.get_record().get_new_name() {
4759                dns_registry
4760                    .name_changes
4761                    .insert(name.clone(), new_name.to_string());
4762
4763                let event = DnsNameChange {
4764                    original: record.get_record().get_original_name().to_string(),
4765                    new_name: new_name.to_string(),
4766                    rr_type: record.get_type(),
4767                    intf_name: intf_name.to_string(),
4768                };
4769                debug!("Name change event: {:?}", &event);
4770                notify_monitors(monitors, DaemonEvent::NameChange(event));
4771            }
4772        }
4773
4774        // move RR from probe to active.
4775        debug!(
4776            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4777            probe.records.len(),
4778            probe.waiting_services.len(),
4779        );
4780
4781        // Move records to active and plan to wake up services if records are not empty.
4782        if !probe.records.is_empty() {
4783            match dns_registry.active.get_mut(&name) {
4784                Some(records) => {
4785                    records.extend(probe.records);
4786                }
4787                None => {
4788                    dns_registry.active.insert(name, probe.records);
4789                }
4790            }
4791
4792            waiting_services.extend(probe.waiting_services);
4793        }
4794    }
4795
4796    waiting_services
4797}
4798
4799/// Resolves `IfKind::Addr(ip)` to `IndexV4(if_index)` or `IndexV6(if_index)`.
4800fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4801    if let IfKind::Addr(addr) = &if_kind {
4802        if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4803            let if_index = intf.index.unwrap_or(0);
4804            return if addr.is_ipv4() {
4805                IfKind::IndexV4(if_index)
4806            } else {
4807                IfKind::IndexV6(if_index)
4808            };
4809        }
4810    }
4811    if_kind
4812}
4813
4814#[cfg(test)]
4815mod tests {
4816    use super::{
4817        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4818        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4819        valid_ip_on_intf, HostnameResolutionEvent, MyIntf, ServiceDaemon, ServiceEvent,
4820        ServiceInfo, GROUP_ADDR_V4, MDNS_PORT,
4821    };
4822    use crate::{
4823        dns_parser::{
4824            DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp,
4825            CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE,
4826        },
4827        service_daemon::{add_answer_of_service, check_hostname},
4828    };
4829    use if_addrs::{IfAddr, Ifv4Addr};
4830    use std::{
4831        collections::HashSet,
4832        net::{IpAddr, Ipv4Addr, UdpSocket},
4833        time::{Duration, Instant, SystemTime},
4834    };
4835    use test_log::test;
4836
4837    #[test]
4838    fn test_response_source_ifaddr_match() {
4839        // When an interface has multiple IPs on unrelated subnets,
4840        // handle_query should pick the IfAddr whose subnet contains the querier,
4841        // and fall back to None if none match.
4842        let ifaddr_a = IfAddr::V4(Ifv4Addr {
4843            ip: Ipv4Addr::new(192, 168, 1, 148),
4844            netmask: Ipv4Addr::new(255, 255, 255, 0),
4845            broadcast: None,
4846            prefixlen: 24,
4847        });
4848        let ifaddr_b = IfAddr::V4(Ifv4Addr {
4849            ip: Ipv4Addr::new(10, 238, 0, 51),
4850            netmask: Ipv4Addr::new(255, 255, 255, 0),
4851            broadcast: None,
4852            prefixlen: 24,
4853        });
4854
4855        let intf = MyIntf {
4856            name: "dummy0".to_string(),
4857            index: 1,
4858            addrs: HashSet::from([ifaddr_a.clone(), ifaddr_b.clone()]),
4859        };
4860
4861        let pick = |querier: IpAddr| -> Option<IfAddr> {
4862            intf.addrs
4863                .iter()
4864                .find(|a| valid_ip_on_intf(&querier, a))
4865                .cloned()
4866        };
4867
4868        assert_eq!(
4869            pick(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))),
4870            Some(ifaddr_a)
4871        );
4872        assert_eq!(
4873            pick(IpAddr::V4(Ipv4Addr::new(10, 238, 0, 99))),
4874            Some(ifaddr_b)
4875        );
4876        // Querier not on any local subnet: fall back to None.
4877        assert_eq!(pick(IpAddr::V4(Ipv4Addr::new(172, 16, 0, 1))), None);
4878    }
4879
4880    #[test]
4881    fn test_instance_name() {
4882        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4883        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4884        assert!(!valid_instance_name("_printer._tcp.local."));
4885    }
4886
4887    #[test]
4888    fn test_legacy_unicast_response() {
4889        // RFC 6762 §6.7: a query whose UDP source port is not 5353 (a
4890        // "legacy" / "one-shot" querier, e.g. Android's getaddrinfo) must
4891        // get its response via unicast, sent back to the querier's source
4892        // address, with the question echoed and the cache-flush bit cleared.
4893        //
4894        // This test sends such a query from an ephemeral port and asserts
4895        // the response arrives on that same socket. The socket is not joined
4896        // to the mDNS multicast group, so a multicast-only reply would never
4897        // reach it — simply receiving the response proves it was unicast.
4898
4899        let intf_ip = match my_ip_interfaces(false)
4900            .into_iter()
4901            .find_map(|intf| match intf.ip() {
4902                IpAddr::V4(ip) => Some(ip),
4903                IpAddr::V6(_) => None,
4904            }) {
4905            Some(ip) => ip,
4906            None => {
4907                println!("No IPv4 interface available; skipping test.");
4908                return;
4909            }
4910        };
4911
4912        // Register a service with a unique hostname on this host.
4913        let daemon = ServiceDaemon::new().expect("Failed to create daemon");
4914        let unique = SystemTime::now()
4915            .duration_since(SystemTime::UNIX_EPOCH)
4916            .unwrap()
4917            .as_micros();
4918        let hostname = format!("legacy-unicast-test-{unique}.local.");
4919        let service_info = ServiceInfo::new(
4920            "_legacy-uni._udp.local.",
4921            "test_instance",
4922            &hostname,
4923            &[IpAddr::V4(intf_ip)] as &[IpAddr],
4924            5353, // arbitrary; the test only resolves the hostname
4925            None,
4926        )
4927        .expect("invalid service info");
4928        daemon.register(service_info).expect("register service");
4929
4930        // A one-shot querier: ephemeral source port, not 5353. Binding to
4931        // `intf_ip` directs the multicast query out that interface, which is
4932        // one the daemon is listening on.
4933        let querier = UdpSocket::bind((intf_ip, 0)).expect("bind querier socket");
4934        querier
4935            .set_multicast_loop_v4(true)
4936            .expect("enable multicast loopback");
4937        querier
4938            .set_read_timeout(Some(Duration::from_millis(500)))
4939            .expect("set read timeout");
4940        assert_ne!(
4941            querier.local_addr().unwrap().port(),
4942            MDNS_PORT,
4943            "querier must use an ephemeral (non-5353) source port"
4944        );
4945
4946        // Build a one-question A-record query for our hostname.
4947        let mut query = DnsOutgoing::new(FLAGS_QR_QUERY);
4948        query.add_question(&hostname, RRType::A);
4949        let query_packet = query
4950            .to_data_on_wire()
4951            .pop()
4952            .expect("query serialized to one packet");
4953
4954        let if_id = InterfaceId {
4955            name: "test".to_string(),
4956            index: 0,
4957        };
4958
4959        // The service is announced asynchronously after register(), so retry
4960        // the query until our answer comes back or the deadline passes.
4961        let deadline = Instant::now() + Duration::from_secs(8);
4962        let mut response = None;
4963        'outer: while Instant::now() < deadline {
4964            querier
4965                .send_to(&query_packet, (GROUP_ADDR_V4, MDNS_PORT))
4966                .expect("send query");
4967
4968            // Drain whatever has arrived; on read timeout the loop ends and
4969            // we re-send the query.
4970            let mut buf = [0u8; 1500];
4971            while let Ok((len, from)) = querier.recv_from(&mut buf) {
4972                let Ok(msg) = DnsIncoming::new(buf[..len].to_vec(), if_id.clone()) else {
4973                    continue;
4974                };
4975                if msg.is_response()
4976                    && msg
4977                        .answers()
4978                        .iter()
4979                        .any(|a| a.get_name().eq_ignore_ascii_case(&hostname))
4980                {
4981                    response = Some((msg, from));
4982                    break 'outer;
4983                }
4984            }
4985        }
4986
4987        let (msg, from) = response.expect(
4988            "expected a unicast response to the legacy query; \
4989             a multicast-only reply would never reach this un-joined socket",
4990        );
4991
4992        // The reply came back to our ephemeral socket, from the mDNS port.
4993        assert_eq!(
4994            from.port(),
4995            MDNS_PORT,
4996            "response should originate from the mDNS port"
4997        );
4998
4999        // RFC 6762 §6.7: the original question must be echoed.
5000        assert!(
5001            msg.questions()
5002                .iter()
5003                .any(|q| q.entry_name().eq_ignore_ascii_case(&hostname)),
5004            "legacy unicast response must echo the question section"
5005        );
5006
5007        // RFC 6762 §6.7 / §10.2: the answer must be the A record we asked
5008        // for, with the cache-flush bit cleared.
5009        let answer = msg
5010            .answers()
5011            .iter()
5012            .find(|a| a.get_name().eq_ignore_ascii_case(&hostname))
5013            .expect("response contains an answer for our hostname");
5014        assert_eq!(
5015            answer.get_type(),
5016            RRType::A,
5017            "an A query should be answered with an A record"
5018        );
5019        assert!(
5020            !answer.get_cache_flush(),
5021            "legacy unicast responses must clear the cache-flush bit"
5022        );
5023
5024        daemon.shutdown().unwrap();
5025    }
5026
5027    #[test]
5028    fn test_check_service_name_length() {
5029        let result = check_service_name_length("_tcp", 100);
5030        assert!(result.is_err());
5031        if let Err(e) = result {
5032            println!("{}", e);
5033        }
5034    }
5035
5036    #[test]
5037    fn test_check_hostname() {
5038        // valid hostnames
5039        for hostname in &[
5040            "my_host.local.",
5041            &("A".repeat(255 - ".local.".len()) + ".local."),
5042        ] {
5043            let result = check_hostname(hostname);
5044            assert!(result.is_ok());
5045        }
5046
5047        // erroneous hostnames
5048        for hostname in &[
5049            "my_host.local",
5050            ".local.",
5051            &("A".repeat(256 - ".local.".len()) + ".local."),
5052        ] {
5053            let result = check_hostname(hostname);
5054            assert!(result.is_err());
5055            if let Err(e) = result {
5056                println!("{}", e);
5057            }
5058        }
5059    }
5060
5061    #[test]
5062    fn test_check_domain_suffix() {
5063        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
5064        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
5065        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
5066        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
5067        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
5068        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
5069    }
5070
5071    #[test]
5072    fn test_service_with_temporarily_invalidated_ptr() {
5073        // Create a daemon
5074        let d = ServiceDaemon::new().expect("Failed to create daemon");
5075
5076        let service = "_test_inval_ptr._udp.local.";
5077        let host_name = "my_host_tmp_invalidated_ptr.local.";
5078        let intfs: Vec<_> = my_ip_interfaces(false);
5079        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
5080        let port = 5201;
5081        let my_service =
5082            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
5083                .expect("invalid service info")
5084                .enable_addr_auto();
5085        let result = d.register(my_service.clone());
5086        assert!(result.is_ok());
5087
5088        // Browse for a service
5089        let browse_chan = d.browse(service).unwrap();
5090        let timeout = Duration::from_secs(2);
5091        let mut resolved = false;
5092
5093        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5094            match event {
5095                ServiceEvent::ServiceResolved(info) => {
5096                    resolved = true;
5097                    println!("Resolved a service of {}", &info.fullname);
5098                    break;
5099                }
5100                e => {
5101                    println!("Received event {:?}", e);
5102                }
5103            }
5104        }
5105
5106        assert!(resolved);
5107
5108        println!("Stopping browse of {}", service);
5109        // Pause browsing so restarting will cause a new immediate query.
5110        // Unregistering will not work here, it will invalidate all the records.
5111        d.stop_browse(service).unwrap();
5112
5113        // Ensure the search is stopped.
5114        // Reduces the chance of receiving an answer adding the ptr back to the
5115        // cache causing the later browse to return directly from the cache.
5116        // (which invalidates what this test is trying to test for.)
5117        let mut stopped = false;
5118        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5119            match event {
5120                ServiceEvent::SearchStopped(_) => {
5121                    stopped = true;
5122                    println!("Stopped browsing service");
5123                    break;
5124                }
5125                // Other `ServiceResolved` messages may be received
5126                // here as they come from different interfaces.
5127                // That's fine for this test.
5128                e => {
5129                    println!("Received event {:?}", e);
5130                }
5131            }
5132        }
5133
5134        assert!(stopped);
5135
5136        // Invalidate the ptr from the service to the host.
5137        let invalidate_ptr_packet = DnsPointer::new(
5138            my_service.get_type(),
5139            RRType::PTR,
5140            CLASS_IN,
5141            0,
5142            my_service.get_fullname().to_string(),
5143        );
5144
5145        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5146        packet_buffer.add_additional_answer(invalidate_ptr_packet);
5147
5148        for intf in intfs {
5149            let sock = _new_socket_bind(&intf, true).unwrap();
5150            send_dns_outgoing_impl(
5151                &packet_buffer,
5152                &intf.name,
5153                intf.index.unwrap_or(0),
5154                &intf.addr,
5155                &sock.pktinfo,
5156                MDNS_PORT,
5157                None,
5158            )
5159            .unwrap();
5160        }
5161
5162        println!(
5163            "Sent PTR record invalidation. Starting second browse for {}",
5164            service
5165        );
5166
5167        // Restart the browse to force the sender to re-send the announcements.
5168        let browse_chan = d.browse(service).unwrap();
5169
5170        resolved = false;
5171        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5172            match event {
5173                ServiceEvent::ServiceResolved(info) => {
5174                    resolved = true;
5175                    println!("Resolved a service of {}", &info.fullname);
5176                    break;
5177                }
5178                e => {
5179                    println!("Received event {:?}", e);
5180                }
5181            }
5182        }
5183
5184        assert!(resolved);
5185        d.shutdown().unwrap();
5186    }
5187
5188    #[test]
5189    fn test_expired_srv() {
5190        // construct service info
5191        let service_type = "_expired-srv._udp.local.";
5192        let instance = "test_instance";
5193        let host_name = "expired_srv_host.local.";
5194        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
5195            .unwrap()
5196            .enable_addr_auto();
5197        // let fullname = my_service.get_fullname().to_string();
5198
5199        // set SRV to expire soon.
5200        let new_ttl = 3; // for testing only.
5201        my_service._set_host_ttl(new_ttl);
5202
5203        // register my service
5204        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5205        let result = mdns_server.register(my_service);
5206        assert!(result.is_ok());
5207
5208        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5209        let browse_chan = mdns_client.browse(service_type).unwrap();
5210        let timeout = Duration::from_secs(2);
5211        let mut resolved = false;
5212
5213        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5214            if let ServiceEvent::ServiceResolved(info) = event {
5215                resolved = true;
5216                println!("Resolved a service of {}", &info.fullname);
5217                break;
5218            }
5219        }
5220
5221        assert!(resolved);
5222
5223        // Exit the server so that no more responses.
5224        mdns_server.shutdown().unwrap();
5225
5226        // SRV record in the client cache will expire.
5227        let expire_timeout = Duration::from_secs(new_ttl as u64);
5228        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
5229            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
5230                println!("Service removed: {}: {}", &service_type, &full_name);
5231                break;
5232            }
5233        }
5234    }
5235
5236    #[test]
5237    fn test_hostname_resolution_address_removed() {
5238        // Create a mDNS server
5239        let server = ServiceDaemon::new().expect("Failed to create server");
5240        let hostname = "addr_remove_host._tcp.local.";
5241        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
5242            .iter()
5243            .find(|iface| iface.ip().is_ipv4())
5244            .map(|iface| iface.into())
5245            .unwrap();
5246
5247        let mut my_service = ServiceInfo::new(
5248            "_host_res_test._tcp.local.",
5249            "my_instance",
5250            hostname,
5251            service_ip_addr.to_ip_addr(),
5252            1234,
5253            None,
5254        )
5255        .expect("invalid service info");
5256
5257        // Set a short TTL for addresses for testing.
5258        let addr_ttl = 2;
5259        my_service._set_host_ttl(addr_ttl); // Expire soon
5260
5261        server.register(my_service).unwrap();
5262
5263        // Create a mDNS client for resolving the hostname.
5264        let client = ServiceDaemon::new().expect("Failed to create client");
5265        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
5266        let resolved = loop {
5267            match event_receiver.recv() {
5268                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
5269                    assert!(found_hostname == hostname);
5270                    assert!(addresses.contains(&service_ip_addr));
5271                    println!("address found: {:?}", &addresses);
5272                    break true;
5273                }
5274                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
5275                Ok(_event) => {}
5276                Err(_) => break false,
5277            }
5278        };
5279
5280        assert!(resolved);
5281
5282        // Shutdown the server so no more responses / refreshes for addresses.
5283        server.shutdown().unwrap();
5284
5285        // Wait till hostname address record expires, with 1 second grace period.
5286        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
5287        let removed = loop {
5288            match event_receiver.recv_timeout(timeout) {
5289                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
5290                    assert!(removed_host == hostname);
5291                    assert!(addresses.contains(&service_ip_addr));
5292
5293                    println!(
5294                        "address removed: hostname: {} addresses: {:?}",
5295                        &hostname, &addresses
5296                    );
5297                    break true;
5298                }
5299                Ok(_event) => {}
5300                Err(_) => {
5301                    break false;
5302                }
5303            }
5304        };
5305
5306        assert!(removed);
5307
5308        client.shutdown().unwrap();
5309    }
5310
5311    #[test]
5312    fn test_refresh_ptr() {
5313        // construct service info
5314        let service_type = "_refresh-ptr._udp.local.";
5315        let instance = "test_instance";
5316        let host_name = "refresh_ptr_host.local.";
5317        let service_ip_addr = my_ip_interfaces(false)
5318            .iter()
5319            .find(|iface| iface.ip().is_ipv4())
5320            .map(|iface| iface.ip())
5321            .unwrap();
5322
5323        let mut my_service = ServiceInfo::new(
5324            service_type,
5325            instance,
5326            host_name,
5327            service_ip_addr,
5328            5023,
5329            None,
5330        )
5331        .unwrap();
5332
5333        let new_ttl = 3; // for testing only.
5334        my_service._set_other_ttl(new_ttl);
5335
5336        // register my service
5337        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5338        let result = mdns_server.register(my_service);
5339        assert!(result.is_ok());
5340
5341        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5342        let browse_chan = mdns_client.browse(service_type).unwrap();
5343        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5344        let mut resolved = false;
5345
5346        // resolve the service first.
5347        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5348            if let ServiceEvent::ServiceResolved(info) = event {
5349                resolved = true;
5350                println!("Resolved a service of {}", &info.fullname);
5351                break;
5352            }
5353        }
5354
5355        assert!(resolved);
5356
5357        // wait over 80% of TTL, and refresh PTR should be sent out.
5358        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
5359        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5360            println!("event: {:?}", &event);
5361        }
5362
5363        // verify refresh counter.
5364        let metrics_chan = mdns_client.get_metrics().unwrap();
5365        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
5366        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
5367        assert_eq!(ptr_refresh_counter, 1);
5368        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
5369        assert_eq!(srvtxt_refresh_counter, 1);
5370
5371        // Exit the server so that no more responses.
5372        mdns_server.shutdown().unwrap();
5373        mdns_client.shutdown().unwrap();
5374    }
5375
5376    #[test]
5377    fn test_name_change() {
5378        assert_eq!(name_change("foo.local."), "foo (2).local.");
5379        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5380        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5381        assert_eq!(name_change("foo"), "foo (2)");
5382        assert_eq!(name_change("foo (2)"), "foo (3)");
5383        assert_eq!(name_change(""), " (2)");
5384
5385        // Additional edge cases
5386        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
5387        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
5388        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
5389    }
5390
5391    #[test]
5392    fn test_hostname_change() {
5393        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5394        assert_eq!(hostname_change("foo"), "foo-2");
5395        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5396        assert_eq!(hostname_change("foo-9"), "foo-10");
5397        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5398    }
5399
5400    #[test]
5401    fn test_add_answer_txt_ttl() {
5402        // construct a simple service info
5403        let service_type = "_test_add_answer._udp.local.";
5404        let instance = "test_instance";
5405        let host_name = "add_answer_host.local.";
5406        let service_intf = my_ip_interfaces(false)
5407            .into_iter()
5408            .find(|iface| iface.ip().is_ipv4())
5409            .unwrap();
5410        let service_ip_addr = service_intf.ip();
5411        let my_service = ServiceInfo::new(
5412            service_type,
5413            instance,
5414            host_name,
5415            service_ip_addr,
5416            5023,
5417            None,
5418        )
5419        .unwrap();
5420
5421        // construct a DnsOutgoing message
5422        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5423
5424        // Construct a dummy DnsIncoming message
5425        let mut dummy_data = out.to_data_on_wire();
5426        let interface_id = InterfaceId::from(&service_intf);
5427        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
5428
5429        // Add an answer of TXT type for the service.
5430        let if_addrs = vec![service_intf.ip()];
5431        add_answer_of_service(
5432            &mut out,
5433            &incoming,
5434            instance,
5435            &my_service,
5436            RRType::TXT,
5437            if_addrs,
5438        );
5439
5440        // Check if the answer was added correctly
5441        assert!(
5442            out.answers_count() > 0,
5443            "No answers added to the outgoing message"
5444        );
5445
5446        // Check if the first answer is of type TXT
5447        let answer = out._answers().first().unwrap();
5448        assert_eq!(answer.0.get_type(), RRType::TXT);
5449
5450        // Check TTL is set properly for the TXT record
5451        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
5452    }
5453
5454    #[test]
5455    fn test_interface_flip() {
5456        // start a server
5457        let ty_domain = "_intf-flip._udp.local.";
5458        let host_name = "intf_flip.local.";
5459        let now = SystemTime::now()
5460            .duration_since(SystemTime::UNIX_EPOCH)
5461            .unwrap();
5462        let instance_name = now.as_micros().to_string(); // Create a unique name.
5463        let port = 5200;
5464
5465        // Get a single IPv4 address
5466        let (ip_addr1, intf_name) = my_ip_interfaces(false)
5467            .iter()
5468            .find(|iface| iface.ip().is_ipv4())
5469            .map(|iface| (iface.ip(), iface.name.clone()))
5470            .unwrap();
5471
5472        println!("Using interface {} with IP {}", intf_name, ip_addr1);
5473
5474        // Register the service.
5475        let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
5476            .expect("valid service info");
5477        let server1 = ServiceDaemon::new().expect("failed to start server");
5478        server1
5479            .register(service1)
5480            .expect("Failed to register service1");
5481
5482        // wait for the service announced.
5483        std::thread::sleep(Duration::from_secs(2));
5484
5485        // start a client
5486        let client = ServiceDaemon::new().expect("failed to start client");
5487
5488        let receiver = client.browse(ty_domain).unwrap();
5489
5490        let timeout = Duration::from_secs(3);
5491        let mut got_data = false;
5492
5493        while let Ok(event) = receiver.recv_timeout(timeout) {
5494            if let ServiceEvent::ServiceResolved(_) = event {
5495                println!("Received ServiceResolved event");
5496                got_data = true;
5497                break;
5498            }
5499        }
5500
5501        assert!(got_data, "Should receive ServiceResolved event");
5502
5503        // Set a short IP check interval to detect interface changes quickly.
5504        client.set_ip_check_interval(1).unwrap();
5505
5506        // Now shutdown the interface and expect the client to lose the service.
5507        println!("Shutting down interface {}", &intf_name);
5508        client.test_down_interface(&intf_name).unwrap();
5509
5510        let mut got_removed = false;
5511
5512        while let Ok(event) = receiver.recv_timeout(timeout) {
5513            if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
5514                got_removed = true;
5515                println!("removed: {ty_domain} : {instance}");
5516                break;
5517            }
5518        }
5519        assert!(got_removed, "Should receive ServiceRemoved event");
5520
5521        println!("Bringing up interface {}", &intf_name);
5522        client.test_up_interface(&intf_name).unwrap();
5523        let mut got_data = false;
5524        while let Ok(event) = receiver.recv_timeout(timeout) {
5525            if let ServiceEvent::ServiceResolved(resolved) = event {
5526                got_data = true;
5527                println!("Received ServiceResolved: {:?}", resolved);
5528                break;
5529            }
5530        }
5531        assert!(
5532            got_data,
5533            "Should receive ServiceResolved event after interface is back up"
5534        );
5535
5536        server1.shutdown().unwrap();
5537        client.shutdown().unwrap();
5538    }
5539
5540    #[test]
5541    fn test_cache_only() {
5542        // construct service info
5543        let service_type = "_cache_only._udp.local.";
5544        let instance = "test_instance";
5545        let host_name = "cache_only_host.local.";
5546        let service_ip_addr = my_ip_interfaces(false)
5547            .iter()
5548            .find(|iface| iface.ip().is_ipv4())
5549            .map(|iface| iface.ip())
5550            .unwrap();
5551
5552        let mut my_service = ServiceInfo::new(
5553            service_type,
5554            instance,
5555            host_name,
5556            service_ip_addr,
5557            5023,
5558            None,
5559        )
5560        .unwrap();
5561
5562        let new_ttl = 3; // for testing only.
5563        my_service._set_other_ttl(new_ttl);
5564
5565        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5566
5567        // make a single browse request to record that we are interested in the service.  This ensures that
5568        // subsequent announcements are cached.
5569        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5570        std::thread::sleep(Duration::from_secs(2));
5571
5572        // register my service
5573        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5574        let result = mdns_server.register(my_service);
5575        assert!(result.is_ok());
5576
5577        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5578        let mut resolved = false;
5579
5580        // resolve the service.
5581        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5582            if let ServiceEvent::ServiceResolved(info) = event {
5583                resolved = true;
5584                println!("Resolved a service of {}", &info.get_fullname());
5585                break;
5586            }
5587        }
5588
5589        assert!(resolved);
5590
5591        // Exit the server so that no more responses.
5592        mdns_server.shutdown().unwrap();
5593        mdns_client.shutdown().unwrap();
5594    }
5595
5596    #[test]
5597    fn test_cache_only_unsolicited() {
5598        let service_type = "_c_unsolicit._udp.local.";
5599        let instance = "test_instance";
5600        let host_name = "c_unsolicit_host.local.";
5601        let service_ip_addr = my_ip_interfaces(false)
5602            .iter()
5603            .find(|iface| iface.ip().is_ipv4())
5604            .map(|iface| iface.ip())
5605            .unwrap();
5606
5607        let my_service = ServiceInfo::new(
5608            service_type,
5609            instance,
5610            host_name,
5611            service_ip_addr,
5612            5023,
5613            None,
5614        )
5615        .unwrap();
5616
5617        // register my service
5618        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5619        let result = mdns_server.register(my_service);
5620        assert!(result.is_ok());
5621
5622        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5623        mdns_client.accept_unsolicited(true).unwrap();
5624
5625        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
5626        // that the announcements are treated as unsolicited
5627        std::thread::sleep(Duration::from_secs(2));
5628        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5629        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5630        let mut resolved = false;
5631
5632        // resolve the service.
5633        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5634            if let ServiceEvent::ServiceResolved(info) = event {
5635                resolved = true;
5636                println!("Resolved a service of {}", &info.get_fullname());
5637                break;
5638            }
5639        }
5640
5641        assert!(resolved);
5642
5643        // Exit the server so that no more responses.
5644        mdns_server.shutdown().unwrap();
5645        mdns_client.shutdown().unwrap();
5646    }
5647
5648    #[test]
5649    fn test_custom_port_isolation() {
5650        // This test verifies:
5651        // 1. Daemons on a custom port can communicate with each other
5652        // 2. Daemons on different ports are isolated (no cross-talk)
5653
5654        let service_type = "_custom_port._udp.local.";
5655        let instance_custom = "custom_port_instance";
5656        let instance_default = "default_port_instance";
5657        let host_name = "custom_port_host.local.";
5658
5659        let service_ip_addr = my_ip_interfaces(false)
5660            .iter()
5661            .find(|iface| iface.ip().is_ipv4())
5662            .map(|iface| iface.ip())
5663            .expect("Test requires an IPv4 interface");
5664
5665        // Create service info for custom port (5454)
5666        let service_custom = ServiceInfo::new(
5667            service_type,
5668            instance_custom,
5669            host_name,
5670            service_ip_addr,
5671            8080,
5672            None,
5673        )
5674        .unwrap();
5675
5676        // Create service info for default port (5353)
5677        let service_default = ServiceInfo::new(
5678            service_type,
5679            instance_default,
5680            host_name,
5681            service_ip_addr,
5682            8081,
5683            None,
5684        )
5685        .unwrap();
5686
5687        // Create two daemons on custom port 5454
5688        let custom_port = 5454u16;
5689        let server_custom =
5690            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5691        let client_custom =
5692            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5693
5694        // Create daemon on default port (5353)
5695        let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5696
5697        // Register service on custom port
5698        server_custom
5699            .register(service_custom.clone())
5700            .expect("Failed to register custom port service");
5701
5702        // Register service on default port
5703        server_default
5704            .register(service_default.clone())
5705            .expect("Failed to register default port service");
5706
5707        // Browse from custom port client
5708        let browse_custom = client_custom
5709            .browse(service_type)
5710            .expect("Failed to browse on custom port");
5711
5712        let timeout = Duration::from_secs(3);
5713        let mut found_custom = false;
5714        let mut found_default_on_custom = false;
5715
5716        // Custom port client should find the custom port service
5717        while let Ok(event) = browse_custom.recv_timeout(timeout) {
5718            if let ServiceEvent::ServiceResolved(info) = event {
5719                println!(
5720                    "Custom port client resolved: {} on port {}",
5721                    info.get_fullname(),
5722                    info.get_port()
5723                );
5724                if info.get_fullname().starts_with(instance_custom) {
5725                    found_custom = true;
5726                    assert_eq!(info.get_port(), 8080);
5727                }
5728                if info.get_fullname().starts_with(instance_default) {
5729                    found_default_on_custom = true;
5730                }
5731            }
5732        }
5733
5734        assert!(
5735            found_custom,
5736            "Custom port client should find service on custom port"
5737        );
5738        assert!(
5739            !found_default_on_custom,
5740            "Custom port client should NOT find service on default port"
5741        );
5742
5743        // Now verify the default port daemon can find its own services
5744        // but not the custom port services
5745        let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5746        let browse_default = client_default
5747            .browse(service_type)
5748            .expect("Failed to browse on default port");
5749
5750        let mut found_default = false;
5751        let mut found_custom_on_default = false;
5752
5753        while let Ok(event) = browse_default.recv_timeout(timeout) {
5754            if let ServiceEvent::ServiceResolved(info) = event {
5755                println!(
5756                    "Default port client resolved: {} on port {}",
5757                    info.get_fullname(),
5758                    info.get_port()
5759                );
5760                if info.get_fullname().starts_with(instance_default) {
5761                    found_default = true;
5762                    assert_eq!(info.get_port(), 8081);
5763                }
5764                if info.get_fullname().starts_with(instance_custom) {
5765                    found_custom_on_default = true;
5766                }
5767            }
5768        }
5769
5770        assert!(
5771            found_default,
5772            "Default port client should find service on default port"
5773        );
5774        assert!(
5775            !found_custom_on_default,
5776            "Default port client should NOT find service on custom port"
5777        );
5778
5779        // Cleanup
5780        server_custom.shutdown().unwrap();
5781        client_custom.shutdown().unwrap();
5782        server_default.shutdown().unwrap();
5783        client_default.shutdown().unwrap();
5784    }
5785}