Skip to main content

mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
// `ty_domain` refers to a service type together with its domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself and either "_udp" or "_tcp".
// See RFC 6763 section 7, "Service Names".
//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is the "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is the "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34    current_time_millis,
35    dns_cache::{DnsCache, IpType},
36    dns_parser::{
37        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
38        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
39        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
40    },
41    error::{e_fmt, Error, Result},
42    service_info::{valid_ip_on_intf, DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
43    Receiver, ResolvedService, TxtProperties,
44};
45use flume::{bounded, Sender, TrySendError};
46use if_addrs::{IfAddr, Interface};
47use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
48use socket2::Domain;
49use socket_pktinfo::PktInfoUdpSocket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
53    fmt, io,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
/// Default upper bound on the length of a service name, excluding the domain
/// and not counting the leading underscore (`_`). The value 15 follows
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default number of seconds between automatic checks for IP changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default timeout for [ServiceDaemon::verify]: 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_millis(10_000);

/// The UDP port number used by mDNS, per RFC 6762.
pub const MDNS_PORT: u16 = 5353;

// The mDNS multicast group addresses for IPv4 and IPv6.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb);

// IPv4 loopback address (127.0.0.1); the daemon's signal socket binds to it.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

// NOTE(review): the use site of this wait time is outside this excerpt.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
80
/// Response status code for the service `unregister` call.
///
/// Delivered through the receiver returned by `ServiceDaemon::unregister`.
/// The type is comparable and cloneable so callers can write
/// `status == UnregisterStatus::OK` or `assert_eq!` on it directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
89
/// The state of the service daemon, as reported to clients.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DaemonStatus {
    /// The daemon is up and operating normally.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
100
/// Names of the counters that make up the daemon metrics.
///
/// Hashable and comparable so a variant can be used as a map key; the
/// `Display` impl gives each variant its printable name.
///
/// NOTE(review): the previous comment stated that all counters are for
/// outgoing packets; the `Cached*` and `DnsRegistry*` variants suggest that
/// may no longer hold — confirm.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}
128
129impl fmt::Display for Counter {
130    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131        match self {
132            Self::Register => write!(f, "register"),
133            Self::RegisterResend => write!(f, "register-resend"),
134            Self::Unregister => write!(f, "unregister"),
135            Self::UnregisterResend => write!(f, "unregister-resend"),
136            Self::Browse => write!(f, "browse"),
137            Self::ResolveHostname => write!(f, "resolve-hostname"),
138            Self::Respond => write!(f, "respond"),
139            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
140            Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
141            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
142            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
143            Self::CachedPTR => write!(f, "cached-ptr"),
144            Self::CachedSRV => write!(f, "cached-srv"),
145            Self::CachedAddr => write!(f, "cached-addr"),
146            Self::CachedTxt => write!(f, "cached-txt"),
147            Self::CachedNSec => write!(f, "cached-nsec"),
148            Self::CachedSubtype => write!(f, "cached-subtype"),
149            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
150            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
151            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
152            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
153            Self::Timer => write!(f, "timer"),
154        }
155    }
156}
157
/// Errors used internally by this module (see `MyResult`).
#[derive(Debug)]
enum InternalError {
    /// The address of the given interface is invalid; carries the interface
    /// so it can be included in the error message.
    IntfAddrInvalid(Interface),
}
162
163impl fmt::Display for InternalError {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        match self {
166            InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
167        }
168    }
169}
170
/// Result alias for module-internal operations that fail with [`InternalError`].
type MyResult<T> = core::result::Result<T, InternalError>;
172
/// A wrapper around UDP socket used by the mDNS daemon.
///
/// We do this because `mio` does not support PKTINFO and
/// does not provide a way to implement `Source` trait directly and safely.
///
/// The wrapper holds two handles: `pktinfo` for packet I/O with control
/// messages, and `mio` for readiness polling.
struct MyUdpSocket {
    /// The underlying socket that supports control messages like
    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
    pktinfo: PktInfoUdpSocket,

    /// The mio UDP socket that is a clone of `pktinfo` and
    /// is used for event polling.
    mio: MioUdpSocket,
}
186
187impl MyUdpSocket {
188    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
189        let std_sock = pktinfo.try_clone_std()?;
190        let mio = MioUdpSocket::from_std(std_sock);
191
192        Ok(Self { pktinfo, mio })
193    }
194}
195
196/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
197impl Source for MyUdpSocket {
198    fn register(
199        &mut self,
200        registry: &Registry,
201        token: Token,
202        interests: Interest,
203    ) -> io::Result<()> {
204        self.mio.register(registry, token, interests)
205    }
206
207    fn reregister(
208        &mut self,
209        registry: &Registry,
210        token: Token,
211        interests: Interest,
212    ) -> io::Result<()> {
213        self.mio.reregister(registry, token, interests)
214    }
215
216    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
217        self.mio.deregister(registry)
218    }
219}
220
/// The metrics is a HashMap of (name_key, i64_value).
/// The main purpose is to help monitoring the mDNS packet traffic.
///
/// A snapshot of this map is obtained via `ServiceDaemon::get_metrics`.
pub type Metrics = HashMap<String, i64>;
224
// Event keys for the daemon's sockets.
// NOTE(review): presumably used as poll tokens; the use site is not visible here.
const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
228
/// A daemon thread for mDNS
///
/// This struct provides a handle and an API to the daemon. It is cloneable.
///
/// Every API call is turned into a `Command` that is queued on `sender`,
/// followed by a UDP datagram to `signal_addr` to wake the daemon thread.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sender handle of the channel to the daemon.
    sender: Sender<Command>,

    /// Send to this addr to signal that a `Command` is coming.
    ///
    /// The daemon listens on this addr together with other mDNS sockets,
    /// to avoid busy polling the flume channel. If there is a way to poll
    /// the channel and mDNS sockets together, then this can be removed.
    signal_addr: SocketAddr,
}
244
245impl ServiceDaemon {
246    /// Creates a new daemon and spawns a thread to run the daemon.
247    ///
248    /// Creates a new mDNS service daemon using the default port (5353).
249    ///
250    /// For development/testing with custom ports, use [`ServiceDaemon::new_with_port`].
251    ///
252    /// # Errors
253    ///
254    /// Returns [`Error::Msg`] if the daemon cannot be initialized. This wraps an
255    /// underlying OS-level failure.
256    ///
257    /// Note that this constructor does not open the mDNS multicast sockets — those
258    /// are opened lazily by the daemon thread once it starts, so platform issues
259    /// such as "multicast not permitted" are surfaced later via [`DaemonEvent`] from
260    /// [`monitor`](Self::monitor) rather than here.
261    pub fn new() -> Result<Self> {
262        Self::new_with_port(MDNS_PORT)
263    }
264
265    /// Creates a new mDNS service daemon using a custom port.
266    ///
267    /// # Arguments
268    ///
269    /// * `port` - The UDP port to bind for mDNS communication.
270    ///   - In production, this should be `MDNS_PORT` (5353) per RFC 6762.
271    ///   - For development/testing, you can use a non-standard port (e.g., 5454)
272    ///     to avoid conflicts with system mDNS services.
273    ///   - Both publisher and browser must use the same port to communicate.
274    ///
275    /// # Example
276    ///
277    /// ```no_run
278    /// use mdns_sd::ServiceDaemon;
279    ///
280    /// // Use standard mDNS port (production)
281    /// let daemon = ServiceDaemon::new_with_port(5353)?;
282    ///
283    /// // Use custom port for development (avoids macOS Bonjour conflict)
284    /// let daemon_dev = ServiceDaemon::new_with_port(5454)?;
285    /// # Ok::<(), mdns_sd::Error>(())
286    /// ```
287    ///
288    /// # Errors
289    ///
290    /// See [`new`](Self::new) for the set of OS-level failures that may surface
291    /// here. Note that `port` is *not* validated against the kernel until the
292    /// daemon thread tries to bind the mDNS sockets, so an unusable `port`
293    /// (e.g., already in use, requires elevated privileges) will not be
294    /// reported by this constructor — listen for such failures via
295    /// [`monitor`](Self::monitor).
296    pub fn new_with_port(port: u16) -> Result<Self> {
297        // Use port 0 to allow the system assign a random available port,
298        // no need for a pre-defined port number.
299        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
300
301        let signal_sock = UdpSocket::bind(signal_addr)
302            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
303
304        // Get the socket with the OS chosen port
305        let signal_addr = signal_sock
306            .local_addr()
307            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
308
309        // Must be nonblocking so we can listen to it together with mDNS sockets.
310        signal_sock
311            .set_nonblocking(true)
312            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
313
314        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
315
316        let (sender, receiver) = bounded(100);
317
318        // Spawn the daemon thread
319        let mio_sock = MioUdpSocket::from_std(signal_sock);
320        let cmd_sender = sender.clone();
321        thread::Builder::new()
322            .name("mDNS_daemon".to_string())
323            .spawn(move || {
324                Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
325            })
326            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
327
328        Ok(Self {
329            sender,
330            signal_addr,
331        })
332    }
333
334    /// Sends `cmd` to the daemon via its channel, and sends a signal
335    /// to its sock addr to notify.
336    fn send_cmd(&self, cmd: Command) -> Result<()> {
337        let cmd_name = cmd.to_string();
338
339        // First, send to the flume channel.
340        self.sender.try_send(cmd).map_err(|e| match e {
341            TrySendError::Full(_) => Error::Again,
342            TrySendError::Disconnected(_) => Error::DaemonShutdown,
343        })?;
344
345        // Second, send a signal to notify the daemon.
346        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
347        let socket = UdpSocket::bind(addr)
348            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
349        socket
350            .send_to(cmd_name.as_bytes(), self.signal_addr)
351            .map_err(|e| {
352                e_fmt!(
353                    "signal socket send_to {} ({}) failed: {}",
354                    self.signal_addr,
355                    cmd_name,
356                    e
357                )
358            })?;
359
360        Ok(())
361    }
362
363    /// Starts browsing for a specific service type.
364    ///
365    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
366    ///
367    /// Returns a channel `Receiver` to receive events about the service. The caller
368    /// can call `.recv_async().await` on this receiver to handle events in an
369    /// async environment or call `.recv()` in a sync environment.
370    ///
371    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
372    /// finding more details, i.e. SRV records and TXT records.
373    ///
374    /// # Errors
375    ///
376    /// Returns [`Error::Msg`] if `service_type` does not end with
377    /// `._tcp.local.` or `._udp.local.`.
378    ///
379    /// Returns [`Error::Again`] if the daemon's command queue is full.
380    ///
381    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
382    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
383        check_domain_suffix(service_type)?;
384
385        let (resp_s, resp_r) = bounded(10);
386        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
387        Ok(resp_r)
388    }
389
390    /// Preforms a "cache-only" browse.
391    ///
392    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
393    ///
394    /// The functionality is identical to 'browse', but the service events are based solely on the contents
395    /// of the daemon's cache. No actual mDNS query is sent to the network.
396    ///
397    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
398    ///
399    /// # Errors
400    ///
401    /// Same error conditions as [`browse`](Self::browse).
402    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
403        check_domain_suffix(service_type)?;
404
405        let (resp_s, resp_r) = bounded(10);
406        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
407        Ok(resp_r)
408    }
409
410    /// Stops searching for a specific service type.
411    ///
412    /// # Errors
413    ///
414    /// Returns [`Error::Again`] if the daemon's command queue is full.
415    ///
416    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
417    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
418        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
419    }
420
421    /// Starts querying for the ip addresses of a hostname.
422    ///
423    /// Returns a channel `Receiver` to receive events about the hostname.
424    /// The caller can call `.recv_async().await` on this receiver to handle events in an
425    /// async environment or call `.recv()` in a sync environment.
426    ///
427    /// The `timeout` is specified in milliseconds.
428    ///
429    /// # Errors
430    ///
431    /// Returns [`Error::Msg`] if:
432    ///
433    /// - `hostname` does not end with `.local.`;
434    /// - `hostname` is exactly `.local.` (the label before `.local.` is empty);
435    /// - `hostname` is longer than 255 bytes.
436    ///
437    /// Returns [`Error::Again`] if the daemon's command queue is full.
438    ///
439    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
440    pub fn resolve_hostname(
441        &self,
442        hostname: &str,
443        timeout: Option<u64>,
444    ) -> Result<Receiver<HostnameResolutionEvent>> {
445        check_hostname(hostname)?;
446        let (resp_s, resp_r) = bounded(10);
447        self.send_cmd(Command::ResolveHostname(
448            hostname.to_string(),
449            1,
450            resp_s,
451            timeout,
452        ))?;
453        Ok(resp_r)
454    }
455
456    /// Stops querying for the ip addresses of a hostname.
457    ///
458    /// # Errors
459    ///
460    /// Same error conditions as [`stop_browse`](Self::stop_browse).
461    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
462        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
463    }
464
465    /// Registers a service provided by this host.
466    ///
467    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
468    /// this method will automatically fill in addresses from the host.
469    ///
470    /// To re-announce a service with an updated `service_info`, just call
471    /// this `register` function again. No need to call `unregister` first.
472    ///
473    /// # Errors
474    ///
475    /// Returns [`Error::Msg`] if the [`ServiceInfo`] is malformed, for example:
476    ///
477    /// - the fullname does not end with `._tcp.local.` or `._udp.local.`;
478    /// - the hostname does not end with `.local.`, is exactly `.local.`, or
479    ///   is longer than 255 bytes.
480    ///
481    /// Returns [`Error::Again`] if the daemon's command queue is full.
482    ///
483    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
484    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
485        check_service_name(service_info.get_fullname())?;
486        check_hostname(service_info.get_hostname())?;
487
488        self.send_cmd(Command::Register(service_info.into()))
489    }
490
491    /// Unregisters a service. This is a graceful shutdown of a service.
492    ///
493    /// Returns a channel receiver that is used to receive the status code
494    /// of the unregister.
495    ///
496    /// # Errors
497    ///
498    /// Returns [`Error::Again`] if the daemon's command queue is full.
499    ///
500    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
501    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
502        let (resp_s, resp_r) = bounded(1);
503        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
504        Ok(resp_r)
505    }
506
507    /// Starts to monitor events from the daemon.
508    ///
509    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
510    ///
511    /// # Errors
512    ///
513    /// Returns [`Error::Again`] if the daemon's command queue is full.
514    ///
515    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
516    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
517        let (resp_s, resp_r) = bounded(100);
518        self.send_cmd(Command::Monitor(resp_s))?;
519        Ok(resp_r)
520    }
521
522    /// Shuts down the daemon thread and returns a channel to receive the status.
523    ///
524    /// # Errors
525    ///
526    /// Returns [`Error::Again`] if the daemon's command queue is full.
527    ///
528    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
529    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
530        let (resp_s, resp_r) = bounded(1);
531        self.send_cmd(Command::Exit(resp_s))?;
532        Ok(resp_r)
533    }
534
535    /// Returns the status of the daemon.
536    ///
537    /// # Errors
538    ///
539    /// Returns [`Error::Again`] if the daemon's command queue is full.
540    ///
541    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
542    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
543        let (resp_s, resp_r) = bounded(1);
544
545        if self.sender.is_disconnected() {
546            resp_s
547                .send(DaemonStatus::Shutdown)
548                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
549        } else {
550            self.send_cmd(Command::GetStatus(resp_s))?;
551        }
552
553        Ok(resp_r)
554    }
555
556    /// Returns a channel receiver for the metrics, e.g. input/output counters.
557    ///
558    /// The metrics returned is a snapshot. Hence the caller should call
559    /// this method repeatedly if they want to monitor the metrics continuously.
560    ///
561    /// # Errors
562    ///
563    /// Returns [`Error::Again`] if the daemon's command queue is full.
564    ///
565    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
566    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
567        let (resp_s, resp_r) = bounded(1);
568        self.send_cmd(Command::GetMetrics(resp_s))?;
569        Ok(resp_r)
570    }
571
572    /// Change the max length allowed for a service name.
573    ///
574    /// As RFC 6763 defines a length max for a service name, a user should not call
575    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
576    ///
577    /// `len_max` is capped at an internal limit, which is currently 30.
578    ///
579    /// # Errors
580    ///
581    /// Returns [`Error::Msg`] if `len_max` exceeds the internal cap (30).
582    ///
583    /// Returns [`Error::Again`] if the daemon's command queue is full.
584    ///
585    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
586    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
587        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
588
589        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
590            return Err(Error::Msg(format!(
591                "service name length max {len_max} is too large"
592            )));
593        }
594
595        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
596    }
597
598    /// Change the interval for checking IP changes automatically.
599    ///
600    /// Setting the interval to 0 disables the IP check.
601    ///
602    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
603    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
604        let interval_in_millis = interval_in_secs as u64 * 1000;
605        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
606            interval_in_millis,
607        )))
608    }
609
610    /// Get the current interval in seconds for checking IP changes automatically.
611    pub fn get_ip_check_interval(&self) -> Result<u32> {
612        let (resp_s, resp_r) = bounded(1);
613        self.send_cmd(Command::GetOption(resp_s))?;
614
615        let option = resp_r
616            .recv_timeout(Duration::from_secs(10))
617            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
618        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
619        Ok(ip_check_interval_in_secs as u32)
620    }
621
622    /// Include interfaces that match `if_kind` for this service daemon.
623    ///
624    /// For example:
625    /// ```ignore
626    ///     daemon.enable_interface("en0")?;
627    /// ```
628    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
629        let if_kind_vec = if_kind.into_vec();
630        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
631            if_kind_vec.kinds,
632        )))
633    }
634
635    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
636    ///
637    /// For example:
638    /// ```ignore
639    ///     daemon.disable_interface(IfKind::IPv6)?;
640    /// ```
641    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
642        let if_kind_vec = if_kind.into_vec();
643        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
644            if_kind_vec.kinds,
645        )))
646    }
647
648    /// If `accept` is true, accept and cache all responses, even if there is no active querier
649    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
650    /// [browse_cache](Self::browse_cache).
651    ///
652    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
653    ///
654    /// For example:
655    /// ```ignore
656    ///     daemon.accept_unsolicited(true)?;
657    /// ```
658    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
659        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
660    }
661
662    /// Include or exclude Apple P2P interfaces, e.g. "awdl0", "llw0".
663    /// By default, they are excluded.
664    pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
665        self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
666    }
667
668    #[cfg(test)]
669    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
670        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
671            ifname.to_string(),
672        )))
673    }
674
675    #[cfg(test)]
676    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
677        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
678            ifname.to_string(),
679        )))
680    }
681
682    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
683    ///
684    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
685    /// receive announcements from a responder on the same host.
686    ///
687    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
688    ///
689    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
690    /// the UNIX version of the IP_MULTICAST_LOOP option:
691    ///
692    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
693    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
694    ///
695    /// Which means, in order NOT to receive localhost announcements, you want to call
696    /// this API on the querier side on Windows, but on the responder side on Unix.
697    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
698        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
699    }
700
701    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
702    ///
703    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
704    /// receive announcements from a responder on the same host.
705    ///
706    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
707    ///
708    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
709    /// the UNIX version of the IP_MULTICAST_LOOP option:
710    ///
711    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
712    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
713    ///
714    /// Which means, in order NOT to receive localhost announcements, you want to call
715    /// this API on the querier side on Windows, but on the responder side on Unix.
716    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
717        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
718    }
719
720    /// Proactively confirms whether a service instance still valid.
721    ///
722    /// This call will issue queries for a service instance's SRV record and Address records.
723    ///
724    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
725    /// unless there is a reason not to follow RFC.
726    ///
727    /// If no response is received within `timeout`, the current resource
728    /// records will be flushed, and if needed, `ServiceRemoved` event will be
729    /// sent to active queriers.
730    ///
731    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
732    ///
733    /// # Errors
734    ///
735    /// Returns [`Error::Again`] if the daemon's command queue is full.
736    ///
737    /// Returns [`Error::DaemonShutdown`] if the daemon thread has already exited.
738    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
739        self.send_cmd(Command::Verify(instance_fullname, timeout))
740    }
741
742    fn daemon_thread(
743        signal_sock: MioUdpSocket,
744        poller: Poll,
745        receiver: Receiver<Command>,
746        port: u16,
747        cmd_sender: Sender<Command>,
748        signal_addr: SocketAddr,
749    ) {
750        let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
751
752        if let Some(cmd) = zc.run(receiver) {
753            match cmd {
754                Command::Exit(resp_s) => {
755                    // It is guaranteed that the receiver already dropped,
756                    // i.e. the daemon command channel closed.
757                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
758                        debug!("exit: failed to send response of shutdown: {}", e);
759                    }
760                }
761                _ => {
762                    debug!("Unexpected command: {:?}", cmd);
763                }
764            }
765        }
766    }
767}
768
769/// Creates a new UDP socket that uses `intf` to send and recv multicast.
770fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
771    // Use the same socket for receiving and sending multicast packets.
772    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
773    let intf_ip = &intf.ip();
774    match intf_ip {
775        IpAddr::V4(ip) => {
776            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
777            let sock = new_socket(addr.into(), true)?;
778
779            // Join mDNS group to receive packets.
780            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
781                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
782
783            // Set IP_MULTICAST_IF to send packets.
784            sock.set_multicast_if_v4(ip)
785                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
786
787            // Per RFC 6762 section 11:
788            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
789            // be sent with IP TTL set to 255."
790            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
791            sock.set_multicast_ttl_v4(255)
792                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
793
794            if !should_loop {
795                sock.set_multicast_loop_v4(false)
796                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
797            }
798
799            // Test if we can send packets successfully.
800            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
801            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
802            for packet in test_packets {
803                sock.send_to(&packet, &multicast_addr)
804                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
805            }
806            MyUdpSocket::new(sock)
807                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
808        }
809        IpAddr::V6(ip) => {
810            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
811            let sock = new_socket(addr.into(), true)?;
812
813            let if_index = intf.index.unwrap_or(0);
814
815            // Join mDNS group to receive packets.
816            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
817                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
818
819            // Set IPV6_MULTICAST_IF to send packets.
820            sock.set_multicast_if_v6(if_index)
821                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
822
823            // We are not sending multicast packets to test this socket as there might
824            // be many IPv6 interfaces on a host and could cause such send error:
825            // "No buffer space available (os error 55)".
826
827            MyUdpSocket::new(sock)
828                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
829        }
830    }
831}
832
833/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
834/// `non_block` indicates whether to set O_NONBLOCK for the socket.
835fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
836    let domain = match addr {
837        SocketAddr::V4(_) => socket2::Domain::IPV4,
838        SocketAddr::V6(_) => socket2::Domain::IPV6,
839    };
840
841    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
842
843    fd.set_reuse_address(true)
844        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
845    #[cfg(unix)]
846    if let Err(e) = fd.set_reuse_port(true) {
847        debug!(
848            "SO_REUSEPORT is not supported, continuing without it: {}",
849            e
850        );
851    }
852
853    if non_block {
854        fd.set_nonblocking(true)
855            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
856    }
857
858    fd.bind(&addr.into())
859        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
860
861    trace!("new socket bind to {}", &addr);
862    Ok(fd)
863}
864
/// A command scheduled to be executed again at a later time.
///
/// The run loop executes `command` once the current time reaches `next_time`.
struct ReRun {
    /// UNIX timestamp in millis at which `command` becomes due.
    next_time: u64,
    /// The command to execute when `next_time` is reached.
    command: Command,
}
871
/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>`, `From<&String>` and `From<IpAddr>`
/// are implemented: strings convert to [`IfKind::Name`] and IP addresses to
/// [`IfKind::Addr`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    /// This is used to look up the interface. The semantics is to identify an interface of
    /// IPv4 or IPv6, not a specific address on the interface.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), enabled by default.
    ///
    /// Loopback interfaces are required by some use cases (e.g., OSCQuery) for publishing.
    LoopbackV4,

    /// ::1/128, enabled by default.
    LoopbackV6,

    /// By interface index, IPv4 only.
    IndexV4(u32),

    /// By interface index, IPv6 only.
    IndexV6(u32),

    /// By a user-supplied predicate function.
    Predicate(IfPredicate),
}
912
913impl IfKind {
914    /// Checks if `intf` matches with this interface kind.
915    pub(crate) fn matches(&self, intf: &Interface) -> bool {
916        match self {
917            Self::All => true,
918            Self::IPv4 => intf.ip().is_ipv4(),
919            Self::IPv6 => intf.ip().is_ipv6(),
920            Self::Name(ifname) => ifname == &intf.name,
921            Self::Addr(addr) => addr == &intf.ip(),
922            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
923            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
924            Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
925            Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
926            Self::Predicate(p) => p.matches(intf),
927        }
928    }
929}
930
931/// The first use case of specifying an interface was to
932/// use an interface name. Hence adding this for ergonomic reasons.
933impl From<&str> for IfKind {
934    fn from(val: &str) -> Self {
935        Self::Name(val.to_string())
936    }
937}
938
939impl From<&String> for IfKind {
940    fn from(val: &String) -> Self {
941        Self::Name(val.to_string())
942    }
943}
944
/// Still for ergonomic reasons: an IP address converts into [`IfKind::Addr`].
impl From<IpAddr> for IfKind {
    fn from(val: IpAddr) -> Self {
        Self::Addr(val)
    }
}
951
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values of this type are produced by [`IntoIfKindVec::into_vec`].
pub struct IfKindVec {
    /// The interface kinds held by this list.
    kinds: Vec<IfKind>,
}
956
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value that converts into an `IfKind`, and
/// for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the corresponding [`IfKindVec`].
    fn into_vec(self) -> IfKindVec;
}
961
962impl<T: Into<IfKind>> IntoIfKindVec for T {
963    fn into_vec(self) -> IfKindVec {
964        let if_kind: IfKind = self.into();
965        IfKindVec {
966            kinds: vec![if_kind],
967        }
968    }
969}
970
971impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
972    fn into_vec(self) -> IfKindVec {
973        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
974        IfKindVec { kinds }
975    }
976}
977
/// A predicate function for matching against interfaces.
///
/// The closure is stored behind an `Arc`, so cloning an `IfPredicate` shares
/// the same closure rather than duplicating it.
#[derive(Clone)]
pub struct IfPredicate(std::sync::Arc<dyn Fn(&Interface) -> bool + Send + Sync>);
981
982impl IfPredicate {
983    /// Creates a predicate from a closure that decides whether an interface
984    /// matches.
985    ///
986    /// # Example
987    ///
988    /// ```no_run
989    /// # use mdns_sd::IfPredicate;
990    /// // Match any interface that doesn't look like a virtual bridge
991    /// IfPredicate::new(|intf| !intf.name.starts_with("virbr"));
992    /// ```
993    pub fn new(predicate: impl Fn(&Interface) -> bool + Send + Sync + 'static) -> Self {
994        Self(std::sync::Arc::new(predicate))
995    }
996
997    pub(crate) fn matches(&self, intf: &Interface) -> bool {
998        self.0(intf)
999    }
1000}
1001
1002impl std::fmt::Debug for IfPredicate {
1003    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1004        write!(f, "IfPredicate(...)")
1005    }
1006}
1007
/// Selection of interfaces.
///
/// One entry is recorded for every interface kind passed to
/// `enable_interface` (`selected: true`) or `disable_interface`
/// (`selected: false`).
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
1016
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
struct Zeroconf {
    /// The mDNS port number to use for socket binding.
    /// Typically MDNS_PORT (5353), but can be customized for development/testing.
    port: u16,

    /// Local interfaces keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
    ipv4_sock: Option<MyUdpSocket>,

    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
    ipv6_sock: Option<MyUdpSocket>,

    /// Local registered services, keyed by service full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Metrics counters; empty when the daemon state is created.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Option: the value set by `DaemonOption::ServiceNameLenMax`.
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Daemon status: `Running` from construction, set to `Shutdown` when the
    /// run loop handles the `Exit` command.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// IPv4 multicast loopback value last requested through
    /// `DaemonOption::MulticastLoopV4`; starts as `true`.
    multicast_loop_v4: bool,

    /// IPv6 multicast loopback value last requested through
    /// `DaemonOption::MulticastLoopV6`; starts as `true`.
    multicast_loop_v6: bool,

    /// Value set through `DaemonOption::AcceptUnsolicited`; starts as `false`.
    accept_unsolicited: bool,

    /// Value set through `DaemonOption::IncludeAppleP2P`; starts as `false`.
    /// It is passed to `my_ip_interfaces_inner` when interfaces are enumerated.
    include_apple_p2p: bool,

    /// Sender side of the daemon command channel, used by `send_cmd_to_self`.
    cmd_sender: Sender<Command>,

    /// Address that `send_cmd_to_self` sends its wake-up datagram to.
    signal_addr: SocketAddr,

    /// Interface names toggled by the test-only `TestDownInterface` /
    /// `TestUpInterface` options.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
1103
1104/// Join the multicast group for the given interface.
1105fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
1106    let intf_ip = &intf.ip();
1107    match intf_ip {
1108        IpAddr::V4(ip) => {
1109            // Join mDNS group to receive packets.
1110            debug!("join multicast group V4 on {} addr {ip}", intf.name);
1111            my_sock
1112                .join_multicast_v4(&GROUP_ADDR_V4, ip)
1113                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
1114        }
1115        IpAddr::V6(ip) => {
1116            let if_index = intf.index.unwrap_or(0);
1117            // Join mDNS group to receive packets.
1118            debug!(
1119                "join multicast group V6 on {} addr {ip} with index {if_index}",
1120                intf.name
1121            );
1122            my_sock
1123                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
1124                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
1125        }
1126    }
1127    Ok(())
1128}
1129
1130impl Zeroconf {
1131    fn new(
1132        signal_sock: MioUdpSocket,
1133        poller: Poll,
1134        port: u16,
1135        cmd_sender: Sender<Command>,
1136        signal_addr: SocketAddr,
1137    ) -> Self {
1138        // Get interfaces.
1139        let my_ifaddrs = my_ip_interfaces(true);
1140
1141        // Create a socket for every IP addr.
1142        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
1143        // or the same interface name with different IP addrs.
1144        let mut my_intfs = HashMap::new();
1145        let mut dns_registry_map = HashMap::new();
1146
1147        // Use the same socket for receiving and sending multicast packets.
1148        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
1149        let mut ipv4_sock = None;
1150        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1151        match new_socket(addr.into(), true) {
1152            Ok(sock) => {
1153                // Per RFC 6762 section 11:
1154                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1155                // be sent with IP TTL set to 255."
1156                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
1157                sock.set_multicast_ttl_v4(255)
1158                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1159                    .ok();
1160
1161                // This clones a socket.
1162                ipv4_sock = match MyUdpSocket::new(sock) {
1163                    Ok(s) => Some(s),
1164                    Err(e) => {
1165                        debug!("failed to create IPv4 MyUdpSocket: {e}");
1166                        None
1167                    }
1168                };
1169            }
1170            // Per RFC 6762 section 11:}
1171            Err(e) => debug!("failed to create IPv4 socket: {e}"),
1172        }
1173
1174        let mut ipv6_sock = None;
1175        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1176        match new_socket(addr.into(), true) {
1177            Ok(sock) => {
1178                // Per RFC 6762 section 11:
1179                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1180                // be sent with IP TTL set to 255."
1181                sock.set_multicast_hops_v6(255)
1182                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1183                    .ok();
1184
1185                // This clones the ipv6 socket.
1186                ipv6_sock = match MyUdpSocket::new(sock) {
1187                    Ok(s) => Some(s),
1188                    Err(e) => {
1189                        debug!("failed to create IPv6 MyUdpSocket: {e}");
1190                        None
1191                    }
1192                };
1193            }
1194            Err(e) => debug!("failed to create IPv6 socket: {e}"),
1195        }
1196
1197        // Configure sockets to join multicast groups.
1198        for intf in my_ifaddrs {
1199            let sock_opt = if intf.ip().is_ipv4() {
1200                &ipv4_sock
1201            } else {
1202                &ipv6_sock
1203            };
1204            let Some(sock) = sock_opt else {
1205                debug!(
1206                    "no socket available for interface {} with addr {}. Skipped.",
1207                    intf.name,
1208                    intf.ip()
1209                );
1210                continue;
1211            };
1212
1213            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1214                debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1215            }
1216
1217            let if_index = intf.index.unwrap_or(0);
1218
1219            // Add this interface address if not already present.
1220            dns_registry_map
1221                .entry(if_index)
1222                .or_insert_with(DnsRegistry::new);
1223
1224            my_intfs
1225                .entry(if_index)
1226                .and_modify(|v: &mut MyIntf| {
1227                    v.addrs.insert(intf.addr.clone());
1228                })
1229                .or_insert(MyIntf {
1230                    name: intf.name.clone(),
1231                    index: if_index,
1232                    addrs: HashSet::from([intf.addr]),
1233                });
1234        }
1235
1236        let monitors = Vec::new();
1237        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1238        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1239
1240        let timers = BinaryHeap::new();
1241
1242        // Enable everything, including loopback interfaces.
1243        let if_selections = vec![];
1244
1245        let status = DaemonStatus::Running;
1246
1247        Self {
1248            port,
1249            my_intfs,
1250            ipv4_sock,
1251            ipv6_sock,
1252            my_services: HashMap::new(),
1253            cache: DnsCache::new(),
1254            dns_registry_map,
1255            hostname_resolvers: HashMap::new(),
1256            service_queriers: HashMap::new(),
1257            retransmissions: Vec::new(),
1258            counters: HashMap::new(),
1259            poller,
1260            monitors,
1261            service_name_len_max,
1262            ip_check_interval,
1263            if_selections,
1264            signal_sock,
1265            timers,
1266            status,
1267            pending_resolves: HashSet::new(),
1268            resolved: HashSet::new(),
1269            multicast_loop_v4: true,
1270            multicast_loop_v6: true,
1271            accept_unsolicited: false,
1272            include_apple_p2p: false,
1273            cmd_sender,
1274            signal_addr,
1275
1276            #[cfg(test)]
1277            test_down_interfaces: HashSet::new(),
1278        }
1279    }
1280
1281    /// Send a Command into the daemon channel and poke the signal socket to wake the poll loop.
1282    fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1283        let cmd_name = cmd.to_string();
1284
1285        self.cmd_sender.try_send(cmd).map_err(|e| match e {
1286            TrySendError::Full(_) => Error::Again,
1287            TrySendError::Disconnected(_) => Error::DaemonShutdown,
1288        })?;
1289
1290        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1291        let socket = UdpSocket::bind(addr)
1292            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1293        socket
1294            .send_to(cmd_name.as_bytes(), self.signal_addr)
1295            .map_err(|e| {
1296                e_fmt!(
1297                    "signal socket send_to {} ({}) failed: {}",
1298                    self.signal_addr,
1299                    cmd_name,
1300                    e
1301                )
1302            })?;
1303
1304        Ok(())
1305    }
1306
1307    /// Clean up all resources before shutdown.
1308    ///
1309    /// This method:
1310    /// 1. Unregisters all registered services (sends goodbye packets)
1311    /// 2. Stops all active browse operations
1312    /// 3. Stops all active hostname resolution operations
1313    /// 4. Clears all retransmissions
1314    fn cleanup(&mut self) {
1315        debug!("Starting cleanup for shutdown");
1316
1317        // 1. Unregister all services - send goodbye packets
1318        let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1319        for fullname in service_names {
1320            if let Some(info) = self.my_services.get(&fullname) {
1321                debug!("Unregistering service during shutdown: {}", &fullname);
1322
1323                for intf in self.my_intfs.values() {
1324                    if let Some(sock) = self.ipv4_sock.as_ref() {
1325                        self.unregister_service(info, intf, &sock.pktinfo);
1326                    }
1327
1328                    if let Some(sock) = self.ipv6_sock.as_ref() {
1329                        self.unregister_service(info, intf, &sock.pktinfo);
1330                    }
1331                }
1332            }
1333        }
1334        self.my_services.clear();
1335
1336        // 2. Stop all browse operations
1337        let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1338        for ty_domain in browse_types {
1339            debug!("Stopping browse during shutdown: {}", &ty_domain);
1340            if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1341                // Notify the client
1342                if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1343                    debug!("Failed to send SearchStopped during shutdown: {}", e);
1344                }
1345            }
1346        }
1347
1348        // 3. Stop all hostname resolution operations
1349        let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1350        for hostname in hostnames {
1351            debug!(
1352                "Stopping hostname resolution during shutdown: {}",
1353                &hostname
1354            );
1355            if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1356                // Notify the client
1357                if let Err(e) =
1358                    sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1359                {
1360                    debug!(
1361                        "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1362                        e
1363                    );
1364                }
1365            }
1366        }
1367
1368        // 4. Clear all retransmissions
1369        self.retransmissions.clear();
1370
1371        debug!("Cleanup completed");
1372    }
1373
1374    /// The main event loop of the daemon thread
1375    ///
1376    /// In each round, it will:
1377    /// 1. select the listening sockets with a timeout.
1378    /// 2. process the incoming packets if any.
1379    /// 3. try_recv on its channel and execute commands.
1380    /// 4. announce its registered services.
1381    /// 5. process retransmissions if any.
1382    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1383        // Add the daemon's signal socket to the poller.
1384        if let Err(e) = self.poller.registry().register(
1385            &mut self.signal_sock,
1386            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1387            mio::Interest::READABLE,
1388        ) {
1389            debug!("failed to add signal socket to the poller: {}", e);
1390            return None;
1391        }
1392
1393        if let Some(sock) = self.ipv4_sock.as_mut() {
1394            if let Err(e) = self.poller.registry().register(
1395                sock,
1396                mio::Token(IPV4_SOCK_EVENT_KEY),
1397                mio::Interest::READABLE,
1398            ) {
1399                debug!("failed to register ipv4 socket: {}", e);
1400                return None;
1401            }
1402        }
1403
1404        if let Some(sock) = self.ipv6_sock.as_mut() {
1405            if let Err(e) = self.poller.registry().register(
1406                sock,
1407                mio::Token(IPV6_SOCK_EVENT_KEY),
1408                mio::Interest::READABLE,
1409            ) {
1410                debug!("failed to register ipv6 socket: {}", e);
1411                return None;
1412            }
1413        }
1414
1415        // Setup timer for IP checks.
1416        let mut next_ip_check = if self.ip_check_interval > 0 {
1417            current_time_millis() + self.ip_check_interval
1418        } else {
1419            0
1420        };
1421
1422        if next_ip_check > 0 {
1423            self.add_timer(next_ip_check);
1424        }
1425
1426        // Start the run loop.
1427
1428        let mut events = mio::Events::with_capacity(1024);
1429        loop {
1430            let now = current_time_millis();
1431
1432            let earliest_timer = self.peek_earliest_timer();
1433            let timeout = earliest_timer.map(|timer| {
1434                // If `timer` already passed, set `timeout` to be 1ms.
1435                let millis = if timer > now { timer - now } else { 1 };
1436                Duration::from_millis(millis)
1437            });
1438
1439            // Process incoming packets, command events and optional timeout.
1440            events.clear();
1441            match self.poller.poll(&mut events, timeout) {
1442                Ok(_) => self.handle_poller_events(&events),
1443                Err(e) => debug!("failed to select from sockets: {}", e),
1444            }
1445
1446            let now = current_time_millis();
1447
1448            // Remove the timers if already passed.
1449            self.pop_timers_till(now);
1450
1451            // Remove hostname resolvers with expired timeouts.
1452            for hostname in self
1453                .hostname_resolvers
1454                .clone()
1455                .into_iter()
1456                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1457                .map(|(hostname, _)| hostname)
1458            {
1459                trace!("hostname resolver timeout for {}", &hostname);
1460                call_hostname_resolution_listener(
1461                    &self.hostname_resolvers,
1462                    &hostname,
1463                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1464                );
1465                call_hostname_resolution_listener(
1466                    &self.hostname_resolvers,
1467                    &hostname,
1468                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1469                );
1470                self.hostname_resolvers.remove(&hostname);
1471            }
1472
1473            // process commands from the command channel
1474            while let Ok(command) = receiver.try_recv() {
1475                if matches!(command, Command::Exit(_)) {
1476                    debug!("Exit command received, performing cleanup");
1477                    self.cleanup();
1478                    self.status = DaemonStatus::Shutdown;
1479                    return Some(command);
1480                }
1481                self.exec_command(command, false);
1482            }
1483
1484            // check for repeated commands and run them if their time is up.
1485            let mut i = 0;
1486            while i < self.retransmissions.len() {
1487                if now >= self.retransmissions[i].next_time {
1488                    let rerun = self.retransmissions.remove(i);
1489                    self.exec_command(rerun.command, true);
1490                } else {
1491                    i += 1;
1492                }
1493            }
1494
1495            // Refresh cached service records with active queriers
1496            self.refresh_active_services();
1497
1498            // Refresh cached A/AAAA records with active queriers
1499            let mut query_count = 0;
1500            for (hostname, _sender) in self.hostname_resolvers.iter() {
1501                for (hostname, ip_addr) in
1502                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1503                {
1504                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1505                    query_count += 1;
1506                }
1507            }
1508
1509            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1510
1511            // check and evict expired records in our cache
1512            let now = current_time_millis();
1513
1514            // Notify service listeners about the expired records.
1515            let expired_services = self.cache.evict_expired_services(now);
1516            if !expired_services.is_empty() {
1517                debug!(
1518                    "run: send {} service removal to listeners",
1519                    expired_services.len()
1520                );
1521                self.notify_service_removal(expired_services);
1522            }
1523
1524            // Notify hostname listeners about the expired records.
1525            let expired_addrs = self.cache.evict_expired_addr(now);
1526            for (hostname, addrs) in expired_addrs {
1527                call_hostname_resolution_listener(
1528                    &self.hostname_resolvers,
1529                    &hostname,
1530                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1531                );
1532                let instances = self.cache.get_instances_on_host(&hostname);
1533                let instance_set: HashSet<String> = instances.into_iter().collect();
1534                self.resolve_updated_instances(&instance_set);
1535            }
1536
1537            // Send out probing queries.
1538            self.probing_handler();
1539
1540            // check IP changes if next_ip_check is reached.
1541            if now >= next_ip_check && next_ip_check > 0 {
1542                next_ip_check = now + self.ip_check_interval;
1543                self.add_timer(next_ip_check);
1544
1545                self.check_ip_changes();
1546            }
1547        }
1548    }
1549
    /// Applies a single daemon option.
    ///
    /// Plain values (`ServiceNameLenMax`, `IpCheckInterval`) are stored directly;
    /// the other options are forwarded to their dedicated handler methods.
    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
        match daemon_opt {
            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
            // Stored as-is; the field is documented as millis.
            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
            DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
            // Test-only: record an interface name in `test_down_interfaces`.
            #[cfg(test)]
            DaemonOption::TestDownInterface(ifname) => {
                self.test_down_interfaces.insert(ifname);
            }
            // Test-only: remove an interface name from `test_down_interfaces`.
            #[cfg(test)]
            DaemonOption::TestUpInterface(ifname) => {
                self.test_down_interfaces.remove(&ifname);
            }
        }
    }
1570
1571    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1572        debug!("enable_interface: {:?}", kinds);
1573        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1574
1575        for if_kind in kinds {
1576            self.if_selections.push(IfSelection {
1577                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1578                selected: true,
1579            });
1580        }
1581
1582        self.apply_intf_selections(interfaces);
1583    }
1584
1585    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1586        debug!("disable_interface: {:?}", kinds);
1587        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1588
1589        for if_kind in kinds {
1590            self.if_selections.push(IfSelection {
1591                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1592                selected: false,
1593            });
1594        }
1595
1596        self.apply_intf_selections(interfaces);
1597    }
1598
1599    fn set_multicast_loop_v4(&mut self, on: bool) {
1600        let Some(sock) = self.ipv4_sock.as_mut() else {
1601            return;
1602        };
1603        self.multicast_loop_v4 = on;
1604        sock.pktinfo
1605            .set_multicast_loop_v4(on)
1606            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1607            .unwrap();
1608    }
1609
1610    fn set_multicast_loop_v6(&mut self, on: bool) {
1611        let Some(sock) = self.ipv6_sock.as_mut() else {
1612            return;
1613        };
1614        self.multicast_loop_v6 = on;
1615        sock.pktinfo
1616            .set_multicast_loop_v6(on)
1617            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1618            .unwrap();
1619    }
1620
1621    fn set_accept_unsolicited(&mut self, accept: bool) {
1622        self.accept_unsolicited = accept;
1623    }
1624
1625    fn set_apple_p2p(&mut self, include: bool) {
1626        if self.include_apple_p2p != include {
1627            self.include_apple_p2p = include;
1628            self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1629        }
1630    }
1631
1632    fn notify_monitors(&mut self, event: DaemonEvent) {
1633        // Only retain the monitors that are still connected.
1634        self.monitors.retain(|sender| {
1635            if let Err(e) = sender.try_send(event.clone()) {
1636                debug!("notify_monitors: try_send: {}", &e);
1637                if matches!(e, TrySendError::Disconnected(_)) {
1638                    return false; // This monitor is dropped.
1639                }
1640            }
1641            true
1642        });
1643    }
1644
1645    /// Remove `addr` in my services that enabled `addr_auto`.
1646    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1647        for (_, service_info) in self.my_services.iter_mut() {
1648            if service_info.is_addr_auto() {
1649                service_info.remove_ipaddr(addr);
1650            }
1651        }
1652    }
1653
1654    fn add_timer(&mut self, next_time: u64) {
1655        self.timers.push(Reverse(next_time));
1656    }
1657
1658    fn peek_earliest_timer(&self) -> Option<u64> {
1659        self.timers.peek().map(|Reverse(v)| *v)
1660    }
1661
1662    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1663        self.timers.pop().map(|Reverse(v)| v)
1664    }
1665
1666    /// Pop all timers that are already passed till `now`.
1667    fn pop_timers_till(&mut self, now: u64) {
1668        while let Some(Reverse(v)) = self.timers.peek() {
1669            if *v > now {
1670                break;
1671            }
1672            self.timers.pop();
1673        }
1674    }
1675
1676    /// Apply all selections to `interfaces` and return the selected addresses.
1677    fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1678        let intf_count = interfaces.len();
1679        let mut intf_selections = vec![true; intf_count];
1680
1681        // apply if_selections
1682        for selection in self.if_selections.iter() {
1683            // Mark the interfaces for this selection.
1684            for i in 0..intf_count {
1685                if selection.if_kind.matches(&interfaces[i]) {
1686                    intf_selections[i] = selection.selected;
1687                }
1688            }
1689        }
1690
1691        let mut selected_addrs = HashSet::new();
1692        for i in 0..intf_count {
1693            if intf_selections[i] {
1694                selected_addrs.insert(interfaces[i].clone());
1695            }
1696        }
1697
1698        selected_addrs
1699    }
1700
1701    /// Apply all selections to `interfaces`.
1702    ///
1703    /// For any interface, add it if selected but not bound yet,
1704    /// delete it if not selected but still bound.
1705    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1706        // By default, we enable all interfaces.
1707        let intf_count = interfaces.len();
1708        let mut intf_selections = vec![true; intf_count];
1709
1710        // apply if_selections
1711        for selection in self.if_selections.iter() {
1712            // Mark the interfaces for this selection.
1713            for i in 0..intf_count {
1714                if selection.if_kind.matches(&interfaces[i]) {
1715                    intf_selections[i] = selection.selected;
1716                }
1717            }
1718        }
1719
1720        // Update `my_intfs` based on the selections.
1721        for (idx, intf) in interfaces.into_iter().enumerate() {
1722            if intf_selections[idx] {
1723                // Add the interface
1724                self.add_interface(intf);
1725            } else {
1726                // Remove the interface
1727                self.del_interface_addr(&intf);
1728            }
1729        }
1730    }
1731
1732    fn del_ip(&mut self, ip: IpAddr) {
1733        self.del_addr_in_my_services(&ip);
1734        self.notify_monitors(DaemonEvent::IpDel(ip));
1735    }
1736
    /// Check for IP changes and update `my_intfs` as needed.
    ///
    /// Addresses and interfaces that are tracked in `my_intfs` but are no
    /// longer reported by the system are removed first. The current
    /// interface list is then handed to `apply_intf_selections`.
    fn check_ip_changes(&mut self) {
        // Get the current interfaces.
        let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);

        // In test builds, hide the interfaces listed in `test_down_interfaces`.
        #[cfg(test)]
        let my_ifaddrs: Vec<_> = my_ifaddrs
            .into_iter()
            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
            .collect();

        // Group the current addresses by interface index (0 if the index is missing).
        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
                let if_index = intf.index.unwrap_or(0);
                acc.entry(if_index).or_default().push(&intf.addr);
                acc
            });

        // Interfaces to drop, each with the last removed IPv4/IPv6 address (if any).
        let mut deleted_intfs = Vec::new();
        // Every address that disappeared.
        let mut deleted_ips = Vec::new();

        for (if_index, my_intf) in self.my_intfs.iter_mut() {
            let mut last_ipv4 = None;
            let mut last_ipv6 = None;

            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
                // The interface still exists: keep only the addresses it still has.
                my_intf.addrs.retain(|addr| {
                    if current_addrs.contains(&addr) {
                        true
                    } else {
                        match addr.ip() {
                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                        }
                        deleted_ips.push(addr.ip());
                        false
                    }
                });
                // An interface left without any address is dropped as well.
                if my_intf.addrs.is_empty() {
                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
                }
            } else {
                // If it does not exist, remove the interface.
                debug!(
                    "check_ip_changes: interface {} ({}) no longer exists, removing",
                    my_intf.name, if_index
                );
                for addr in my_intf.addrs.iter() {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip())
                }
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
            }
        }

        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
            debug!(
                "check_ip_changes: {} deleted ips {} deleted intfs",
                deleted_ips.len(),
                deleted_intfs.len()
            );
        }

        // Update the services and notify the monitors for every removed address.
        for ip in deleted_ips {
            self.del_ip(ip);
        }

        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
                continue;
            };

            // The IPv4 group is left using the last removed IPv4 address.
            if let Some(ipv4) = last_ipv4 {
                debug!("leave multicast for {ipv4}");
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
                        debug!("leave multicast group for addr {ipv4}: {e}");
                    }
                }
            }

            // The IPv6 group is left using the interface index.
            if let Some(ipv6) = last_ipv6 {
                debug!("leave multicast for {ipv6}");
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    if let Err(e) = sock
                        .pktinfo
                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
                    {
                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
                    }
                }
            }

            // Remove cache records for this interface.
            let intf_id = InterfaceId {
                name: my_intf.name.to_string(),
                index: my_intf.index,
            };
            let result = self.cache.remove_records_on_intf(intf_id);
            self.notify_service_removal(result.removed_instances);
            self.resolve_updated_instances(&result.modified_instances);
        }

        // Add newly found interfaces only if in our selections.
        self.apply_intf_selections(my_ifaddrs);
    }
1846
1847    /// Remove an interface address when it was down, disabled or removed from the system.
1848    /// If no more addresses on the interface, remove the interface as well.
1849    fn del_interface_addr(&mut self, intf: &Interface) {
1850        let if_index = intf.index.unwrap_or(0);
1851        debug!(
1852            "del_interface_addr: {} ({if_index}) addr {}",
1853            intf.name,
1854            intf.ip()
1855        );
1856
1857        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1858            debug!("del_interface_addr: interface {} not found", intf.name);
1859            return;
1860        };
1861
1862        let mut ip_removed = false;
1863
1864        if my_intf.addrs.remove(&intf.addr) {
1865            ip_removed = true;
1866
1867            match intf.addr.ip() {
1868                IpAddr::V4(ipv4) => {
1869                    if my_intf.next_ifaddr_v4().is_none() {
1870                        if let Some(sock) = self.ipv4_sock.as_mut() {
1871                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1872                                debug!("leave multicast group for addr {ipv4}: {e}");
1873                            } else {
1874                                debug!("leave multicast for {ipv4}");
1875                            }
1876                        }
1877                    }
1878                }
1879
1880                IpAddr::V6(ipv6) => {
1881                    if my_intf.next_ifaddr_v6().is_none() {
1882                        if let Some(sock) = self.ipv6_sock.as_mut() {
1883                            if let Err(e) =
1884                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1885                            {
1886                                debug!("leave multicast group for addr {ipv6}: {e}");
1887                            }
1888                        }
1889                    }
1890                }
1891            }
1892
1893            if my_intf.addrs.is_empty() {
1894                // If no more addresses, remove the interface.
1895                debug!("del_interface_addr: removing interface {}", intf.name);
1896                self.my_intfs.remove(&if_index);
1897                self.dns_registry_map.remove(&if_index);
1898                self.cache
1899                    .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1900            } else {
1901                // Interface still has addresses of the other IP version.
1902                // Remove cached address records for the disabled IP version
1903                // only if no more addresses of that version remain.
1904                let is_v4 = intf.addr.ip().is_ipv4();
1905                let version_gone = if is_v4 {
1906                    my_intf.next_ifaddr_v4().is_none()
1907                } else {
1908                    my_intf.next_ifaddr_v6().is_none()
1909                };
1910                if version_gone {
1911                    let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1912                    self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1913                }
1914            }
1915        }
1916
1917        if ip_removed {
1918            // Notify the monitors.
1919            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1920            // Remove the interface from my services that enabled `addr_auto`.
1921            self.del_addr_in_my_services(&intf.ip());
1922        }
1923    }
1924
    /// Starts tracking the address of `intf` in `my_intfs`.
    ///
    /// If the address is new, every service with `addr_auto` enabled gets the
    /// address and is announced (or set to probing) on the interface, a PTR
    /// query is sent on the interface for every entry in `service_queriers`,
    /// and the monitors receive an `IpAdd` event. Nothing happens when no
    /// socket exists for the address's IP version or the address is already
    /// tracked.
    fn add_interface(&mut self, intf: Interface) {
        // Pick the socket that matches the IP version of the address.
        let sock_opt = if intf.ip().is_ipv4() {
            &self.ipv4_sock
        } else {
            &self.ipv6_sock
        };

        let Some(sock) = sock_opt else {
            debug!(
                "add_interface: no socket available for interface {} with addr {}. Skipped.",
                intf.name,
                intf.ip()
            );
            return;
        };

        let if_index = intf.index.unwrap_or(0);
        let mut new_addr = false;

        match self.my_intfs.entry(if_index) {
            Entry::Occupied(mut entry) => {
                // If intf has a new address, add it to the existing interface.
                let my_intf = entry.get_mut();
                if !my_intf.addrs.contains(&intf.addr) {
                    // A join failure is only logged here; the address is still added.
                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
                        debug!("add_interface: socket_config {}: {e}", &intf.name);
                    }
                    my_intf.addrs.insert(intf.addr.clone());
                    new_addr = true;
                }
            }
            Entry::Vacant(entry) => {
                // For an interface not tracked yet, a join failure skips it entirely.
                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
                    return;
                }

                new_addr = true;
                let new_intf = MyIntf {
                    name: intf.name.clone(),
                    index: if_index,
                    addrs: HashSet::from([intf.addr.clone()]),
                };
                entry.insert(new_intf);
            }
        }

        if !new_addr {
            trace!("add_interface: interface {} already exists", &intf.name);
            return;
        }

        debug!("add new interface {}: {}", intf.name, intf.ip());

        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            debug!("add_interface: cannot find if_index {if_index}");
            return;
        };

        // Reuse the DNS registry of this interface, creating it if missing.
        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
            Some(registry) => registry,
            None => self
                .dns_registry_map
                .entry(if_index)
                .or_insert_with(DnsRegistry::new),
        };

        // Add the new address to every `addr_auto` service and announce it.
        for (_, service_info) in self.my_services.iter_mut() {
            if service_info.is_addr_auto() {
                service_info.insert_ipaddr(&intf);

                if let Ok(true) = announce_service_on_intf(
                    dns_registry,
                    service_info,
                    my_intf,
                    &sock.pktinfo,
                    self.port,
                ) {
                    debug!(
                        "Announce service {} on {}",
                        service_info.get_fullname(),
                        intf.ip()
                    );
                    service_info.set_status(if_index, ServiceStatus::Announced);
                } else {
                    // Not announced: move the timers queued in the registry
                    // to the daemon timers and mark the service as probing.
                    for timer in dns_registry.new_timers.drain(..) {
                        self.timers.push(Reverse(timer));
                    }
                    service_info.set_status(if_index, ServiceStatus::Probing);
                }
            }
        }

        // Send browse queries on the new interface without known answers.
        // This avoids known-answer suppression (RFC 6762 Section 7.1) that
        // would cause the responder to suppress its response, preventing
        // address records from being attributed to the new interface.
        if let Some(my_intf) = self.my_intfs.get(&if_index) {
            for ty in self.service_queriers.keys() {
                self.send_query_on_intf(ty, RRType::PTR, my_intf);
            }
        }

        // Notify the monitors.
        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
    }
2031
2032    /// Registers a service.
2033    ///
2034    /// RFC 6762 section 8.3.
2035    /// ...the Multicast DNS responder MUST send
2036    ///    an unsolicited Multicast DNS response containing, in the Answer
2037    ///    Section, all of its newly registered resource records
2038    ///
2039    /// Zeroconf will then respond to requests for information about this service.
2040    fn register_service(&mut self, mut info: ServiceInfo) {
2041        // Check the service name length.
2042        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
2043            error!("check_service_name_length: {}", &e);
2044            self.notify_monitors(DaemonEvent::Error(e));
2045            return;
2046        }
2047
2048        if info.is_addr_auto() {
2049            let selected_intfs =
2050                self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
2051            for intf in selected_intfs {
2052                info.insert_ipaddr(&intf);
2053            }
2054        }
2055
2056        debug!("register service {:?}", &info);
2057
2058        let outgoing_addrs = self.send_unsolicited_response(&mut info);
2059        if !outgoing_addrs.is_empty() {
2060            self.notify_monitors(DaemonEvent::Announce(
2061                info.get_fullname().to_string(),
2062                format!("{:?}", &outgoing_addrs),
2063            ));
2064        }
2065
2066        // The key has to be lower case letter as DNS record name is case insensitive.
2067        // The info will have the original name.
2068        let service_fullname = info.get_fullname().to_lowercase();
2069        self.my_services.insert(service_fullname, info);
2070    }
2071
    /// Sends out announcement of `info` on every valid interface.
    /// Returns the list of interface IPs that sent out the announcement.
    ///
    /// For every interface in `my_intfs` the announcement is attempted on the
    /// IPv4 socket and on the IPv6 socket (when present). The status of
    /// `info` on that interface is set to `Announced` if either attempt
    /// returned `Ok(true)`, and to `Probing` otherwise. A resend
    /// (`Command::RegisterResend`) is scheduled one second later for every
    /// interface that announced.
    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
        let mut outgoing_addrs = Vec::new();
        let mut outgoing_intfs = HashSet::new();

        // Interface addresses reported as invalid while sending.
        let mut invalid_intf_addrs = HashSet::new();

        for (if_index, intf) in self.my_intfs.iter() {
            // Reuse the DNS registry of this interface, creating it if missing.
            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
                Some(registry) => registry,
                None => self
                    .dns_registry_map
                    .entry(*if_index)
                    .or_insert_with(DnsRegistry::new),
            };

            let mut announced = false;

            // IPv4
            if let Some(sock) = self.ipv4_sock.as_mut() {
                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
                    Ok(true) => {
                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
                            outgoing_addrs.push(addr.ip());
                        }
                        outgoing_intfs.insert(intf.index);

                        debug!(
                            "Announce service IPv4 {} on {}",
                            info.get_fullname(),
                            intf.name
                        );
                        announced = true;
                    }
                    Ok(false) => {}
                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
            }

            // IPv6
            if let Some(sock) = self.ipv6_sock.as_mut() {
                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
                    Ok(true) => {
                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
                            outgoing_addrs.push(addr.ip());
                        }
                        outgoing_intfs.insert(intf.index);

                        debug!(
                            "Announce service IPv6 {} on {}",
                            info.get_fullname(),
                            intf.name
                        );
                        announced = true;
                    }
                    Ok(false) => {}
                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
            }

            if announced {
                info.set_status(intf.index, ServiceStatus::Announced);
            } else {
                // Not announced: move the timers queued in the registry to
                // the daemon timers and mark the service as probing.
                for timer in dns_registry.new_timers.drain(..) {
                    self.timers.push(Reverse(timer));
                }
                info.set_status(*if_index, ServiceStatus::Probing);
            }
        }

        // Hand the invalid addresses back to the daemon as a command to itself.
        if !invalid_intf_addrs.is_empty() {
            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
        }

        // RFC 6762 section 8.3.
        // ..The Multicast DNS responder MUST send at least two unsolicited
        //    responses, one second apart.
        let next_time = current_time_millis() + 1000;
        for if_index in outgoing_intfs {
            self.add_retransmission(
                next_time,
                Command::RegisterResend(info.get_fullname().to_string(), if_index),
            );
        }

        outgoing_addrs
    }
2163
    /// Send probings or finish them if expired. Notify waiting services.
    ///
    /// Runs over every interface in `my_intfs` that has a DNS registry.
    /// Pending probe questions returned by `check_probing` are sent on the
    /// IPv4 and IPv6 sockets. Services returned by `handle_expired_probes`
    /// are announced on the interface unless they are already `Announced`.
    fn probing_handler(&mut self) {
        let now = current_time_millis();
        // Interface addresses reported as invalid while sending.
        let mut invalid_intf_addrs = HashSet::new();

        for (if_index, intf) in self.my_intfs.iter() {
            // Interfaces without a DNS registry have nothing to probe.
            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
                continue;
            };

            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);

            // send probing.
            if !out.questions().is_empty() {
                trace!("sending out probing of questions: {:?}", out.questions());
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
                    {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
                    {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
            }

            // For finished probes, wake up services that are waiting for the probes.
            let waiting_services =
                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);

            for service_name in waiting_services {
                // service names are lowercase
                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                    if info.get_status(*if_index) == ServiceStatus::Announced {
                        debug!("service {} already announced", info.get_fullname());
                        continue;
                    }

                    // Announce on the IPv4 socket; a missing socket or an
                    // invalid interface address counts as "not announced".
                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
                        match announce_service_on_intf(
                            dns_registry,
                            info,
                            intf,
                            &sock.pktinfo,
                            self.port,
                        ) {
                            Ok(announced) => announced,
                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                                invalid_intf_addrs.insert(intf_addr);
                                false
                            }
                        }
                    } else {
                        false
                    };
                    // Same for the IPv6 socket.
                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
                        match announce_service_on_intf(
                            dns_registry,
                            info,
                            intf,
                            &sock.pktinfo,
                            self.port,
                        ) {
                            Ok(announced) => announced,
                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                                invalid_intf_addrs.insert(intf_addr);
                                false
                            }
                        }
                    } else {
                        false
                    };

                    if announced_v4 || announced_v6 {
                        // Schedule a resend of the registration one second later.
                        let next_time = now + 1000;
                        let command =
                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
                        self.retransmissions.push(ReRun { next_time, command });
                        self.timers.push(Reverse(next_time));

                        // Names reported to the monitors go through the
                        // registry's `resolve_name`.
                        let fullname = dns_registry.resolve_name(&service_name).to_string();

                        let hostname = dns_registry.resolve_name(info.get_hostname());

                        debug!("wake up: announce service {} on {}", fullname, intf.name);
                        notify_monitors(
                            &mut self.monitors,
                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
                        );

                        info.set_status(*if_index, ServiceStatus::Announced);
                    }
                }
            }
        }

        // Hand the invalid addresses back to the daemon as a command to itself.
        if !invalid_intf_addrs.is_empty() {
            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
        }
    }
2269
2270    fn unregister_service(
2271        &self,
2272        info: &ServiceInfo,
2273        intf: &MyIntf,
2274        sock: &PktInfoUdpSocket,
2275    ) -> Vec<u8> {
2276        let is_ipv4 = sock.domain() == Domain::IPV4;
2277
2278        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2279        out.add_answer_at_time(
2280            DnsPointer::new(
2281                info.get_type(),
2282                RRType::PTR,
2283                CLASS_IN,
2284                0,
2285                info.get_fullname().to_string(),
2286            ),
2287            0,
2288        );
2289
2290        if let Some(sub) = info.get_subtype() {
2291            trace!("Adding subdomain {}", sub);
2292            out.add_answer_at_time(
2293                DnsPointer::new(
2294                    sub,
2295                    RRType::PTR,
2296                    CLASS_IN,
2297                    0,
2298                    info.get_fullname().to_string(),
2299                ),
2300                0,
2301            );
2302        }
2303
2304        out.add_answer_at_time(
2305            DnsSrv::new(
2306                info.get_fullname(),
2307                CLASS_IN | CLASS_CACHE_FLUSH,
2308                0,
2309                info.get_priority(),
2310                info.get_weight(),
2311                info.get_port(),
2312                info.get_hostname().to_string(),
2313            ),
2314            0,
2315        );
2316        out.add_answer_at_time(
2317            DnsTxt::new(
2318                info.get_fullname(),
2319                CLASS_IN | CLASS_CACHE_FLUSH,
2320                0,
2321                info.generate_txt(),
2322            ),
2323            0,
2324        );
2325
2326        let if_addrs = if is_ipv4 {
2327            info.get_addrs_on_my_intf_v4(intf)
2328        } else {
2329            info.get_addrs_on_my_intf_v6(intf)
2330        };
2331
2332        if if_addrs.is_empty() {
2333            return vec![];
2334        }
2335
2336        for address in if_addrs {
2337            out.add_answer_at_time(
2338                DnsAddress::new(
2339                    info.get_hostname(),
2340                    ip_address_rr_type(&address),
2341                    CLASS_IN | CLASS_CACHE_FLUSH,
2342                    0,
2343                    address,
2344                    intf.into(),
2345                ),
2346                0,
2347            );
2348        }
2349
2350        // Only (at most) one packet is expected to be sent out.
2351        let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port, None, None) {
2352            Ok(sent_vec) => sent_vec,
2353            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2354                let invalid_intf_addrs = HashSet::from([intf_addr]);
2355                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2356                vec![]
2357            }
2358        };
2359        sent_vec.into_iter().next().unwrap_or_default()
2360    }
2361
2362    /// Binds a channel `listener` to querying mDNS hostnames.
2363    ///
2364    /// If there is already a `listener`, it will be updated, i.e. overwritten.
2365    fn add_hostname_resolver(
2366        &mut self,
2367        hostname: String,
2368        listener: Sender<HostnameResolutionEvent>,
2369        timeout: Option<u64>,
2370    ) {
2371        let real_timeout = timeout.map(|t| current_time_millis() + t);
2372        self.hostname_resolvers
2373            .insert(hostname.to_lowercase(), (listener, real_timeout));
2374        if let Some(t) = real_timeout {
2375            self.add_timer(t);
2376        }
2377    }
2378
2379    /// Sends a multicast query for `name` with `qtype`.
2380    fn send_query(&self, name: &str, qtype: RRType) {
2381        self.send_query_vec(&[(name, qtype)]);
2382    }
2383
2384    /// Sends a query on a specific interface without known answers.
2385    ///
2386    /// Used when a new interface is added so the responder won't suppress
2387    /// its response due to known-answer suppression (RFC 6762 Section 7.1).
2388    fn send_query_on_intf(&self, name: &str, qtype: RRType, intf: &MyIntf) {
2389        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2390        out.add_question(name, qtype);
2391
2392        let mut invalid_intf_addrs = HashSet::new();
2393        if let Some(sock) = self.ipv4_sock.as_ref() {
2394            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2395                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2396            {
2397                invalid_intf_addrs.insert(intf_addr);
2398            }
2399        }
2400        if let Some(sock) = self.ipv6_sock.as_ref() {
2401            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2402                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2403            {
2404                invalid_intf_addrs.insert(intf_addr);
2405            }
2406        }
2407        if !invalid_intf_addrs.is_empty() {
2408            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2409        }
2410    }
2411
2412    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
2413    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2414        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2415        let now = current_time_millis();
2416
2417        for (name, qtype) in questions {
2418            out.add_question(name, *qtype);
2419
2420            for record in self.cache.get_known_answers(name, *qtype, now) {
2421                /*
2422                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
2423                ...
2424                    When a Multicast DNS querier sends a query to which it already knows
2425                    some answers, it populates the Answer Section of the DNS query
2426                    message with those answers.
2427                 */
2428                trace!("add known answer: {:?}", record.record);
2429                let mut new_record = record.record.clone();
2430                new_record.get_record_mut().update_ttl(now);
2431                out.add_answer_box(new_record);
2432            }
2433        }
2434
2435        let mut invalid_intf_addrs = HashSet::new();
2436        for (_, intf) in self.my_intfs.iter() {
2437            if let Some(sock) = self.ipv4_sock.as_ref() {
2438                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2439                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2440                {
2441                    invalid_intf_addrs.insert(intf_addr);
2442                }
2443            }
2444            if let Some(sock) = self.ipv6_sock.as_ref() {
2445                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2446                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2447                {
2448                    invalid_intf_addrs.insert(intf_addr);
2449                }
2450            }
2451        }
2452
2453        if !invalid_intf_addrs.is_empty() {
2454            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2455        }
2456    }
2457
2458    /// Reads one UDP datagram from the socket of `intf`.
2459    ///
2460    /// Returns false if failed to receive a packet,
2461    /// otherwise returns true.
2462    fn handle_read(&mut self, event_key: usize) -> bool {
2463        let sock_opt = match event_key {
2464            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2465            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2466            _ => {
2467                debug!("handle_read: unknown token {}", event_key);
2468                return false;
2469            }
2470        };
2471        let Some(sock) = sock_opt.as_mut() else {
2472            debug!("handle_read: socket not available for token {}", event_key);
2473            return false;
2474        };
2475        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2476
2477        // Read the next mDNS UDP datagram.
2478        //
2479        // If the datagram is larger than `buf`, excess bytes may or may not
2480        // be truncated by the socket layer depending on the platform's libc.
2481        // In any case, such large datagram will not be decoded properly and
2482        // this function should return false but should not crash.
2483        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2484            Ok(sz) => sz,
2485            Err(e) => {
2486                if e.kind() != std::io::ErrorKind::WouldBlock {
2487                    debug!("listening socket read failed: {}", e);
2488                }
2489                return false;
2490            }
2491        };
2492
2493        // Find the interface that received the packet.
2494        let pkt_if_index = pktinfo.if_index as u32;
2495        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2496            debug!(
2497                "handle_read: no interface found for pktinfo if_index: {}",
2498                pktinfo.if_index
2499            );
2500            return true; // We still return true to indicate that we read something.
2501        };
2502
2503        // Drop packets for an IP version that has been disabled on this interface.
2504        // This is needed because some times the socket layer may still receive packets
2505        // for an IP version even after we left the multicast group for that IP version.
2506        // We want to drop such packets to avoid unnecessary processing.
2507        let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2508        if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2509            || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2510        {
2511            debug!(
2512                "handle_read: dropping {} packet on intf {} (disabled)",
2513                if is_ipv4 { "IPv4" } else { "IPv6" },
2514                my_intf.name
2515            );
2516            return true;
2517        }
2518
2519        buf.truncate(sz); // reduce potential processing errors
2520
2521        match DnsIncoming::new(buf, my_intf.into()) {
2522            Ok(msg) => {
2523                if msg.is_query() {
2524                    let querier_addr = pktinfo.addr_src;
2525                    self.handle_query(msg, pkt_if_index, querier_addr);
2526                } else if msg.is_response() {
2527                    self.handle_response(msg, pkt_if_index);
2528                } else {
2529                    debug!("Invalid message: not query and not response");
2530                }
2531            }
2532            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2533        }
2534
2535        true
2536    }
2537
2538    /// Returns true, if sent query. Returns false if SRV already exists.
2539    fn query_unresolved(&mut self, instance: &str) -> bool {
2540        if !valid_instance_name(instance) {
2541            trace!("instance name {} not valid", instance);
2542            return false;
2543        }
2544
2545        if let Some(records) = self.cache.get_srv(instance) {
2546            for record in records {
2547                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2548                    if self.cache.get_addr(srv.host()).is_none() {
2549                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2550                        return true;
2551                    }
2552                }
2553            }
2554        } else {
2555            self.send_query(instance, RRType::ANY);
2556            return true;
2557        }
2558
2559        false
2560    }
2561
2562    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2563    /// cached records via `sender`.
2564    fn query_cache_for_service(
2565        &mut self,
2566        ty_domain: &str,
2567        sender: &Sender<ServiceEvent>,
2568        now: u64,
2569    ) {
2570        let mut resolved: HashSet<String> = HashSet::new();
2571        let mut unresolved: HashSet<String> = HashSet::new();
2572
2573        if let Some(records) = self.cache.get_ptr(ty_domain) {
2574            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2575                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2576                    let mut new_event = None;
2577                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2578                        Ok(resolved_service) => {
2579                            if resolved_service.is_valid() {
2580                                debug!("Resolved service from cache: {}", ptr.alias());
2581                                new_event =
2582                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2583                            } else {
2584                                debug!("Resolved service is not valid: {}", ptr.alias());
2585                            }
2586                        }
2587                        Err(err) => {
2588                            debug!("Error while resolving service from cache: {}", err);
2589                            continue;
2590                        }
2591                    }
2592
2593                    match sender.send(ServiceEvent::ServiceFound(
2594                        ty_domain.to_string(),
2595                        ptr.alias().to_string(),
2596                    )) {
2597                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2598                        Err(e) => {
2599                            debug!("failed to send service found: {}", e);
2600                            continue;
2601                        }
2602                    }
2603
2604                    if let Some(event) = new_event {
2605                        resolved.insert(ptr.alias().to_string());
2606                        match sender.send(event) {
2607                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2608                            Err(e) => debug!("failed to send service resolved: {}", e),
2609                        }
2610                    } else {
2611                        unresolved.insert(ptr.alias().to_string());
2612                    }
2613                }
2614            }
2615        }
2616
2617        for instance in resolved.drain() {
2618            self.pending_resolves.remove(&instance);
2619            self.resolved.insert(instance);
2620        }
2621
2622        for instance in unresolved.drain() {
2623            self.add_pending_resolve(instance);
2624        }
2625    }
2626
2627    /// Checks if `hostname` has records in the cache. If yes, sends the
2628    /// cached records via `sender`.
2629    fn query_cache_for_hostname(
2630        &mut self,
2631        hostname: &str,
2632        sender: Sender<HostnameResolutionEvent>,
2633    ) {
2634        let addresses_map = self.cache.get_addresses_for_host(hostname);
2635        for (name, addresses) in addresses_map {
2636            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2637                Ok(()) => trace!("sent hostname addresses found"),
2638                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2639            }
2640        }
2641    }
2642
2643    fn add_pending_resolve(&mut self, instance: String) {
2644        if !self.pending_resolves.contains(&instance) {
2645            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2646            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2647            self.pending_resolves.insert(instance);
2648        }
2649    }
2650
2651    /// Creates a `ResolvedService` from the cache.
2652    fn resolve_service_from_cache(
2653        &self,
2654        ty_domain: &str,
2655        fullname: &str,
2656    ) -> Result<ResolvedService> {
2657        let now = current_time_millis();
2658        let mut resolved_service = ResolvedService {
2659            ty_domain: ty_domain.to_string(),
2660            sub_ty_domain: None,
2661            fullname: fullname.to_string(),
2662            host: String::new(),
2663            port: 0,
2664            addresses: HashSet::new(),
2665            txt_properties: TxtProperties::new(),
2666        };
2667
2668        // Be sure setting `subtype` if available even when querying for the parent domain.
2669        if let Some(subtype) = self.cache.get_subtype(fullname) {
2670            trace!(
2671                "ty_domain: {} found subtype {} for instance: {}",
2672                ty_domain,
2673                subtype,
2674                fullname
2675            );
2676            if resolved_service.sub_ty_domain.is_none() {
2677                resolved_service.sub_ty_domain = Some(subtype.to_string());
2678            }
2679        }
2680
2681        // resolve SRV record
2682        if let Some(records) = self.cache.get_srv(fullname) {
2683            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2684                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2685                    resolved_service.host = dns_srv.host().to_string();
2686                    resolved_service.port = dns_srv.port();
2687                }
2688            }
2689        }
2690
2691        // resolve TXT record
2692        if let Some(records) = self.cache.get_txt(fullname) {
2693            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2694                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2695                    resolved_service.txt_properties = dns_txt.text().into();
2696                }
2697            }
2698        }
2699
2700        // resolve A and AAAA records
2701        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2702            for answer in records.iter() {
2703                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2704                    if dns_a.expires_soon(now) {
2705                        trace!(
2706                            "Addr expired or expires soon: {}",
2707                            dns_a.address().to_ip_addr()
2708                        );
2709                    } else {
2710                        let scoped = dns_a.address();
2711                        if let ScopedIp::V4(v4) = &scoped {
2712                            // Merge interface_ids if this V4 addr already exists.
2713                            // Linear scan by IP since Eq/Hash include interface_ids.
2714                            let existing = resolved_service
2715                                .addresses
2716                                .iter()
2717                                .find(|a| a.to_ip_addr() == IpAddr::V4(*v4.addr()))
2718                                .cloned();
2719                            if let Some(mut existing) = existing {
2720                                resolved_service.addresses.remove(&existing);
2721                                if let ScopedIp::V4(existing_v4) = &mut existing {
2722                                    for id in v4.interface_ids() {
2723                                        existing_v4.add_interface_id(id.clone());
2724                                    }
2725                                }
2726                                resolved_service.addresses.insert(existing);
2727                            } else {
2728                                resolved_service.addresses.insert(scoped);
2729                            }
2730                        } else {
2731                            resolved_service.addresses.insert(scoped);
2732                        }
2733                    }
2734                }
2735            }
2736        }
2737
2738        Ok(resolved_service)
2739    }
2740
2741    fn handle_poller_events(&mut self, events: &mio::Events) {
2742        for ev in events.iter() {
2743            trace!("event received with key {:?}", ev.token());
2744            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2745                // Drain signals as we will drain commands as well.
2746                self.signal_sock_drain();
2747
2748                if let Err(e) = self.poller.registry().reregister(
2749                    &mut self.signal_sock,
2750                    ev.token(),
2751                    mio::Interest::READABLE,
2752                ) {
2753                    debug!("failed to modify poller for signal socket: {}", e);
2754                }
2755                continue; // Next event.
2756            }
2757
2758            // Read until no more packets available.
2759            while self.handle_read(ev.token().0) {}
2760
2761            // we continue to monitor this socket.
2762            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2763                // Re-register the IPv4 socket for reading.
2764                if let Some(sock) = self.ipv4_sock.as_mut() {
2765                    if let Err(e) =
2766                        self.poller
2767                            .registry()
2768                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2769                    {
2770                        debug!("modify poller for IPv4 socket: {}", e);
2771                    }
2772                }
2773            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2774                // Re-register the IPv6 socket for reading.
2775                if let Some(sock) = self.ipv6_sock.as_mut() {
2776                    if let Err(e) =
2777                        self.poller
2778                            .registry()
2779                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2780                    {
2781                        debug!("modify poller for IPv6 socket: {}", e);
2782                    }
2783                }
2784            }
2785        }
2786    }
2787
2788    /// Deal with incoming response packets.  All answers
2789    /// are held in the cache, and listeners are notified.
2790    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2791        let now = current_time_millis();
2792
2793        // remove records that are expired.
2794        let mut record_predicate = |record: &DnsRecordBox| {
2795            if !record.get_record().is_expired(now) {
2796                return true;
2797            }
2798
2799            debug!("record is expired, removing it from cache.");
2800            if self.cache.remove(record) {
2801                // for PTR records, send event to listeners
2802                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2803                    call_service_listener(
2804                        &self.service_queriers,
2805                        dns_ptr.get_name(),
2806                        ServiceEvent::ServiceRemoved(
2807                            dns_ptr.get_name().to_string(),
2808                            dns_ptr.alias().to_string(),
2809                        ),
2810                    );
2811                }
2812            }
2813            false
2814        };
2815        msg.answers_mut().retain(&mut record_predicate);
2816        msg.authorities_mut().retain(&mut record_predicate);
2817        msg.additionals_mut().retain(&mut record_predicate);
2818
2819        // check possible conflicts and handle them.
2820        self.conflict_handler(&msg, if_index);
2821
2822        // check if the message is for us.
2823        let mut is_for_us = true; // assume it is for us.
2824
2825        // If there are any PTR records in the answers, there should be
2826        // at least one PTR for us. Otherwise, the message is not for us.
2827        // If there are no PTR records at all, assume this message is for us.
2828        for answer in msg.answers() {
2829            if answer.get_type() == RRType::PTR {
2830                if self.service_queriers.contains_key(answer.get_name()) {
2831                    is_for_us = true;
2832                    break; // OK to break: at least one PTR for us.
2833                } else {
2834                    is_for_us = false;
2835                }
2836            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2837                // If there is a hostname querier for this address, then it is for us.
2838                let answer_lowercase = answer.get_name().to_lowercase();
2839                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2840                    is_for_us = true;
2841                    break; // OK to break: at least one hostname for us.
2842                }
2843            }
2844        }
2845
2846        // if we explicitily want to accept unsolicited responses, we should consider all messages as for us.
2847        if self.accept_unsolicited {
2848            is_for_us = true;
2849        }
2850
2851        /// Represents a DNS record change that involves one service instance.
2852        struct InstanceChange {
2853            ty: RRType,   // The type of DNS record for the instance.
2854            name: String, // The name of the record.
2855        }
2856
2857        // Go through all answers to get the new and updated records.
2858        // For new PTR records, send out ServiceFound immediately. For others,
2859        // collect them into `changes`.
2860        //
2861        // Note: we don't try to identify the update instances based on
2862        // each record immediately as the answers are likely related to each
2863        // other.
2864        let mut changes = Vec::new();
2865        let mut timers = Vec::new();
2866        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2867            return;
2868        };
2869        for record in msg.all_records() {
2870            match self
2871                .cache
2872                .add_or_update(my_intf, record, &mut timers, is_for_us)
2873            {
2874                Some((dns_record, true)) => {
2875                    timers.push(dns_record.record.get_record().get_expire_time());
2876                    timers.push(dns_record.record.get_record().get_refresh_time());
2877
2878                    let ty = dns_record.record.get_type();
2879                    let name = dns_record.record.get_name();
2880
2881                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2882                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2883                        if self.service_queriers.contains_key(name) {
2884                            timers.push(dns_record.record.get_record().get_refresh_time());
2885                        }
2886
2887                        // send ServiceFound
2888                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2889                        {
2890                            debug!("calling listener with service found: {name}");
2891                            call_service_listener(
2892                                &self.service_queriers,
2893                                name,
2894                                ServiceEvent::ServiceFound(
2895                                    name.to_string(),
2896                                    dns_ptr.alias().to_string(),
2897                                ),
2898                            );
2899                            changes.push(InstanceChange {
2900                                ty,
2901                                name: dns_ptr.alias().to_string(),
2902                            });
2903                        }
2904                    } else {
2905                        changes.push(InstanceChange {
2906                            ty,
2907                            name: name.to_string(),
2908                        });
2909                    }
2910                }
2911                Some((dns_record, false)) => {
2912                    timers.push(dns_record.record.get_record().get_expire_time());
2913                    timers.push(dns_record.record.get_record().get_refresh_time());
2914                }
2915                _ => {}
2916            }
2917        }
2918
2919        // Add timers for the new records.
2920        for t in timers {
2921            self.add_timer(t);
2922        }
2923
2924        // Go through remaining changes to see if any hostname resolutions were found or updated.
2925        for change in changes
2926            .iter()
2927            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2928        {
2929            let addr_map = self.cache.get_addresses_for_host(&change.name);
2930            for (name, addresses) in addr_map {
2931                call_hostname_resolution_listener(
2932                    &self.hostname_resolvers,
2933                    &change.name,
2934                    HostnameResolutionEvent::AddressesFound(name, addresses),
2935                )
2936            }
2937        }
2938
2939        // Identify the instances that need to be "resolved".
2940        let mut updated_instances = HashSet::new();
2941        for update in changes {
2942            match update.ty {
2943                RRType::PTR | RRType::SRV | RRType::TXT => {
2944                    updated_instances.insert(update.name);
2945                }
2946                RRType::A | RRType::AAAA => {
2947                    let instances = self.cache.get_instances_on_host(&update.name);
2948                    updated_instances.extend(instances);
2949                }
2950                _ => {}
2951            }
2952        }
2953
2954        self.resolve_updated_instances(&updated_instances);
2955    }
2956
2957    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2958        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2959            debug!("handle_response: no intf found for index {if_index}");
2960            return;
2961        };
2962
2963        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2964            return;
2965        };
2966
2967        for answer in msg.answers().iter() {
2968            let mut new_records = Vec::new();
2969
2970            let name = answer.get_name();
2971            let Some(probe) = dns_registry.probing.get_mut(name) else {
2972                continue;
2973            };
2974
2975            // check against possible multicast forwarding
2976            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2977                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2978                    if answer_addr.interface_id.index != if_index {
2979                        debug!(
2980                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2981                            answer_addr, my_intf.name
2982                        );
2983                        continue;
2984                    }
2985                }
2986
2987                // double check if any other address record matches rrdata,
2988                // as there could be multiple addresses for the same name.
2989                let any_match = probe.records.iter().any(|r| {
2990                    r.get_type() == answer.get_type()
2991                        && r.get_class() == answer.get_class()
2992                        && r.rrdata_match(answer.as_ref())
2993                });
2994                if any_match {
2995                    continue; // no conflict for this answer.
2996                }
2997            }
2998
2999            probe.records.retain(|record| {
3000                if record.get_type() == answer.get_type()
3001                    && record.get_class() == answer.get_class()
3002                    && !record.rrdata_match(answer.as_ref())
3003                {
3004                    debug!(
3005                        "found conflict name: '{name}' record: {}: {} PEER: {}",
3006                        record.get_type(),
3007                        record.rdata_print(),
3008                        answer.rdata_print()
3009                    );
3010
3011                    // create a new name for this record
3012                    // then remove the old record in probing.
3013                    let mut new_record = record.clone();
3014                    let new_name = match record.get_type() {
3015                        RRType::A => hostname_change(name),
3016                        RRType::AAAA => hostname_change(name),
3017                        _ => name_change(name),
3018                    };
3019                    new_record.get_record_mut().set_new_name(new_name);
3020                    new_records.push(new_record);
3021                    return false; // old record is dropped from the probe.
3022                }
3023
3024                true
3025            });
3026
3027            // ?????
3028            // if probe.records.is_empty() {
3029            //     dns_registry.probing.remove(name);
3030            // }
3031
3032            // Probing again with the new names.
3033            let create_time = current_time_millis() + fastrand::u64(0..250);
3034
3035            let waiting_services = probe.waiting_services.clone();
3036
3037            for record in new_records {
3038                if dns_registry.update_hostname(name, record.get_name(), create_time) {
3039                    self.timers.push(Reverse(create_time));
3040                }
3041
3042                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
3043                dns_registry.name_changes.insert(
3044                    record.get_record().get_original_name().to_string(),
3045                    record.get_name().to_string(),
3046                );
3047
3048                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
3049                    Some(p) => p,
3050                    None => {
3051                        let new_probe = dns_registry
3052                            .probing
3053                            .entry(record.get_name().to_string())
3054                            .or_insert_with(|| {
3055                                debug!("conflict handler: new probe of {}", record.get_name());
3056                                Probe::new(create_time)
3057                            });
3058                        self.timers.push(Reverse(new_probe.next_send));
3059                        new_probe
3060                    }
3061                };
3062
3063                debug!(
3064                    "insert record with new name '{}' {} into probe",
3065                    record.get_name(),
3066                    record.get_type()
3067                );
3068                new_probe.insert_record(record);
3069
3070                new_probe.waiting_services.extend(waiting_services.clone());
3071            }
3072        }
3073    }
3074
3075    /// Resolve the updated (including new) instances.
3076    ///
3077    /// Note: it is possible that more than 1 PTR pointing to the same
3078    /// instance. For example, a regular service type PTR and a sub-type
3079    /// service type PTR can both point to the same service instance.
3080    /// This loop automatically handles the sub-type PTRs.
3081    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
3082        if updated_instances.is_empty() {
3083            return;
3084        }
3085
3086        let mut resolved: HashSet<String> = HashSet::new();
3087        let mut unresolved: HashSet<String> = HashSet::new();
3088        let mut removed_instances = HashMap::new();
3089
3090        let now = current_time_millis();
3091
3092        for (ty_domain, records) in self.cache.all_ptr().iter() {
3093            if !self.service_queriers.contains_key(ty_domain) {
3094                // No need to resolve if not in our queries.
3095                continue;
3096            }
3097
3098            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
3099                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
3100                    continue;
3101                };
3102
3103                let instance = dns_ptr.alias();
3104                if !updated_instances.contains(instance) {
3105                    continue;
3106                }
3107
3108                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
3109                else {
3110                    continue;
3111                };
3112
3113                debug!("resolve_updated_instances: from cache: {instance}");
3114                if resolved_service.is_valid() {
3115                    debug!("call queriers to resolve {instance}");
3116                    resolved.insert(instance.to_string());
3117                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
3118                    call_service_listener(&self.service_queriers, ty_domain, event);
3119                } else {
3120                    debug!("Resolved service is not valid: {instance}");
3121                    if self.resolved.remove(dns_ptr.alias()) {
3122                        removed_instances
3123                            .entry(ty_domain.to_string())
3124                            .or_insert_with(HashSet::new)
3125                            .insert(instance.to_string());
3126                    }
3127                    unresolved.insert(instance.to_string());
3128                }
3129            }
3130        }
3131
3132        for instance in resolved.drain() {
3133            self.pending_resolves.remove(&instance);
3134            self.resolved.insert(instance);
3135        }
3136
3137        for instance in unresolved.drain() {
3138            self.add_pending_resolve(instance);
3139        }
3140
3141        if !removed_instances.is_empty() {
3142            debug!(
3143                "resolve_updated_instances: removed {}",
3144                &removed_instances.len()
3145            );
3146            self.notify_service_removal(removed_instances);
3147        }
3148    }
3149
    /// Handle incoming query packets, figure out whether and what to respond.
    ///
    /// - `msg`: the parsed incoming query.
    /// - `if_index`: interface index used to look up the interface and its DNS registry.
    /// - `querier_addr`: source address of the query. Its IP family selects the
    ///   socket used for the reply, and a source port other than `MDNS_PORT`
    ///   turns the reply into a legacy unicast response.
    ///
    /// Returns without responding when the socket, the DNS registry or the
    /// interface for `if_index` is not available. A response is sent only if at
    /// least one answer was added; the known-answer counter is updated either way.
    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, querier_addr: SocketAddr) {
        let querier_ip = querier_addr.ip();
        let is_ipv4 = querier_ip.is_ipv4();
        // Reply through the socket of the same IP family as the querier.
        let sock_opt = if is_ipv4 {
            &self.ipv4_sock
        } else {
            &self.ipv6_sock
        };
        let Some(sock) = sock_opt.as_ref() else {
            debug!("handle_query: socket not available for intf {}", if_index);
            return;
        };
        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

        // Special meta-query "_services._dns-sd._udp.<Domain>".
        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
        const META_QUERY: &str = "_services._dns-sd._udp.local.";

        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
            debug!("missing dns registry for intf {}", if_index);
            return;
        };

        let Some(intf) = self.my_intfs.get(&if_index) else {
            debug!("handle_query: no intf found for index {if_index}");
            return;
        };

        for question in msg.questions().iter() {
            let qtype = question.entry_type();
            let q_name = question.entry_name();

            if qtype == RRType::PTR {
                // PTR questions: only services already announced on this interface answer.
                for service in self.my_services.values() {
                    if service.get_status(if_index) != ServiceStatus::Announced {
                        continue;
                    }

                    if service.matches_type_or_subtype(q_name) {
                        out.add_answer_with_additionals(&msg, service, intf, dns_registry, is_ipv4);
                    } else if q_name == META_QUERY {
                        // Meta-query: answer with a PTR from the meta name to the service type.
                        let ttl = service.get_other_ttl();
                        let alias = service.get_type().to_string();
                        let ptr = DnsPointer::new(q_name, RRType::PTR, CLASS_IN, ttl, alias);
                        if !out.add_answer(&msg, ptr) {
                            trace!("answer was not added for meta-query {:?}", &question);
                        }
                    }
                }
            } else {
                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
                if qtype == RRType::ANY && msg.num_authorities() > 0 {
                    if let Some(probe) = dns_registry.probing.get_mut(q_name) {
                        probe.tiebreaking(&msg, q_name);
                    }
                }

                // Address questions: match the question name against each announced
                // service's (possibly renamed) hostname, case-insensitively.
                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
                    for service in self.my_services.values() {
                        if service.get_status(if_index) != ServiceStatus::Announced {
                            continue;
                        }

                        let service_hostname = dns_registry.resolve_name(service.get_hostname());

                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
                            // Pick addresses based on the question type, not the
                            // socket family. RFC 6762 doesn't require A queries
                            // to come over IPv4 transport — Android's getaddrinfo
                            // routinely sends both A and AAAA queries over its
                            // preferred IPv6 mDNS socket and expects A records
                            // to be answered with v4 addresses.
                            let mut intf_addrs: Vec<IpAddr> = Vec::new();
                            if qtype == RRType::A || qtype == RRType::ANY {
                                intf_addrs.extend(service.get_addrs_on_my_intf_v4(intf));
                            }
                            if qtype == RRType::AAAA || qtype == RRType::ANY {
                                intf_addrs.extend(service.get_addrs_on_my_intf_v6(intf));
                            }
                            if intf_addrs.is_empty()
                                && (qtype == RRType::A || qtype == RRType::AAAA)
                            {
                                let t = match qtype {
                                    RRType::A => "TYPE_A",
                                    RRType::AAAA => "TYPE_AAAA",
                                    _ => "invalid_type",
                                };
                                trace!(
                                    "Cannot find valid addrs for {} response on intf {:?}",
                                    t,
                                    &intf
                                );
                                continue;
                            }
                            for address in intf_addrs {
                                out.add_answer(
                                    &msg,
                                    DnsAddress::new(
                                        service_hostname,
                                        ip_address_rr_type(&address),
                                        CLASS_IN | CLASS_CACHE_FLUSH,
                                        service.get_host_ttl(),
                                        address,
                                        intf.into(),
                                    ),
                                );
                            }
                        }
                    }
                }

                // Remaining handling: find the service whose (possibly renamed)
                // registered name equals the lowercased question name.
                let query_name = q_name.to_lowercase();
                let service_opt = self
                    .my_services
                    .iter()
                    .find(|(k, _v)| dns_registry.resolve_name(k.as_str()) == query_name)
                    .map(|(_, v)| v);

                let Some(service) = service_opt else {
                    continue;
                };

                if service.get_status(if_index) != ServiceStatus::Announced {
                    continue;
                }

                // Unlike the address branch above, the addresses here follow the
                // socket family of the querier.
                let intf_addrs = if is_ipv4 {
                    service.get_addrs_on_my_intf_v4(intf)
                } else {
                    service.get_addrs_on_my_intf_v6(intf)
                };
                if intf_addrs.is_empty() {
                    debug!(
                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
                        &intf
                    );
                    continue;
                }

                add_answer_of_service(
                    &mut out,
                    &msg,
                    question.entry_name(),
                    service,
                    qtype,
                    intf_addrs,
                );
            }
        }

        if out.answers_count() > 0 {
            debug!("sending response on intf {}", &intf.name);
            // Echo the query id in the response.
            out.set_id(msg.id());

            // Pick a source IfAddr on `intf` whose subnet contains the querier's IP.
            // It's OK if it's None, `send_dns_outgoing` will then pick one address.
            let matched_source = intf
                .addrs
                .iter()
                .find(|if_addr| valid_ip_on_intf(&querier_ip, if_addr));

            // RFC 6762 §6.7 (Legacy Unicast Responses): if the querier's source
            // port is not 5353, it's a one-shot legacy querier (e.g. Android's
            // getaddrinfo, iOS resolver fallback). The response MUST be unicast
            // back to the querier's source IP and port; multicast replies will
            // never reach the querier's ephemeral socket. Legacy unicast
            // responses must also echo the question section and clear the
            // cache-flush bit, since legacy resolvers don't understand it.
            let unicast_dest = if querier_addr.port() != MDNS_PORT {
                Some(querier_addr)
            } else {
                None
            };

            if unicast_dest.is_some() {
                for q in msg.questions() {
                    out.add_question(q.entry_name(), q.entry_type());
                }
                out.clear_cache_flush_bits();
            }

            // An invalid interface address reported by the send is handed back to
            // the daemon as a command; other errors are ignored here.
            if let Err(InternalError::IntfAddrInvalid(intf_addr)) = send_dns_outgoing(
                &out,
                intf,
                &sock.pktinfo,
                self.port,
                matched_source,
                unicast_dest,
            ) {
                let invalid_intf_addr = HashSet::from([intf_addr]);
                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
            }

            // Copy the name so `intf` is no longer borrowed by the `&mut self` calls below.
            let if_name = intf.name.clone();

            self.increase_counter(Counter::Respond, 1);
            self.notify_monitors(DaemonEvent::Respond(if_name));
        }

        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
    }
3352
3353    /// Increases the value of `counter` by `count`.
3354    fn increase_counter(&mut self, counter: Counter, count: i64) {
3355        let key = counter.to_string();
3356        match self.counters.get_mut(&key) {
3357            Some(v) => *v += count,
3358            None => {
3359                self.counters.insert(key, count);
3360            }
3361        }
3362    }
3363
3364    /// Sets the value of `counter` to `count`.
3365    fn set_counter(&mut self, counter: Counter, count: i64) {
3366        let key = counter.to_string();
3367        self.counters.insert(key, count);
3368    }
3369
3370    fn signal_sock_drain(&self) {
3371        let mut signal_buf = [0; 1024];
3372
3373        // This recv is non-blocking as the socket is non-blocking.
3374        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3375            trace!(
3376                "signal socket recvd: {}",
3377                String::from_utf8_lossy(&signal_buf[0..sz])
3378            );
3379        }
3380    }
3381
3382    fn add_retransmission(&mut self, next_time: u64, command: Command) {
3383        self.retransmissions.push(ReRun { next_time, command });
3384        self.add_timer(next_time);
3385    }
3386
3387    /// Sends service removal event to listeners for expired service records.
3388    /// `expired`: map of service type domain to set of instance names.
3389    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3390        for (ty_domain, sender) in self.service_queriers.iter() {
3391            if let Some(instances) = expired.get(ty_domain) {
3392                for instance_name in instances {
3393                    let event = ServiceEvent::ServiceRemoved(
3394                        ty_domain.to_string(),
3395                        instance_name.to_string(),
3396                    );
3397                    match sender.send(event) {
3398                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3399                        Err(e) => debug!("Failed to send event: {}", e),
3400                    }
3401                }
3402            }
3403        }
3404    }
3405
3406    /// The entry point that executes all commands received by the daemon.
3407    ///
3408    /// `repeating`: whether this is a retransmission.
3409    fn exec_command(&mut self, command: Command, repeating: bool) {
3410        trace!("exec_command: {:?} repeating: {}", &command, repeating);
3411        match command {
3412            Command::Browse(ty, next_delay, cache_only, listener) => {
3413                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3414            }
3415
3416            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3417                self.exec_command_resolve_hostname(
3418                    repeating, hostname, next_delay, listener, timeout,
3419                );
3420            }
3421
3422            Command::Register(service_info) => {
3423                self.register_service(*service_info);
3424                self.increase_counter(Counter::Register, 1);
3425            }
3426
3427            Command::RegisterResend(fullname, intf) => {
3428                trace!("register-resend service: {fullname} on {}", &intf);
3429                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3430                    self.exec_command_register_resend(fullname, intf)
3431                {
3432                    let invalid_intf_addr = HashSet::from([intf_addr]);
3433                    let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3434                }
3435            }
3436
3437            Command::Unregister(fullname, resp_s) => {
3438                trace!("unregister service {} repeat {}", &fullname, &repeating);
3439                self.exec_command_unregister(repeating, fullname, resp_s);
3440            }
3441
3442            Command::UnregisterResend(packet, if_index, is_ipv4) => {
3443                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3444            }
3445
3446            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3447
3448            Command::StopResolveHostname(hostname) => {
3449                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3450            }
3451
3452            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3453
3454            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3455
3456            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3457                Ok(()) => trace!("Sent status to the client"),
3458                Err(e) => debug!("Failed to send status: {}", e),
3459            },
3460
3461            Command::Monitor(resp_s) => {
3462                self.monitors.push(resp_s);
3463            }
3464
3465            Command::SetOption(daemon_opt) => {
3466                self.process_set_option(daemon_opt);
3467            }
3468
3469            Command::GetOption(resp_s) => {
3470                let val = DaemonOptionVal {
3471                    _service_name_len_max: self.service_name_len_max,
3472                    ip_check_interval: self.ip_check_interval,
3473                };
3474                if let Err(e) = resp_s.send(val) {
3475                    debug!("Failed to send options: {}", e);
3476                }
3477            }
3478
3479            Command::Verify(instance_fullname, timeout) => {
3480                self.exec_command_verify(instance_fullname, timeout, repeating);
3481            }
3482
3483            Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3484                for intf_addr in invalid_intf_addrs {
3485                    self.del_interface_addr(&intf_addr);
3486                }
3487
3488                self.check_ip_changes();
3489            }
3490
3491            _ => {
3492                debug!("unexpected command: {:?}", &command);
3493            }
3494        }
3495    }
3496
3497    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3498        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3499        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3500        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3501        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3502        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3503        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3504        self.set_counter(Counter::Timer, self.timers.len() as i64);
3505
3506        let dns_registry_probe_count: usize = self
3507            .dns_registry_map
3508            .values()
3509            .map(|r| r.probing.len())
3510            .sum();
3511        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3512
3513        let dns_registry_active_count: usize = self
3514            .dns_registry_map
3515            .values()
3516            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3517            .sum();
3518        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3519
3520        let dns_registry_timer_count: usize = self
3521            .dns_registry_map
3522            .values()
3523            .map(|r| r.new_timers.len())
3524            .sum();
3525        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3526
3527        let dns_registry_name_change_count: usize = self
3528            .dns_registry_map
3529            .values()
3530            .map(|r| r.name_changes.len())
3531            .sum();
3532        self.set_counter(
3533            Counter::DnsRegistryNameChange,
3534            dns_registry_name_change_count as i64,
3535        );
3536
3537        // Send the metrics to the client.
3538        if let Err(e) = resp_s.send(self.counters.clone()) {
3539            debug!("Failed to send metrics: {}", e);
3540        }
3541    }
3542
3543    fn exec_command_browse(
3544        &mut self,
3545        repeating: bool,
3546        ty: String,
3547        next_delay: u32,
3548        cache_only: bool,
3549        listener: Sender<ServiceEvent>,
3550    ) {
3551        let pretty_addrs: Vec<String> = self
3552            .my_intfs
3553            .iter()
3554            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3555            .collect();
3556
3557        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3558            "{ty} on {} interfaces [{}]",
3559            pretty_addrs.len(),
3560            pretty_addrs.join(", ")
3561        ))) {
3562            debug!(
3563                "Failed to send SearchStarted({})(repeating:{}): {}",
3564                &ty, repeating, e
3565            );
3566            return;
3567        }
3568
3569        let now = current_time_millis();
3570        if !repeating {
3571            // Binds a `listener` to querying mDNS domain type `ty`.
3572            //
3573            // If there is already a `listener`, it will be updated, i.e. overwritten.
3574            self.service_queriers.insert(ty.clone(), listener.clone());
3575
3576            // if we already have the records in our cache, just send them
3577            self.query_cache_for_service(&ty, &listener, now);
3578        }
3579
3580        if cache_only {
3581            // If cache_only is true, we do not send a query.
3582            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3583                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3584                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3585            }
3586            return;
3587        }
3588
3589        self.send_query(&ty, RRType::PTR);
3590        self.increase_counter(Counter::Browse, 1);
3591
3592        let next_time = now + (next_delay * 1000) as u64;
3593        let max_delay = 60 * 60;
3594        let delay = cmp::min(next_delay * 2, max_delay);
3595        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3596    }
3597
3598    fn exec_command_resolve_hostname(
3599        &mut self,
3600        repeating: bool,
3601        hostname: String,
3602        next_delay: u32,
3603        listener: Sender<HostnameResolutionEvent>,
3604        timeout: Option<u64>,
3605    ) {
3606        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3607        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3608            "{} on addrs {:?}",
3609            &hostname, &addr_list
3610        ))) {
3611            debug!(
3612                "Failed to send ResolveStarted({})(repeating:{}): {}",
3613                &hostname, repeating, e
3614            );
3615            return;
3616        }
3617        if !repeating {
3618            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3619            // if we already have the records in our cache, just send them
3620            self.query_cache_for_hostname(&hostname, listener.clone());
3621        }
3622
3623        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3624        self.increase_counter(Counter::ResolveHostname, 1);
3625
3626        let now = current_time_millis();
3627        let next_time = now + u64::from(next_delay) * 1000;
3628        let max_delay = 60 * 60;
3629        let delay = cmp::min(next_delay * 2, max_delay);
3630
3631        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3632        if self
3633            .hostname_resolvers
3634            .get(&hostname)
3635            .and_then(|(_sender, timeout)| *timeout)
3636            .map(|timeout| next_time < timeout)
3637            .unwrap_or(true)
3638        {
3639            self.add_retransmission(
3640                next_time,
3641                Command::ResolveHostname(hostname, delay, listener, None),
3642            );
3643        }
3644    }
3645
3646    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3647        let pending_query = self.query_unresolved(&instance);
3648        let max_try = 3;
3649        if pending_query && try_count < max_try {
3650            // Note that if the current try already succeeds, the next retransmission
3651            // will be no-op as the cache has been updated.
3652            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3653            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3654        }
3655    }
3656
3657    fn exec_command_unregister(
3658        &mut self,
3659        repeating: bool,
3660        fullname: String,
3661        resp_s: Sender<UnregisterStatus>,
3662    ) {
3663        let response = match self.my_services.remove_entry(&fullname) {
3664            None => {
3665                debug!("unregister: cannot find such service {}", &fullname);
3666                UnregisterStatus::NotFound
3667            }
3668            Some((_k, info)) => {
3669                let mut timers = Vec::new();
3670
3671                for (if_index, intf) in self.my_intfs.iter() {
3672                    if let Some(sock) = self.ipv4_sock.as_ref() {
3673                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3674                        // repeat for one time just in case some peers miss the message
3675                        if !repeating && !packet.is_empty() {
3676                            let next_time = current_time_millis() + 120;
3677                            self.retransmissions.push(ReRun {
3678                                next_time,
3679                                command: Command::UnregisterResend(packet, *if_index, true),
3680                            });
3681                            timers.push(next_time);
3682                        }
3683                    }
3684
3685                    // ipv6
3686                    if let Some(sock) = self.ipv6_sock.as_ref() {
3687                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3688                        if !repeating && !packet.is_empty() {
3689                            let next_time = current_time_millis() + 120;
3690                            self.retransmissions.push(ReRun {
3691                                next_time,
3692                                command: Command::UnregisterResend(packet, *if_index, false),
3693                            });
3694                            timers.push(next_time);
3695                        }
3696                    }
3697                }
3698
3699                for t in timers {
3700                    self.add_timer(t);
3701                }
3702
3703                self.increase_counter(Counter::Unregister, 1);
3704                UnregisterStatus::OK
3705            }
3706        };
3707        if let Err(e) = resp_s.send(response) {
3708            debug!("unregister: failed to send response: {}", e);
3709        }
3710    }
3711
3712    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3713        let Some(intf) = self.my_intfs.get(&if_index) else {
3714            return;
3715        };
3716        let sock_opt = if is_ipv4 {
3717            &self.ipv4_sock
3718        } else {
3719            &self.ipv6_sock
3720        };
3721        let Some(sock) = sock_opt else {
3722            return;
3723        };
3724
3725        let if_addr = if is_ipv4 {
3726            match intf.next_ifaddr_v4() {
3727                Some(addr) => addr,
3728                None => return,
3729            }
3730        } else {
3731            match intf.next_ifaddr_v6() {
3732                Some(addr) => addr,
3733                None => return,
3734            }
3735        };
3736
3737        debug!("UnregisterResend from {:?}", if_addr);
3738        multicast_on_intf(
3739            &packet[..],
3740            &intf.name,
3741            intf.index,
3742            if_addr,
3743            &sock.pktinfo,
3744            self.port,
3745        );
3746
3747        self.increase_counter(Counter::UnregisterResend, 1);
3748    }
3749
3750    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3751        match self.service_queriers.remove_entry(&ty_domain) {
3752            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3753            Some((ty, sender)) => {
3754                // Remove pending browse commands in the reruns.
3755                trace!("StopBrowse: removed queryer for {}", &ty);
3756                let mut i = 0;
3757                while i < self.retransmissions.len() {
3758                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3759                        if t == &ty {
3760                            self.retransmissions.remove(i);
3761                            trace!("StopBrowse: removed retransmission for {}", &ty);
3762                            continue;
3763                        }
3764                    }
3765                    i += 1;
3766                }
3767
3768                // Remove cache entries.
3769                self.cache.remove_service_type(&ty_domain);
3770
3771                // Notify the client.
3772                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3773                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3774                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3775                }
3776            }
3777        }
3778    }
3779
3780    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3781        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3782            // Remove pending resolve commands in the reruns.
3783            trace!("StopResolve: removed queryer for {}", &host);
3784            let mut i = 0;
3785            while i < self.retransmissions.len() {
3786                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3787                    if t == &host {
3788                        self.retransmissions.remove(i);
3789                        trace!("StopResolve: removed retransmission for {}", &host);
3790                        continue;
3791                    }
3792                }
3793                i += 1;
3794            }
3795
3796            // Notify the client.
3797            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3798                Ok(()) => trace!("Sent SearchStopped to the listener"),
3799                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3800            }
3801        }
3802    }
3803
3804    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3805        let Some(info) = self.my_services.get_mut(&fullname) else {
3806            trace!("announce: cannot find such service {}", &fullname);
3807            return Ok(());
3808        };
3809
3810        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3811            return Ok(());
3812        };
3813
3814        let Some(intf) = self.my_intfs.get(&if_index) else {
3815            return Ok(());
3816        };
3817
3818        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3819            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3820        } else {
3821            false
3822        };
3823        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3824            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3825        } else {
3826            false
3827        };
3828
3829        if announced_v4 || announced_v6 {
3830            let hostname = dns_registry.resolve_name(info.get_hostname());
3831            let service_name = dns_registry.resolve_name(&fullname).to_string();
3832
3833            debug!("resend: announce service {service_name} on {}", intf.name);
3834
3835            notify_monitors(
3836                &mut self.monitors,
3837                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3838            );
3839            info.set_status(if_index, ServiceStatus::Announced);
3840        } else {
3841            debug!("register-resend should not fail");
3842        }
3843
3844        self.increase_counter(Counter::RegisterResend, 1);
3845        Ok(())
3846    }
3847
3848    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3849        /*
3850        RFC 6762 section 10.4:
3851        ...
3852        When the cache receives this hint that it should reconfirm some
3853        record, it MUST issue two or more queries for the resource record in
3854        dispute.  If no response is received within ten seconds, then, even
3855        though its TTL may indicate that it is not yet due to expire, that
3856        record SHOULD be promptly flushed from the cache.
3857        */
3858        let now = current_time_millis();
3859        let expire_at = if repeating {
3860            None
3861        } else {
3862            Some(now + timeout.as_millis() as u64)
3863        };
3864
3865        // send query for the resource records.
3866        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3867
3868        if !record_vec.is_empty() {
3869            let query_vec: Vec<(&str, RRType)> = record_vec
3870                .iter()
3871                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3872                .collect();
3873            self.send_query_vec(&query_vec);
3874
3875            if let Some(new_expire) = expire_at {
3876                self.add_timer(new_expire); // ensure a check for the new expire time.
3877
3878                // schedule a resend 1 second later
3879                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3880            }
3881        }
3882    }
3883
3884    /// Refresh cached service records with active queriers
3885    fn refresh_active_services(&mut self) {
3886        let mut query_ptr_count = 0;
3887        let mut query_srv_count = 0;
3888        let mut new_timers = HashSet::new();
3889        let mut query_addr_count = 0;
3890
3891        for (ty_domain, _sender) in self.service_queriers.iter() {
3892            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3893            if !refreshed_timers.is_empty() {
3894                trace!("sending refresh query for PTR: {}", ty_domain);
3895                self.send_query(ty_domain, RRType::PTR);
3896                query_ptr_count += 1;
3897                new_timers.extend(refreshed_timers);
3898            }
3899
3900            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3901            for (instance, types) in instances {
3902                trace!("sending refresh query for: {}", &instance);
3903                let query_vec = types
3904                    .into_iter()
3905                    .map(|ty| (instance.as_str(), ty))
3906                    .collect::<Vec<_>>();
3907                self.send_query_vec(&query_vec);
3908                query_srv_count += 1;
3909            }
3910            new_timers.extend(timers);
3911            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3912            for hostname in hostnames.iter() {
3913                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3914                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3915                query_addr_count += 2;
3916            }
3917            new_timers.extend(timers);
3918        }
3919
3920        for timer in new_timers {
3921            self.add_timer(timer);
3922        }
3923
3924        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3925        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3926        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3927    }
3928}
3929
3930/// Adds one or more answers of a service for incoming msg and RR entry name.
3931fn add_answer_of_service(
3932    out: &mut DnsOutgoing,
3933    msg: &DnsIncoming,
3934    entry_name: &str,
3935    service: &ServiceInfo,
3936    qtype: RRType,
3937    intf_addrs: Vec<IpAddr>,
3938) {
3939    if qtype == RRType::SRV || qtype == RRType::ANY {
3940        out.add_answer(
3941            msg,
3942            DnsSrv::new(
3943                entry_name,
3944                CLASS_IN | CLASS_CACHE_FLUSH,
3945                service.get_host_ttl(),
3946                service.get_priority(),
3947                service.get_weight(),
3948                service.get_port(),
3949                service.get_hostname().to_string(),
3950            ),
3951        );
3952    }
3953
3954    if qtype == RRType::TXT || qtype == RRType::ANY {
3955        out.add_answer(
3956            msg,
3957            DnsTxt::new(
3958                entry_name,
3959                CLASS_IN | CLASS_CACHE_FLUSH,
3960                service.get_other_ttl(),
3961                service.generate_txt(),
3962            ),
3963        );
3964    }
3965
3966    if qtype == RRType::SRV {
3967        for address in intf_addrs {
3968            out.add_additional_answer(DnsAddress::new(
3969                service.get_hostname(),
3970                ip_address_rr_type(&address),
3971                CLASS_IN | CLASS_CACHE_FLUSH,
3972                service.get_host_ttl(),
3973                address,
3974                InterfaceId::default(),
3975            ));
3976        }
3977    }
3978}
3979
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// The enum is `#[non_exhaustive]`: code matching on it outside this crate
/// needs a wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// Started searching for a service type.
    SearchStarted(String),

    /// Found a specific service instance: (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance; the details are in the boxed
    /// `ResolvedService` struct.
    ServiceResolved(Box<ResolvedService>),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type.
    SearchStopped(String),
}
4000
/// All possible events sent to the client from the daemon
/// regarding host resolution.
///
/// The enum is `#[non_exhaustive]`: code matching on it outside this crate
/// needs a wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found.
    AddressesFound(String, HashSet<ScopedIp>),
    /// One or more addresses for a hostname have been removed.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname.
    SearchStopped(String),
}
4017
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon sent an unsolicited announcement of a service from an interface.
    ///
    /// When a registration is resent, the fields are the (possibly renamed)
    /// service name and a `"<hostname>:<interface name>"` string.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address from the host.
    IpAdd(IpAddr),

    /// Daemon detected an IP address removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// See [DnsNameChange] for more details.
    NameChange(DnsNameChange),

    /// Daemon sent out a multicast response via an interface.
    // NOTE(review): the `String` presumably identifies the interface — confirm at the send site.
    Respond(String),
}
4042
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// The new name, created by appending a suffix to the original name.
    ///
    /// - for a service instance name, the suffix is ` (N)`, where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The type of the resource record whose name changed.
    pub rr_type: RRType,

    /// The interface where the name conflict and its change happened.
    pub intf_name: String,
}
4067
/// Commands supported by the daemon
#[derive(Debug)]
enum Command {
    /// Browsing for a service type.
    ///
    /// Fields: (ty_domain, next_time_delay_in_seconds, bool flag, channel::sender).
    // NOTE(review): the meaning of the `bool` field is not visible in this part of
    // the file — confirm against the browse command handler.
    Browse(String, u32, bool, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)

    /// Register a service
    Register(Box<ServiceInfo>),

    /// Unregister a service
    Unregister(String, Sender<UnregisterStatus>), // (fullname, sender for the unregister status)

    /// Announce again a service to local network
    RegisterResend(String, u32), // (fullname, if_index)

    /// Resend unregister packet.
    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)

    /// Stop browsing a service type
    StopBrowse(String), // (ty_domain)

    /// Stop resolving a hostname
    StopResolveHostname(String), // (hostname)

    /// Send query to resolve a service instance.
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    Resolve(String, u16), // (service_instance_fullname, try_count)

    /// Read the current values of the counters
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Set one daemon option.
    SetOption(DaemonOption),

    /// Read the daemon option values; they are delivered through the sender.
    GetOption(Sender<DaemonOptionVal>),

    /// Proactively confirm a DNS resource record.
    ///
    /// The intention is to check if a service name or IP address is still valid
    /// before its TTL expires.
    Verify(String, Duration), // (service instance name, timeout)

    /// Invalidate some interface addresses.
    InvalidIntfAddrs(HashSet<Interface>),

    /// Exit command; carries a sender for a `DaemonStatus`.
    // NOTE(review): presumably the final status is reported back before the
    // daemon stops — confirm in the command loop.
    Exit(Sender<DaemonStatus>),
}
4123
4124impl fmt::Display for Command {
4125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4126        match self {
4127            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
4128            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
4129            Self::Exit(_) => write!(f, "Command Exit"),
4130            Self::GetStatus(_) => write!(f, "Command GetStatus"),
4131            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
4132            Self::Monitor(_) => write!(f, "Command Monitor"),
4133            Self::Register(_) => write!(f, "Command Register"),
4134            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
4135            Self::SetOption(_) => write!(f, "Command SetOption"),
4136            Self::GetOption(_) => write!(f, "Command GetOption"),
4137            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
4138            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
4139            Self::Unregister(_, _) => write!(f, "Command Unregister"),
4140            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
4141            Self::Resolve(_, _) => write!(f, "Command Resolve"),
4142            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
4143            Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
4144        }
4145    }
4146}
4147
/// Values of daemon options, sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    // NOTE(review): presumably mirrors `DaemonOption::ServiceNameLenMax`; the leading
    // underscore suggests the field is currently not read — confirm.
    _service_name_len_max: u8,
    // NOTE(review): presumably mirrors `DaemonOption::IpCheckInterval`; the time unit
    // is not visible here — confirm.
    ip_check_interval: u64,
}
4152
/// Options of the daemon that can be changed through `Command::SetOption`.
// NOTE(review): the per-variant notes below are inferred from the variant names
// only; confirm against the option handler.
#[derive(Debug)]
enum DaemonOption {
    // Presumably the limit later passed to `check_service_name_length`.
    ServiceNameLenMax(u8),
    // Presumably the interval for checking IP address changes; unit not visible here.
    IpCheckInterval(u64),
    // Interface kinds to enable.
    EnableInterface(Vec<IfKind>),
    // Interface kinds to disable.
    DisableInterface(Vec<IfKind>),
    // Multicast loopback switch for IPv4.
    MulticastLoopV4(bool),
    // Multicast loopback switch for IPv6.
    MulticastLoopV6(bool),
    AcceptUnsolicited(bool),
    // Presumably controls whether Apple peer-to-peer interfaces (`awdl*`, `llw*`)
    // are used.
    IncludeAppleP2P(bool),
    // Test-only: simulate an interface going down.
    #[cfg(test)]
    TestDownInterface(String),
    // Test-only: simulate an interface coming up.
    #[cfg(test)]
    TestUpInterface(String),
}
4168
/// The byte length of the service domain suffix supported in this lib.
///
/// It is measured on `._tcp.local.`; the other accepted suffix, `._udp.local.`,
/// has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
4171
4172/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
4173fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4174    if ty_domain.len() <= DOMAIN_LEN + 1 {
4175        // service name cannot be empty or only '_'.
4176        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4177    }
4178
4179    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
4180    if service_name_len > limit as usize {
4181        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4182    }
4183    Ok(())
4184}
4185
4186/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
4187fn check_domain_suffix(name: &str) -> Result<()> {
4188    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4189        return Err(e_fmt!(
4190            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4191            name
4192        ));
4193    }
4194
4195    Ok(())
4196}
4197
4198/// Validate the service name in a fully qualified name.
4199///
4200/// A Full Name = <Instance>.<Service>.<Domain>
4201/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
4202///
4203/// Note: this function does not check for the length of the service name.
4204/// Instead, `register_service` method will check the length.
4205fn check_service_name(fullname: &str) -> Result<()> {
4206    check_domain_suffix(fullname)?;
4207
4208    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4209    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4210
4211    if &name[0..1] != "_" {
4212        return Err(e_fmt!("Service name must start with '_'"));
4213    }
4214
4215    let name = &name[1..];
4216
4217    if name.contains("--") {
4218        return Err(e_fmt!("Service name must not contain '--'"));
4219    }
4220
4221    if name.starts_with('-') || name.ends_with('-') {
4222        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4223    }
4224
4225    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4226    if ascii_count < 1 {
4227        return Err(e_fmt!(
4228            "Service name must contain at least one letter (eg: 'A-Za-z')"
4229        ));
4230    }
4231
4232    Ok(())
4233}
4234
4235/// Validate a hostname.
4236fn check_hostname(hostname: &str) -> Result<()> {
4237    if !hostname.ends_with(".local.") {
4238        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4239    }
4240
4241    if hostname == ".local." {
4242        return Err(e_fmt!(
4243            "The part of the hostname before '.local.' cannot be empty"
4244        ));
4245    }
4246
4247    if hostname.len() > 255 {
4248        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4249    }
4250
4251    Ok(())
4252}
4253
4254fn call_service_listener(
4255    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4256    ty_domain: &str,
4257    event: ServiceEvent,
4258) {
4259    if let Some(listener) = listeners_map.get(ty_domain) {
4260        match listener.send(event) {
4261            Ok(()) => trace!("Sent event to listener successfully"),
4262            Err(e) => debug!("Failed to send event: {}", e),
4263        }
4264    }
4265}
4266
4267fn call_hostname_resolution_listener(
4268    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4269    hostname: &str,
4270    event: HostnameResolutionEvent,
4271) {
4272    let hostname_lower = hostname.to_lowercase();
4273    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4274        match listener.send(event) {
4275            Ok(()) => trace!("Sent event to listener successfully"),
4276            Err(e) => debug!("Failed to send event: {}", e),
4277        }
4278    }
4279}
4280
4281/// Returns valid network interfaces in the host system.
4282/// Operational down interfaces are excluded.
4283/// Loopback interfaces are excluded if `with_loopback` is false.
4284fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4285    my_ip_interfaces_inner(with_loopback, false)
4286}
4287
4288fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4289    if_addrs::get_if_addrs()
4290        .unwrap_or_default()
4291        .into_iter()
4292        .filter(|i| {
4293            i.is_oper_up()
4294                && !i.is_p2p()
4295                && (!i.is_loopback() || with_loopback)
4296                && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4297        })
4298        .collect()
4299}
4300
/// Tells whether `name` looks like an Apple peer-to-peer interface, i.e. it
/// starts with `awdl` or `llw`. Such interfaces should be ignored by default.
fn is_apple_p2p_by_name(name: &str) -> bool {
    const APPLE_P2P_PREFIXES: [&str; 2] = ["awdl", "llw"];

    for prefix in APPLE_P2P_PREFIXES {
        if name.starts_with(prefix) {
            return true;
        }
    }
    false
}
4307
4308/// Send an outgoing mDNS query or response, and returns the packet bytes.
4309/// Returns empty vec if no valid interface address is found.
4310fn send_dns_outgoing(
4311    out: &DnsOutgoing,
4312    my_intf: &MyIntf,
4313    sock: &PktInfoUdpSocket,
4314    port: u16,
4315    source: Option<&IfAddr>,
4316    unicast_dest: Option<SocketAddr>,
4317) -> MyResult<Vec<Vec<u8>>> {
4318    let if_name = &my_intf.name;
4319
4320    let if_addr = match source {
4321        Some(addr) => addr,
4322        None => {
4323            if sock.domain() == Domain::IPV4 {
4324                match my_intf.next_ifaddr_v4() {
4325                    Some(addr) => addr,
4326                    None => return Ok(vec![]),
4327                }
4328            } else {
4329                match my_intf.next_ifaddr_v6() {
4330                    Some(addr) => addr,
4331                    None => return Ok(vec![]),
4332                }
4333            }
4334        }
4335    };
4336
4337    send_dns_outgoing_impl(
4338        out,
4339        if_name,
4340        my_intf.index,
4341        if_addr,
4342        sock,
4343        port,
4344        unicast_dest,
4345    )
4346}
4347
4348/// Send an outgoing mDNS query or response, and returns the packet bytes.
4349fn send_dns_outgoing_impl(
4350    out: &DnsOutgoing,
4351    if_name: &str,
4352    if_index: u32,
4353    if_addr: &IfAddr,
4354    sock: &PktInfoUdpSocket,
4355    port: u16,
4356    unicast_dest: Option<SocketAddr>,
4357) -> MyResult<Vec<Vec<u8>>> {
4358    let qtype = if out.is_query() {
4359        "query"
4360    } else {
4361        if out.answers_count() == 0 && out.additionals().is_empty() {
4362            return Ok(vec![]); // no need to send empty response
4363        }
4364        "response"
4365    };
4366    trace!(
4367        "send {}: {} questions {} answers {} authorities {} additional",
4368        qtype,
4369        out.questions().len(),
4370        out.answers_count(),
4371        out.authorities().len(),
4372        out.additionals().len()
4373    );
4374
4375    match if_addr.ip() {
4376        IpAddr::V4(ipv4) => {
4377            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4378                debug!(
4379                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4380                    ipv4, e
4381                );
4382                // cannot send without a valid interface
4383                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4384                    let intf_addr = Interface {
4385                        name: if_name.to_string(),
4386                        addr: if_addr.clone(),
4387                        index: Some(if_index),
4388                        oper_status: if_addrs::IfOperStatus::Down,
4389                        is_p2p: false,
4390                        #[cfg(windows)]
4391                        adapter_name: String::new(),
4392                    };
4393                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4394                }
4395                return Ok(vec![]); // non-fatal other failure
4396            }
4397        }
4398        IpAddr::V6(ipv6) => {
4399            if let Err(e) = sock.set_multicast_if_v6(if_index) {
4400                debug!(
4401                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4402                    ipv6, e
4403                );
4404                // cannot send without a valid interface
4405                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4406                    let intf_addr = Interface {
4407                        name: if_name.to_string(),
4408                        addr: if_addr.clone(),
4409                        index: Some(if_index),
4410                        oper_status: if_addrs::IfOperStatus::Down,
4411                        is_p2p: false,
4412                        #[cfg(windows)]
4413                        adapter_name: String::new(),
4414                    };
4415                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4416                }
4417                return Ok(vec![]); // non-fatal other failure
4418            }
4419        }
4420    }
4421
4422    let packet_list = out.to_data_on_wire();
4423    for packet in packet_list.iter() {
4424        match unicast_dest {
4425            Some(dest) => unicast_on_intf(packet, if_name, dest, sock),
4426            None => multicast_on_intf(packet, if_name, if_index, if_addr, sock, port),
4427        }
4428    }
4429    Ok(packet_list)
4430}
4431
4432/// Sends a unicast packet directly to `dest` (used for RFC 6762 §6.7
4433/// legacy unicast responses).
4434fn unicast_on_intf(packet: &[u8], if_name: &str, dest: SocketAddr, socket: &PktInfoUdpSocket) {
4435    if packet.len() > MAX_MSG_ABSOLUTE {
4436        debug!("Drop over-sized packet ({})", packet.len());
4437        return;
4438    }
4439
4440    let sock_addr = dest.into();
4441    match socket.send_to(packet, &sock_addr) {
4442        Ok(sz) => trace!(
4443            "sent unicast {} bytes on interface {} to {}",
4444            sz,
4445            if_name,
4446            dest
4447        ),
4448        Err(e) => trace!(
4449            "Failed to send unicast to {} via {:?}: {}",
4450            dest,
4451            &if_name,
4452            e
4453        ),
4454    }
4455}
4456
4457/// Sends a multicast packet, and returns the packet bytes.
4458fn multicast_on_intf(
4459    packet: &[u8],
4460    if_name: &str,
4461    if_index: u32,
4462    if_addr: &IfAddr,
4463    socket: &PktInfoUdpSocket,
4464    port: u16,
4465) {
4466    if packet.len() > MAX_MSG_ABSOLUTE {
4467        debug!("Drop over-sized packet ({})", packet.len());
4468        return;
4469    }
4470
4471    let addr: SocketAddr = match if_addr {
4472        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4473        if_addrs::IfAddr::V6(_) => {
4474            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4475            sock.set_scope_id(if_index); // Choose iface for multicast
4476            sock.into()
4477        }
4478    };
4479
4480    // Sends out `packet` to `addr` on the socket.
4481    let sock_addr = addr.into();
4482    match socket.send_to(packet, &sock_addr) {
4483        Ok(sz) => trace!(
4484            "sent out {} bytes on interface {} (idx {}) addr {}",
4485            sz,
4486            if_name,
4487            if_index,
4488            if_addr.ip()
4489        ),
4490        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4491    }
4492}
4493
/// Tells whether `name` has the shape of an instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of '.'-separated parts is checked: there must be at least
/// five of them (the trailing '.' yields an empty last part).
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when there are five or more parts.
    name.split('.').nth(4).is_some()
}
4500
4501fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4502    monitors.retain(|sender| {
4503        if let Err(e) = sender.try_send(event.clone()) {
4504            debug!("notify_monitors: try_send: {}", &e);
4505            if matches!(e, TrySendError::Disconnected(_)) {
4506                return false; // This monitor is dropped.
4507            }
4508        }
4509        true
4510    });
4511}
4512
4513/// Check if all unique records passed "probing", and if yes, create a packet
4514/// to announce the service.
4515fn prepare_announce(
4516    info: &ServiceInfo,
4517    intf: &MyIntf,
4518    dns_registry: &mut DnsRegistry,
4519    is_ipv4: bool,
4520) -> Option<DnsOutgoing> {
4521    let intf_addrs = if is_ipv4 {
4522        info.get_addrs_on_my_intf_v4(intf)
4523    } else {
4524        info.get_addrs_on_my_intf_v6(intf)
4525    };
4526
4527    if intf_addrs.is_empty() {
4528        debug!(
4529            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
4530            &intf.name
4531        );
4532        return None;
4533    }
4534
4535    // check if we changed our name due to conflicts.
4536    let service_fullname = dns_registry.resolve_name(info.get_fullname());
4537
4538    debug!(
4539        "prepare to announce service {service_fullname} on {:?}",
4540        &intf_addrs
4541    );
4542
4543    let mut probing_count = 0;
4544    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4545    let create_time = current_time_millis() + fastrand::u64(0..250);
4546
4547    out.add_answer_at_time(
4548        DnsPointer::new(
4549            info.get_type(),
4550            RRType::PTR,
4551            CLASS_IN,
4552            info.get_other_ttl(),
4553            service_fullname.to_string(),
4554        ),
4555        0,
4556    );
4557
4558    if let Some(sub) = info.get_subtype() {
4559        trace!("Adding subdomain {}", sub);
4560        out.add_answer_at_time(
4561            DnsPointer::new(
4562                sub,
4563                RRType::PTR,
4564                CLASS_IN,
4565                info.get_other_ttl(),
4566                service_fullname.to_string(),
4567            ),
4568            0,
4569        );
4570    }
4571
4572    // SRV records.
4573    let hostname = dns_registry.resolve_name(info.get_hostname()).to_string();
4574
4575    let mut srv = DnsSrv::new(
4576        info.get_fullname(),
4577        CLASS_IN | CLASS_CACHE_FLUSH,
4578        info.get_host_ttl(),
4579        info.get_priority(),
4580        info.get_weight(),
4581        info.get_port(),
4582        hostname,
4583    );
4584
4585    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4586        srv.get_record_mut().set_new_name(new_name.to_string());
4587    }
4588
4589    if !info.requires_probe()
4590        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4591    {
4592        out.add_answer_at_time(srv, 0);
4593    } else {
4594        probing_count += 1;
4595    }
4596
4597    // TXT records.
4598
4599    let mut txt = DnsTxt::new(
4600        info.get_fullname(),
4601        CLASS_IN | CLASS_CACHE_FLUSH,
4602        info.get_other_ttl(),
4603        info.generate_txt(),
4604    );
4605
4606    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4607        txt.get_record_mut().set_new_name(new_name.to_string());
4608    }
4609
4610    if !info.requires_probe()
4611        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4612    {
4613        out.add_answer_at_time(txt, 0);
4614    } else {
4615        probing_count += 1;
4616    }
4617
4618    // Address records. (A and AAAA)
4619
4620    let hostname = info.get_hostname();
4621    for address in intf_addrs {
4622        let mut dns_addr = DnsAddress::new(
4623            hostname,
4624            ip_address_rr_type(&address),
4625            CLASS_IN | CLASS_CACHE_FLUSH,
4626            info.get_host_ttl(),
4627            address,
4628            intf.into(),
4629        );
4630
4631        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4632            dns_addr.get_record_mut().set_new_name(new_name.to_string());
4633        }
4634
4635        if !info.requires_probe()
4636            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4637        {
4638            out.add_answer_at_time(dns_addr, 0);
4639        } else {
4640            probing_count += 1;
4641        }
4642    }
4643
4644    if probing_count > 0 {
4645        return None;
4646    }
4647
4648    Some(out)
4649}
4650
4651/// Send an unsolicited response for owned service via `intf` and `sock`.
4652/// Returns true if sent out successfully for IPv4 or IPv6.
4653fn announce_service_on_intf(
4654    dns_registry: &mut DnsRegistry,
4655    info: &ServiceInfo,
4656    intf: &MyIntf,
4657    sock: &PktInfoUdpSocket,
4658    port: u16,
4659) -> MyResult<bool> {
4660    let is_ipv4 = sock.domain() == Domain::IPV4;
4661    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4662        let _ = send_dns_outgoing(&out, intf, sock, port, None, None)?;
4663        return Ok(true);
4664    }
4665
4666    Ok(false)
4667}
4668
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first '.') is changed. If that
/// label ends with a number in parentheses, ` (<N>)`, the number is
/// incremented; otherwise ` (2)` is appended. A number that cannot be
/// incremented without overflowing `u32` is treated like any other text, i.e.
/// ` (2)` is appended.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
/// - `foo (4294967295)` becomes `foo (4294967295) (2)`
fn name_change(original: &str) -> String {
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    let renamed = first_label
        .rfind(" (")
        .and_then(|paren_pos| {
            // The label must end with ")" right after the number.
            let digits = first_label[paren_pos + 2..].strip_suffix(')')?;
            // `checked_add` avoids the overflow panic / wrap-around at `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{} ({})", &first_label[..paren_pos], next))
        })
        .unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{renamed}.{rest}"),
        None => renamed,
    }
}
4704
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is modified; everything after the first
/// `.` is kept verbatim. If that label already ends with a `-<num>` suffix,
/// the number is incremented; otherwise `-2` is appended.
///
/// A suffix whose number cannot be incremented without overflowing `u32` is
/// handled like any other non-numeric suffix: `-2` is appended to the label
/// instead of panicking (debug builds) or wrapping around (release builds).
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
fn hostname_change(original: &str) -> String {
    // Isolate the first label. `rest` is `None` when there is no dot at all.
    let (first_part, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing `-<num>` suffix: everything after the last
    // hyphen must parse as a number.
    let incremented = first_part.rsplit_once('-').and_then(|(base_name, digits)| {
        let next = digits.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base_name}-{next}"))
    });

    let new_name = incremented.unwrap_or_else(|| format!("{first_part}-2"));

    match rest {
        Some(rest) => format!("{new_name}.{rest}"),
        None => new_name,
    }
}
4732
4733/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4734/// that are finished.
4735fn check_probing(
4736    dns_registry: &mut DnsRegistry,
4737    timers: &mut BinaryHeap<Reverse<u64>>,
4738    now: u64,
4739) -> (DnsOutgoing, Vec<String>) {
4740    let mut expired_probes = Vec::new();
4741    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4742
4743    for (name, probe) in dns_registry.probing.iter_mut() {
4744        if now >= probe.next_send {
4745            if probe.expired(now) {
4746                // move the record to active
4747                expired_probes.push(name.clone());
4748            } else {
4749                out.add_question(name, RRType::ANY);
4750
4751                /*
4752                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4753                ...
4754                for tiebreaking to work correctly in all
4755                cases, the Authority Section must contain *all* the records and
4756                proposed rdata being probed for uniqueness.
4757                    */
4758                for record in probe.records.iter() {
4759                    out.add_authority(record.clone());
4760                }
4761
4762                probe.update_next_send(now);
4763
4764                // add timer
4765                timers.push(Reverse(probe.next_send));
4766            }
4767        }
4768    }
4769
4770    (out, expired_probes)
4771}
4772
4773/// Process expired probes on an interface and return a list of services
4774/// that are waiting for the probe to finish.
4775///
4776/// `DnsNameChange` events are sent to the monitors.
4777fn handle_expired_probes(
4778    expired_probes: Vec<String>,
4779    intf_name: &str,
4780    dns_registry: &mut DnsRegistry,
4781    monitors: &mut Vec<Sender<DaemonEvent>>,
4782) -> HashSet<String> {
4783    let mut waiting_services = HashSet::new();
4784
4785    for name in expired_probes {
4786        let Some(probe) = dns_registry.probing.remove(&name) else {
4787            continue;
4788        };
4789
4790        // send notifications about name changes
4791        for record in probe.records.iter() {
4792            if let Some(new_name) = record.get_record().get_new_name() {
4793                dns_registry
4794                    .name_changes
4795                    .insert(name.clone(), new_name.to_string());
4796
4797                let event = DnsNameChange {
4798                    original: record.get_record().get_original_name().to_string(),
4799                    new_name: new_name.to_string(),
4800                    rr_type: record.get_type(),
4801                    intf_name: intf_name.to_string(),
4802                };
4803                debug!("Name change event: {:?}", &event);
4804                notify_monitors(monitors, DaemonEvent::NameChange(event));
4805            }
4806        }
4807
4808        // move RR from probe to active.
4809        debug!(
4810            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4811            probe.records.len(),
4812            probe.waiting_services.len(),
4813        );
4814
4815        // Move records to active and plan to wake up services if records are not empty.
4816        if !probe.records.is_empty() {
4817            match dns_registry.active.get_mut(&name) {
4818                Some(records) => {
4819                    records.extend(probe.records);
4820                }
4821                None => {
4822                    dns_registry.active.insert(name, probe.records);
4823                }
4824            }
4825
4826            waiting_services.extend(probe.waiting_services);
4827        }
4828    }
4829
4830    waiting_services
4831}
4832
4833/// Resolves `IfKind::Addr(ip)` to `IndexV4(if_index)` or `IndexV6(if_index)`.
4834fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4835    if let IfKind::Addr(addr) = &if_kind {
4836        if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4837            let if_index = intf.index.unwrap_or(0);
4838            return if addr.is_ipv4() {
4839                IfKind::IndexV4(if_index)
4840            } else {
4841                IfKind::IndexV6(if_index)
4842            };
4843        }
4844    }
4845    if_kind
4846}
4847
4848#[cfg(test)]
4849mod tests {
4850    use super::{
4851        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4852        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4853        valid_ip_on_intf, HostnameResolutionEvent, MyIntf, ServiceDaemon, ServiceEvent,
4854        ServiceInfo, GROUP_ADDR_V4, MDNS_PORT,
4855    };
4856    use crate::{
4857        dns_parser::{
4858            DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp,
4859            CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE,
4860        },
4861        service_daemon::{add_answer_of_service, check_hostname},
4862    };
4863    use if_addrs::{IfAddr, Ifv4Addr};
4864    use std::{
4865        collections::HashSet,
4866        net::{IpAddr, Ipv4Addr, UdpSocket},
4867        time::{Duration, Instant, SystemTime},
4868    };
4869    use test_log::test;
4870
4871    #[test]
4872    fn test_response_source_ifaddr_match() {
4873        // When an interface has multiple IPs on unrelated subnets,
4874        // handle_query should pick the IfAddr whose subnet contains the querier,
4875        // and fall back to None if none match.
4876        let ifaddr_a = IfAddr::V4(Ifv4Addr {
4877            ip: Ipv4Addr::new(192, 168, 1, 148),
4878            netmask: Ipv4Addr::new(255, 255, 255, 0),
4879            broadcast: None,
4880            prefixlen: 24,
4881        });
4882        let ifaddr_b = IfAddr::V4(Ifv4Addr {
4883            ip: Ipv4Addr::new(10, 238, 0, 51),
4884            netmask: Ipv4Addr::new(255, 255, 255, 0),
4885            broadcast: None,
4886            prefixlen: 24,
4887        });
4888
4889        let intf = MyIntf {
4890            name: "dummy0".to_string(),
4891            index: 1,
4892            addrs: HashSet::from([ifaddr_a.clone(), ifaddr_b.clone()]),
4893        };
4894
4895        let pick = |querier: IpAddr| -> Option<IfAddr> {
4896            intf.addrs
4897                .iter()
4898                .find(|a| valid_ip_on_intf(&querier, a))
4899                .cloned()
4900        };
4901
4902        assert_eq!(
4903            pick(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))),
4904            Some(ifaddr_a)
4905        );
4906        assert_eq!(
4907            pick(IpAddr::V4(Ipv4Addr::new(10, 238, 0, 99))),
4908            Some(ifaddr_b)
4909        );
4910        // Querier not on any local subnet: fall back to None.
4911        assert_eq!(pick(IpAddr::V4(Ipv4Addr::new(172, 16, 0, 1))), None);
4912    }
4913
4914    #[test]
4915    fn test_instance_name() {
4916        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4917        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4918        assert!(!valid_instance_name("_printer._tcp.local."));
4919    }
4920
4921    #[test]
4922    fn test_legacy_unicast_response() {
4923        // RFC 6762 §6.7: a query whose UDP source port is not 5353 (a
4924        // "legacy" / "one-shot" querier, e.g. Android's getaddrinfo) must
4925        // get its response via unicast, sent back to the querier's source
4926        // address, with the question echoed and the cache-flush bit cleared.
4927        //
4928        // This test sends such a query from an ephemeral port and asserts
4929        // the response arrives on that same socket. The socket is not joined
4930        // to the mDNS multicast group, so a multicast-only reply would never
4931        // reach it — simply receiving the response proves it was unicast.
4932
4933        let intf_ip = match my_ip_interfaces(false)
4934            .into_iter()
4935            .find_map(|intf| match intf.ip() {
4936                IpAddr::V4(ip) => Some(ip),
4937                IpAddr::V6(_) => None,
4938            }) {
4939            Some(ip) => ip,
4940            None => {
4941                println!("No IPv4 interface available; skipping test.");
4942                return;
4943            }
4944        };
4945
4946        // Register a service with a unique hostname on this host.
4947        let daemon = ServiceDaemon::new().expect("Failed to create daemon");
4948        let unique = SystemTime::now()
4949            .duration_since(SystemTime::UNIX_EPOCH)
4950            .unwrap()
4951            .as_micros();
4952        let hostname = format!("legacy-unicast-test-{unique}.local.");
4953        let service_info = ServiceInfo::new(
4954            "_legacy-uni._udp.local.",
4955            "test_instance",
4956            &hostname,
4957            &[IpAddr::V4(intf_ip)] as &[IpAddr],
4958            5353, // arbitrary; the test only resolves the hostname
4959            None,
4960        )
4961        .expect("invalid service info");
4962        daemon.register(service_info).expect("register service");
4963
4964        // A one-shot querier: ephemeral source port, not 5353. Binding to
4965        // `intf_ip` directs the multicast query out that interface, which is
4966        // one the daemon is listening on.
4967        let querier = UdpSocket::bind((intf_ip, 0)).expect("bind querier socket");
4968        querier
4969            .set_multicast_loop_v4(true)
4970            .expect("enable multicast loopback");
4971        querier
4972            .set_read_timeout(Some(Duration::from_millis(500)))
4973            .expect("set read timeout");
4974        assert_ne!(
4975            querier.local_addr().unwrap().port(),
4976            MDNS_PORT,
4977            "querier must use an ephemeral (non-5353) source port"
4978        );
4979
4980        // Build a one-question A-record query for our hostname.
4981        let mut query = DnsOutgoing::new(FLAGS_QR_QUERY);
4982        query.add_question(&hostname, RRType::A);
4983        let query_packet = query
4984            .to_data_on_wire()
4985            .pop()
4986            .expect("query serialized to one packet");
4987
4988        let if_id = InterfaceId {
4989            name: "test".to_string(),
4990            index: 0,
4991        };
4992
4993        // The service is announced asynchronously after register(), so retry
4994        // the query until our answer comes back or the deadline passes.
4995        let deadline = Instant::now() + Duration::from_secs(8);
4996        let mut response = None;
4997        'outer: while Instant::now() < deadline {
4998            querier
4999                .send_to(&query_packet, (GROUP_ADDR_V4, MDNS_PORT))
5000                .expect("send query");
5001
5002            // Drain whatever has arrived; on read timeout the loop ends and
5003            // we re-send the query.
5004            let mut buf = [0u8; 1500];
5005            while let Ok((len, from)) = querier.recv_from(&mut buf) {
5006                let Ok(msg) = DnsIncoming::new(buf[..len].to_vec(), if_id.clone()) else {
5007                    continue;
5008                };
5009                if msg.is_response()
5010                    && msg
5011                        .answers()
5012                        .iter()
5013                        .any(|a| a.get_name().eq_ignore_ascii_case(&hostname))
5014                {
5015                    response = Some((msg, from));
5016                    break 'outer;
5017                }
5018            }
5019        }
5020
5021        let (msg, from) = response.expect(
5022            "expected a unicast response to the legacy query; \
5023             a multicast-only reply would never reach this un-joined socket",
5024        );
5025
5026        // The reply came back to our ephemeral socket, from the mDNS port.
5027        assert_eq!(
5028            from.port(),
5029            MDNS_PORT,
5030            "response should originate from the mDNS port"
5031        );
5032
5033        // RFC 6762 §6.7: the original question must be echoed.
5034        assert!(
5035            msg.questions()
5036                .iter()
5037                .any(|q| q.entry_name().eq_ignore_ascii_case(&hostname)),
5038            "legacy unicast response must echo the question section"
5039        );
5040
5041        // RFC 6762 §6.7 / §10.2: the answer must be the A record we asked
5042        // for, with the cache-flush bit cleared.
5043        let answer = msg
5044            .answers()
5045            .iter()
5046            .find(|a| a.get_name().eq_ignore_ascii_case(&hostname))
5047            .expect("response contains an answer for our hostname");
5048        assert_eq!(
5049            answer.get_type(),
5050            RRType::A,
5051            "an A query should be answered with an A record"
5052        );
5053        assert!(
5054            !answer.get_cache_flush(),
5055            "legacy unicast responses must clear the cache-flush bit"
5056        );
5057
5058        daemon.shutdown().unwrap();
5059    }
5060
5061    #[test]
5062    fn test_check_service_name_length() {
5063        let result = check_service_name_length("_tcp", 100);
5064        assert!(result.is_err());
5065        if let Err(e) = result {
5066            println!("{}", e);
5067        }
5068    }
5069
5070    #[test]
5071    fn test_check_hostname() {
5072        // valid hostnames
5073        for hostname in &[
5074            "my_host.local.",
5075            &("A".repeat(255 - ".local.".len()) + ".local."),
5076        ] {
5077            let result = check_hostname(hostname);
5078            assert!(result.is_ok());
5079        }
5080
5081        // erroneous hostnames
5082        for hostname in &[
5083            "my_host.local",
5084            ".local.",
5085            &("A".repeat(256 - ".local.".len()) + ".local."),
5086        ] {
5087            let result = check_hostname(hostname);
5088            assert!(result.is_err());
5089            if let Err(e) = result {
5090                println!("{}", e);
5091            }
5092        }
5093    }
5094
5095    #[test]
5096    fn test_check_domain_suffix() {
5097        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
5098        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
5099        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
5100        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
5101        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
5102        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
5103    }
5104
5105    #[test]
5106    fn test_service_with_temporarily_invalidated_ptr() {
5107        // Create a daemon
5108        let d = ServiceDaemon::new().expect("Failed to create daemon");
5109
5110        let service = "_test_inval_ptr._udp.local.";
5111        let host_name = "my_host_tmp_invalidated_ptr.local.";
5112        let intfs: Vec<_> = my_ip_interfaces(false);
5113        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
5114        let port = 5201;
5115        let my_service =
5116            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
5117                .expect("invalid service info")
5118                .enable_addr_auto();
5119        let result = d.register(my_service.clone());
5120        assert!(result.is_ok());
5121
5122        // Browse for a service
5123        let browse_chan = d.browse(service).unwrap();
5124        let timeout = Duration::from_secs(2);
5125        let mut resolved = false;
5126
5127        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5128            match event {
5129                ServiceEvent::ServiceResolved(info) => {
5130                    resolved = true;
5131                    println!("Resolved a service of {}", &info.fullname);
5132                    break;
5133                }
5134                e => {
5135                    println!("Received event {:?}", e);
5136                }
5137            }
5138        }
5139
5140        assert!(resolved);
5141
5142        println!("Stopping browse of {}", service);
5143        // Pause browsing so restarting will cause a new immediate query.
5144        // Unregistering will not work here, it will invalidate all the records.
5145        d.stop_browse(service).unwrap();
5146
5147        // Ensure the search is stopped.
5148        // Reduces the chance of receiving an answer adding the ptr back to the
5149        // cache causing the later browse to return directly from the cache.
5150        // (which invalidates what this test is trying to test for.)
5151        let mut stopped = false;
5152        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5153            match event {
5154                ServiceEvent::SearchStopped(_) => {
5155                    stopped = true;
5156                    println!("Stopped browsing service");
5157                    break;
5158                }
5159                // Other `ServiceResolved` messages may be received
5160                // here as they come from different interfaces.
5161                // That's fine for this test.
5162                e => {
5163                    println!("Received event {:?}", e);
5164                }
5165            }
5166        }
5167
5168        assert!(stopped);
5169
5170        // Invalidate the ptr from the service to the host.
5171        let invalidate_ptr_packet = DnsPointer::new(
5172            my_service.get_type(),
5173            RRType::PTR,
5174            CLASS_IN,
5175            0,
5176            my_service.get_fullname().to_string(),
5177        );
5178
5179        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5180        packet_buffer.add_additional_answer(invalidate_ptr_packet);
5181
5182        for intf in intfs {
5183            let sock = _new_socket_bind(&intf, true).unwrap();
5184            send_dns_outgoing_impl(
5185                &packet_buffer,
5186                &intf.name,
5187                intf.index.unwrap_or(0),
5188                &intf.addr,
5189                &sock.pktinfo,
5190                MDNS_PORT,
5191                None,
5192            )
5193            .unwrap();
5194        }
5195
5196        println!(
5197            "Sent PTR record invalidation. Starting second browse for {}",
5198            service
5199        );
5200
5201        // Restart the browse to force the sender to re-send the announcements.
5202        let browse_chan = d.browse(service).unwrap();
5203
5204        resolved = false;
5205        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5206            match event {
5207                ServiceEvent::ServiceResolved(info) => {
5208                    resolved = true;
5209                    println!("Resolved a service of {}", &info.fullname);
5210                    break;
5211                }
5212                e => {
5213                    println!("Received event {:?}", e);
5214                }
5215            }
5216        }
5217
5218        assert!(resolved);
5219        d.shutdown().unwrap();
5220    }
5221
5222    #[test]
5223    fn test_expired_srv() {
5224        // construct service info
5225        let service_type = "_expired-srv._udp.local.";
5226        let instance = "test_instance";
5227        let host_name = "expired_srv_host.local.";
5228        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
5229            .unwrap()
5230            .enable_addr_auto();
5231        // let fullname = my_service.get_fullname().to_string();
5232
5233        // set SRV to expire soon.
5234        let new_ttl = 3; // for testing only.
5235        my_service._set_host_ttl(new_ttl);
5236
5237        // register my service
5238        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5239        let result = mdns_server.register(my_service);
5240        assert!(result.is_ok());
5241
5242        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5243        let browse_chan = mdns_client.browse(service_type).unwrap();
5244        let timeout = Duration::from_secs(2);
5245        let mut resolved = false;
5246
5247        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5248            if let ServiceEvent::ServiceResolved(info) = event {
5249                resolved = true;
5250                println!("Resolved a service of {}", &info.fullname);
5251                break;
5252            }
5253        }
5254
5255        assert!(resolved);
5256
5257        // Exit the server so that no more responses.
5258        mdns_server.shutdown().unwrap();
5259
5260        // SRV record in the client cache will expire.
5261        let expire_timeout = Duration::from_secs(new_ttl as u64);
5262        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
5263            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
5264                println!("Service removed: {}: {}", &service_type, &full_name);
5265                break;
5266            }
5267        }
5268    }
5269
5270    #[test]
5271    fn test_hostname_resolution_address_removed() {
5272        // Create a mDNS server
5273        let server = ServiceDaemon::new().expect("Failed to create server");
5274        let hostname = "addr_remove_host._tcp.local.";
5275        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
5276            .iter()
5277            .find(|iface| iface.ip().is_ipv4())
5278            .map(|iface| iface.into())
5279            .unwrap();
5280
5281        let mut my_service = ServiceInfo::new(
5282            "_host_res_test._tcp.local.",
5283            "my_instance",
5284            hostname,
5285            service_ip_addr.to_ip_addr(),
5286            1234,
5287            None,
5288        )
5289        .expect("invalid service info");
5290
5291        // Set a short TTL for addresses for testing.
5292        let addr_ttl = 2;
5293        my_service._set_host_ttl(addr_ttl); // Expire soon
5294
5295        server.register(my_service).unwrap();
5296
5297        // Create a mDNS client for resolving the hostname.
5298        let client = ServiceDaemon::new().expect("Failed to create client");
5299        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
5300        let resolved = loop {
5301            match event_receiver.recv() {
5302                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
5303                    assert!(found_hostname == hostname);
5304                    assert!(addresses.contains(&service_ip_addr));
5305                    println!("address found: {:?}", &addresses);
5306                    break true;
5307                }
5308                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
5309                Ok(_event) => {}
5310                Err(_) => break false,
5311            }
5312        };
5313
5314        assert!(resolved);
5315
5316        // Shutdown the server so no more responses / refreshes for addresses.
5317        server.shutdown().unwrap();
5318
5319        // Wait till hostname address record expires, with 1 second grace period.
5320        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
5321        let removed = loop {
5322            match event_receiver.recv_timeout(timeout) {
5323                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
5324                    assert!(removed_host == hostname);
5325                    assert!(addresses.contains(&service_ip_addr));
5326
5327                    println!(
5328                        "address removed: hostname: {} addresses: {:?}",
5329                        &hostname, &addresses
5330                    );
5331                    break true;
5332                }
5333                Ok(_event) => {}
5334                Err(_) => {
5335                    break false;
5336                }
5337            }
5338        };
5339
5340        assert!(removed);
5341
5342        client.shutdown().unwrap();
5343    }
5344
5345    #[test]
5346    fn test_refresh_ptr() {
5347        // construct service info
5348        let service_type = "_refresh-ptr._udp.local.";
5349        let instance = "test_instance";
5350        let host_name = "refresh_ptr_host.local.";
5351        let service_ip_addr = my_ip_interfaces(false)
5352            .iter()
5353            .find(|iface| iface.ip().is_ipv4())
5354            .map(|iface| iface.ip())
5355            .unwrap();
5356
5357        let mut my_service = ServiceInfo::new(
5358            service_type,
5359            instance,
5360            host_name,
5361            service_ip_addr,
5362            5023,
5363            None,
5364        )
5365        .unwrap();
5366
5367        let new_ttl = 3; // for testing only.
5368        my_service._set_other_ttl(new_ttl);
5369
5370        // register my service
5371        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5372        let result = mdns_server.register(my_service);
5373        assert!(result.is_ok());
5374
5375        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5376        let browse_chan = mdns_client.browse(service_type).unwrap();
5377        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5378        let mut resolved = false;
5379
5380        // resolve the service first.
5381        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5382            if let ServiceEvent::ServiceResolved(info) = event {
5383                resolved = true;
5384                println!("Resolved a service of {}", &info.fullname);
5385                break;
5386            }
5387        }
5388
5389        assert!(resolved);
5390
5391        // wait over 80% of TTL, and refresh PTR should be sent out.
5392        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
5393        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5394            println!("event: {:?}", &event);
5395        }
5396
5397        // verify refresh counter.
5398        let metrics_chan = mdns_client.get_metrics().unwrap();
5399        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
5400        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
5401        assert_eq!(ptr_refresh_counter, 1);
5402        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
5403        assert_eq!(srvtxt_refresh_counter, 1);
5404
5405        // Exit the server so that no more responses.
5406        mdns_server.shutdown().unwrap();
5407        mdns_client.shutdown().unwrap();
5408    }
5409
5410    #[test]
5411    fn test_name_change() {
5412        assert_eq!(name_change("foo.local."), "foo (2).local.");
5413        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5414        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5415        assert_eq!(name_change("foo"), "foo (2)");
5416        assert_eq!(name_change("foo (2)"), "foo (3)");
5417        assert_eq!(name_change(""), " (2)");
5418
5419        // Additional edge cases
5420        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
5421        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
5422        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
5423    }
5424
5425    #[test]
5426    fn test_hostname_change() {
5427        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5428        assert_eq!(hostname_change("foo"), "foo-2");
5429        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5430        assert_eq!(hostname_change("foo-9"), "foo-10");
5431        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5432    }
5433
5434    #[test]
5435    fn test_add_answer_txt_ttl() {
5436        // construct a simple service info
5437        let service_type = "_test_add_answer._udp.local.";
5438        let instance = "test_instance";
5439        let host_name = "add_answer_host.local.";
5440        let service_intf = my_ip_interfaces(false)
5441            .into_iter()
5442            .find(|iface| iface.ip().is_ipv4())
5443            .unwrap();
5444        let service_ip_addr = service_intf.ip();
5445        let my_service = ServiceInfo::new(
5446            service_type,
5447            instance,
5448            host_name,
5449            service_ip_addr,
5450            5023,
5451            None,
5452        )
5453        .unwrap();
5454
5455        // construct a DnsOutgoing message
5456        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5457
5458        // Construct a dummy DnsIncoming message
5459        let mut dummy_data = out.to_data_on_wire();
5460        let interface_id = InterfaceId::from(&service_intf);
5461        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
5462
5463        // Add an answer of TXT type for the service.
5464        let if_addrs = vec![service_intf.ip()];
5465        add_answer_of_service(
5466            &mut out,
5467            &incoming,
5468            instance,
5469            &my_service,
5470            RRType::TXT,
5471            if_addrs,
5472        );
5473
5474        // Check if the answer was added correctly
5475        assert!(
5476            out.answers_count() > 0,
5477            "No answers added to the outgoing message"
5478        );
5479
5480        // Check if the first answer is of type TXT
5481        let answer = out._answers().first().unwrap();
5482        assert_eq!(answer.0.get_type(), RRType::TXT);
5483
5484        // Check TTL is set properly for the TXT record
5485        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
5486    }
5487
5488    #[test]
5489    fn test_interface_flip() {
5490        // start a server
5491        let ty_domain = "_intf-flip._udp.local.";
5492        let host_name = "intf_flip.local.";
5493        let now = SystemTime::now()
5494            .duration_since(SystemTime::UNIX_EPOCH)
5495            .unwrap();
5496        let instance_name = now.as_micros().to_string(); // Create a unique name.
5497        let port = 5200;
5498
5499        // Get a single IPv4 address
5500        let (ip_addr1, intf_name) = my_ip_interfaces(false)
5501            .iter()
5502            .find(|iface| iface.ip().is_ipv4())
5503            .map(|iface| (iface.ip(), iface.name.clone()))
5504            .unwrap();
5505
5506        println!("Using interface {} with IP {}", intf_name, ip_addr1);
5507
5508        // Register the service.
5509        let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
5510            .expect("valid service info");
5511        let server1 = ServiceDaemon::new().expect("failed to start server");
5512        server1
5513            .register(service1)
5514            .expect("Failed to register service1");
5515
5516        // wait for the service announced.
5517        std::thread::sleep(Duration::from_secs(2));
5518
5519        // start a client
5520        let client = ServiceDaemon::new().expect("failed to start client");
5521
5522        let receiver = client.browse(ty_domain).unwrap();
5523
5524        let timeout = Duration::from_secs(3);
5525        let mut got_data = false;
5526
5527        while let Ok(event) = receiver.recv_timeout(timeout) {
5528            if let ServiceEvent::ServiceResolved(_) = event {
5529                println!("Received ServiceResolved event");
5530                got_data = true;
5531                break;
5532            }
5533        }
5534
5535        assert!(got_data, "Should receive ServiceResolved event");
5536
5537        // Set a short IP check interval to detect interface changes quickly.
5538        client.set_ip_check_interval(1).unwrap();
5539
5540        // Now shutdown the interface and expect the client to lose the service.
5541        println!("Shutting down interface {}", &intf_name);
5542        client.test_down_interface(&intf_name).unwrap();
5543
5544        let mut got_removed = false;
5545
5546        while let Ok(event) = receiver.recv_timeout(timeout) {
5547            if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
5548                got_removed = true;
5549                println!("removed: {ty_domain} : {instance}");
5550                break;
5551            }
5552        }
5553        assert!(got_removed, "Should receive ServiceRemoved event");
5554
5555        println!("Bringing up interface {}", &intf_name);
5556        client.test_up_interface(&intf_name).unwrap();
5557        let mut got_data = false;
5558        while let Ok(event) = receiver.recv_timeout(timeout) {
5559            if let ServiceEvent::ServiceResolved(resolved) = event {
5560                got_data = true;
5561                println!("Received ServiceResolved: {:?}", resolved);
5562                break;
5563            }
5564        }
5565        assert!(
5566            got_data,
5567            "Should receive ServiceResolved event after interface is back up"
5568        );
5569
5570        server1.shutdown().unwrap();
5571        client.shutdown().unwrap();
5572    }
5573
5574    #[test]
5575    fn test_cache_only() {
5576        // construct service info
5577        let service_type = "_cache_only._udp.local.";
5578        let instance = "test_instance";
5579        let host_name = "cache_only_host.local.";
5580        let service_ip_addr = my_ip_interfaces(false)
5581            .iter()
5582            .find(|iface| iface.ip().is_ipv4())
5583            .map(|iface| iface.ip())
5584            .unwrap();
5585
5586        let mut my_service = ServiceInfo::new(
5587            service_type,
5588            instance,
5589            host_name,
5590            service_ip_addr,
5591            5023,
5592            None,
5593        )
5594        .unwrap();
5595
5596        let new_ttl = 3; // for testing only.
5597        my_service._set_other_ttl(new_ttl);
5598
5599        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5600
5601        // make a single browse request to record that we are interested in the service.  This ensures that
5602        // subsequent announcements are cached.
5603        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5604        std::thread::sleep(Duration::from_secs(2));
5605
5606        // register my service
5607        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5608        let result = mdns_server.register(my_service);
5609        assert!(result.is_ok());
5610
5611        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5612        let mut resolved = false;
5613
5614        // resolve the service.
5615        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5616            if let ServiceEvent::ServiceResolved(info) = event {
5617                resolved = true;
5618                println!("Resolved a service of {}", &info.get_fullname());
5619                break;
5620            }
5621        }
5622
5623        assert!(resolved);
5624
5625        // Exit the server so that no more responses.
5626        mdns_server.shutdown().unwrap();
5627        mdns_client.shutdown().unwrap();
5628    }
5629
5630    #[test]
5631    fn test_cache_only_unsolicited() {
5632        let service_type = "_c_unsolicit._udp.local.";
5633        let instance = "test_instance";
5634        let host_name = "c_unsolicit_host.local.";
5635        let service_ip_addr = my_ip_interfaces(false)
5636            .iter()
5637            .find(|iface| iface.ip().is_ipv4())
5638            .map(|iface| iface.ip())
5639            .unwrap();
5640
5641        let my_service = ServiceInfo::new(
5642            service_type,
5643            instance,
5644            host_name,
5645            service_ip_addr,
5646            5023,
5647            None,
5648        )
5649        .unwrap();
5650
5651        // register my service
5652        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5653        let result = mdns_server.register(my_service);
5654        assert!(result.is_ok());
5655
5656        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5657        mdns_client.accept_unsolicited(true).unwrap();
5658
5659        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
5660        // that the announcements are treated as unsolicited
5661        std::thread::sleep(Duration::from_secs(2));
5662        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5663        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5664        let mut resolved = false;
5665
5666        // resolve the service.
5667        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5668            if let ServiceEvent::ServiceResolved(info) = event {
5669                resolved = true;
5670                println!("Resolved a service of {}", &info.get_fullname());
5671                break;
5672            }
5673        }
5674
5675        assert!(resolved);
5676
5677        // Exit the server so that no more responses.
5678        mdns_server.shutdown().unwrap();
5679        mdns_client.shutdown().unwrap();
5680    }
5681
5682    #[test]
5683    fn test_custom_port_isolation() {
5684        // This test verifies:
5685        // 1. Daemons on a custom port can communicate with each other
5686        // 2. Daemons on different ports are isolated (no cross-talk)
5687
5688        let service_type = "_custom_port._udp.local.";
5689        let instance_custom = "custom_port_instance";
5690        let instance_default = "default_port_instance";
5691        let host_name = "custom_port_host.local.";
5692
5693        let service_ip_addr = my_ip_interfaces(false)
5694            .iter()
5695            .find(|iface| iface.ip().is_ipv4())
5696            .map(|iface| iface.ip())
5697            .expect("Test requires an IPv4 interface");
5698
5699        // Create service info for custom port (5454)
5700        let service_custom = ServiceInfo::new(
5701            service_type,
5702            instance_custom,
5703            host_name,
5704            service_ip_addr,
5705            8080,
5706            None,
5707        )
5708        .unwrap();
5709
5710        // Create service info for default port (5353)
5711        let service_default = ServiceInfo::new(
5712            service_type,
5713            instance_default,
5714            host_name,
5715            service_ip_addr,
5716            8081,
5717            None,
5718        )
5719        .unwrap();
5720
5721        // Create two daemons on custom port 5454
5722        let custom_port = 5454u16;
5723        let server_custom =
5724            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5725        let client_custom =
5726            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5727
5728        // Create daemon on default port (5353)
5729        let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5730
5731        // Register service on custom port
5732        server_custom
5733            .register(service_custom.clone())
5734            .expect("Failed to register custom port service");
5735
5736        // Register service on default port
5737        server_default
5738            .register(service_default.clone())
5739            .expect("Failed to register default port service");
5740
5741        // Browse from custom port client
5742        let browse_custom = client_custom
5743            .browse(service_type)
5744            .expect("Failed to browse on custom port");
5745
5746        let timeout = Duration::from_secs(3);
5747        let mut found_custom = false;
5748        let mut found_default_on_custom = false;
5749
5750        // Custom port client should find the custom port service
5751        while let Ok(event) = browse_custom.recv_timeout(timeout) {
5752            if let ServiceEvent::ServiceResolved(info) = event {
5753                println!(
5754                    "Custom port client resolved: {} on port {}",
5755                    info.get_fullname(),
5756                    info.get_port()
5757                );
5758                if info.get_fullname().starts_with(instance_custom) {
5759                    found_custom = true;
5760                    assert_eq!(info.get_port(), 8080);
5761                }
5762                if info.get_fullname().starts_with(instance_default) {
5763                    found_default_on_custom = true;
5764                }
5765            }
5766        }
5767
5768        assert!(
5769            found_custom,
5770            "Custom port client should find service on custom port"
5771        );
5772        assert!(
5773            !found_default_on_custom,
5774            "Custom port client should NOT find service on default port"
5775        );
5776
5777        // Now verify the default port daemon can find its own services
5778        // but not the custom port services
5779        let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5780        let browse_default = client_default
5781            .browse(service_type)
5782            .expect("Failed to browse on default port");
5783
5784        let mut found_default = false;
5785        let mut found_custom_on_default = false;
5786
5787        while let Ok(event) = browse_default.recv_timeout(timeout) {
5788            if let ServiceEvent::ServiceResolved(info) = event {
5789                println!(
5790                    "Default port client resolved: {} on port {}",
5791                    info.get_fullname(),
5792                    info.get_port()
5793                );
5794                if info.get_fullname().starts_with(instance_default) {
5795                    found_default = true;
5796                    assert_eq!(info.get_port(), 8081);
5797                }
5798                if info.get_fullname().starts_with(instance_custom) {
5799                    found_custom_on_default = true;
5800                }
5801            }
5802        }
5803
5804        assert!(
5805            found_default,
5806            "Default port client should find service on default port"
5807        );
5808        assert!(
5809            !found_custom_on_default,
5810            "Default port client should NOT find service on custom port"
5811        );
5812
5813        // Cleanup
5814        server_custom.shutdown().unwrap();
5815        client_custom.shutdown().unwrap();
5816        server_default.shutdown().unwrap();
5817        client_default.shutdown().unwrap();
5818    }
5819}