Skip to main content

mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34    current_time_millis,
35    dns_cache::{DnsCache, IpType},
36    dns_parser::{
37        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
38        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
39        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
40    },
41    error::{e_fmt, Error, Result},
42    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
43    Receiver, ResolvedService, TxtProperties,
44};
45use flume::{bounded, Sender, TrySendError};
46use if_addrs::{IfAddr, Interface};
47use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
48use socket2::Domain;
49use socket_pktinfo::PktInfoUdpSocket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
53    fmt, io,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
/// The default max length of the service name without domain, not including the
/// leading underscore (`_`). It is set to 15 per
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// The default interval, in seconds, for checking IP changes automatically.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The mDNS port number per RFC 6762.
pub const MDNS_PORT: u16 = 5353;

/// The IPv4 multicast group (224.0.0.251) that mDNS sockets join and send to.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// The IPv6 multicast group (ff02::fb) that mDNS sockets join.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// The IPv4 loopback address, used for the daemon's local signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

// NOTE(review): not referenced in this part of the file; presumably a wait
// time in milliseconds used while resolving services — confirm at the use site.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
80
/// Response status code for the service `unregister` call.
///
/// Like [`DaemonStatus`], the status can be cloned and compared, so callers
/// can write `status == UnregisterStatus::OK` or use it in `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
89
/// Status code for the service daemon.
///
/// Marked `#[non_exhaustive]`: more states may be added later, so external
/// `match` expressions need a wildcard arm.
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is running as normal.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
100
/// The counters that make up the daemon metrics.
///
/// Currently all counters are for outgoing packets.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the kebab-case name of the counter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the static label first, then emit it with a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
157
/// Errors used internally in this module (see [`MyResult`]).
#[derive(Debug)]
enum InternalError {
    /// The address of the carried interface is invalid.
    IntfAddrInvalid(Interface),
}
162
163impl fmt::Display for InternalError {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        match self {
166            InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
167        }
168    }
169}
170
/// Result type whose error is the module-private [`InternalError`].
type MyResult<T> = core::result::Result<T, InternalError>;
172
/// A wrapper around UDP socket used by the mDNS daemon.
///
/// We do this because `mio` does not support PKTINFO and
/// does not provide a way to implement `Source` trait directly and safely.
///
/// The two fields are built from the same underlying socket: `pktinfo` is
/// the original, and `mio` is created from a clone of it.
struct MyUdpSocket {
    /// The underlying socket that supports control messages like
    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
    pktinfo: PktInfoUdpSocket,

    /// The mio UDP socket that is a clone of `pktinfo` and
    /// is used for event polling.
    mio: MioUdpSocket,
}
186
187impl MyUdpSocket {
188    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
189        let std_sock = pktinfo.try_clone_std()?;
190        let mio = MioUdpSocket::from_std(std_sock);
191
192        Ok(Self { pktinfo, mio })
193    }
194}
195
196/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
197impl Source for MyUdpSocket {
198    fn register(
199        &mut self,
200        registry: &Registry,
201        token: Token,
202        interests: Interest,
203    ) -> io::Result<()> {
204        self.mio.register(registry, token, interests)
205    }
206
207    fn reregister(
208        &mut self,
209        registry: &Registry,
210        token: Token,
211        interests: Interest,
212    ) -> io::Result<()> {
213        self.mio.reregister(registry, token, interests)
214    }
215
216    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
217        self.mio.deregister(registry)
218    }
219}
220
/// The metrics is a HashMap of (name_key, i64_value).
/// The main purpose is to help monitor the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;
224
const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // Chosen to avoid overlapping with zc.poll_ids.
228
/// A daemon thread for mDNS
///
/// This struct provides a handle and an API to the daemon. It is cloneable.
/// Each API call sends a `Command` over `sender` and then a datagram to
/// `signal_addr` to notify the daemon.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sender handle of the channel to the daemon.
    sender: Sender<Command>,

    /// Send to this addr to signal that a `Command` is coming.
    ///
    /// The daemon listens on this addr together with other mDNS sockets,
    /// to avoid busy polling the flume channel. If there is a way to poll
    /// the channel and mDNS sockets together, then this can be removed.
    signal_addr: SocketAddr,
}
244
245impl ServiceDaemon {
246    /// Creates a new daemon and spawns a thread to run the daemon.
247    ///
248    /// Creates a new mDNS service daemon using the default port (5353).
249    ///
250    /// For development/testing with custom ports, use [`ServiceDaemon::new_with_port`].
251    pub fn new() -> Result<Self> {
252        Self::new_with_port(MDNS_PORT)
253    }
254
255    /// Creates a new mDNS service daemon using a custom port.
256    ///
257    /// # Arguments
258    ///
259    /// * `port` - The UDP port to bind for mDNS communication.
260    ///   - In production, this should be `MDNS_PORT` (5353) per RFC 6762.
261    ///   - For development/testing, you can use a non-standard port (e.g., 5454)
262    ///     to avoid conflicts with system mDNS services.
263    ///   - Both publisher and browser must use the same port to communicate.
264    ///
265    /// # Example
266    ///
267    /// ```no_run
268    /// use mdns_sd::ServiceDaemon;
269    ///
270    /// // Use standard mDNS port (production)
271    /// let daemon = ServiceDaemon::new_with_port(5353)?;
272    ///
273    /// // Use custom port for development (avoids macOS Bonjour conflict)
274    /// let daemon_dev = ServiceDaemon::new_with_port(5454)?;
275    /// # Ok::<(), mdns_sd::Error>(())
276    /// ```
277    pub fn new_with_port(port: u16) -> Result<Self> {
278        // Use port 0 to allow the system assign a random available port,
279        // no need for a pre-defined port number.
280        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
281
282        let signal_sock = UdpSocket::bind(signal_addr)
283            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
284
285        // Get the socket with the OS chosen port
286        let signal_addr = signal_sock
287            .local_addr()
288            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
289
290        // Must be nonblocking so we can listen to it together with mDNS sockets.
291        signal_sock
292            .set_nonblocking(true)
293            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
294
295        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
296
297        let (sender, receiver) = bounded(100);
298
299        // Spawn the daemon thread
300        let mio_sock = MioUdpSocket::from_std(signal_sock);
301        let cmd_sender = sender.clone();
302        thread::Builder::new()
303            .name("mDNS_daemon".to_string())
304            .spawn(move || {
305                Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
306            })
307            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
308
309        Ok(Self {
310            sender,
311            signal_addr,
312        })
313    }
314
315    /// Sends `cmd` to the daemon via its channel, and sends a signal
316    /// to its sock addr to notify.
317    fn send_cmd(&self, cmd: Command) -> Result<()> {
318        let cmd_name = cmd.to_string();
319
320        // First, send to the flume channel.
321        self.sender.try_send(cmd).map_err(|e| match e {
322            TrySendError::Full(_) => Error::Again,
323            e => e_fmt!("flume::channel::send failed: {}", e),
324        })?;
325
326        // Second, send a signal to notify the daemon.
327        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
328        let socket = UdpSocket::bind(addr)
329            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
330        socket
331            .send_to(cmd_name.as_bytes(), self.signal_addr)
332            .map_err(|e| {
333                e_fmt!(
334                    "signal socket send_to {} ({}) failed: {}",
335                    self.signal_addr,
336                    cmd_name,
337                    e
338                )
339            })?;
340
341        Ok(())
342    }
343
344    /// Starts browsing for a specific service type.
345    ///
346    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
347    ///
348    /// Returns a channel `Receiver` to receive events about the service. The caller
349    /// can call `.recv_async().await` on this receiver to handle events in an
350    /// async environment or call `.recv()` in a sync environment.
351    ///
352    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
353    /// finding more details, i.e. SRV records and TXT records.
354    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
355        check_domain_suffix(service_type)?;
356
357        let (resp_s, resp_r) = bounded(10);
358        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
359        Ok(resp_r)
360    }
361
362    /// Preforms a "cache-only" browse.
363    ///
364    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
365    ///
366    /// The functionality is identical to 'browse', but the service events are based solely on the contents
367    /// of the daemon's cache. No actual mDNS query is sent to the network.
368    ///
369    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
370    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
371        check_domain_suffix(service_type)?;
372
373        let (resp_s, resp_r) = bounded(10);
374        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
375        Ok(resp_r)
376    }
377
378    /// Stops searching for a specific service type.
379    ///
380    /// When an error is returned, the caller should retry only when
381    /// the error is `Error::Again`, otherwise should log and move on.
382    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
383        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
384    }
385
386    /// Starts querying for the ip addresses of a hostname.
387    ///
388    /// Returns a channel `Receiver` to receive events about the hostname.
389    /// The caller can call `.recv_async().await` on this receiver to handle events in an
390    /// async environment or call `.recv()` in a sync environment.
391    ///
392    /// The `timeout` is specified in milliseconds.
393    pub fn resolve_hostname(
394        &self,
395        hostname: &str,
396        timeout: Option<u64>,
397    ) -> Result<Receiver<HostnameResolutionEvent>> {
398        check_hostname(hostname)?;
399        let (resp_s, resp_r) = bounded(10);
400        self.send_cmd(Command::ResolveHostname(
401            hostname.to_string(),
402            1,
403            resp_s,
404            timeout,
405        ))?;
406        Ok(resp_r)
407    }
408
409    /// Stops querying for the ip addresses of a hostname.
410    ///
411    /// When an error is returned, the caller should retry only when
412    /// the error is `Error::Again`, otherwise should log and move on.
413    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
414        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
415    }
416
417    /// Registers a service provided by this host.
418    ///
419    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
420    /// this method will automatically fill in addresses from the host.
421    ///
422    /// To re-announce a service with an updated `service_info`, just call
423    /// this `register` function again. No need to call `unregister` first.
424    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
425        check_service_name(service_info.get_fullname())?;
426        check_hostname(service_info.get_hostname())?;
427
428        self.send_cmd(Command::Register(service_info.into()))
429    }
430
431    /// Unregisters a service. This is a graceful shutdown of a service.
432    ///
433    /// Returns a channel receiver that is used to receive the status code
434    /// of the unregister.
435    ///
436    /// When an error is returned, the caller should retry only when
437    /// the error is `Error::Again`, otherwise should log and move on.
438    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
439        let (resp_s, resp_r) = bounded(1);
440        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
441        Ok(resp_r)
442    }
443
444    /// Starts to monitor events from the daemon.
445    ///
446    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
447    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
448        let (resp_s, resp_r) = bounded(100);
449        self.send_cmd(Command::Monitor(resp_s))?;
450        Ok(resp_r)
451    }
452
453    /// Shuts down the daemon thread and returns a channel to receive the status.
454    ///
455    /// When an error is returned, the caller should retry only when
456    /// the error is `Error::Again`, otherwise should log and move on.
457    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
458        let (resp_s, resp_r) = bounded(1);
459        self.send_cmd(Command::Exit(resp_s))?;
460        Ok(resp_r)
461    }
462
463    /// Returns the status of the daemon.
464    ///
465    /// When an error is returned, the caller should retry only when
466    /// the error is `Error::Again`, otherwise should consider the daemon
467    /// stopped working and move on.
468    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
469        let (resp_s, resp_r) = bounded(1);
470
471        if self.sender.is_disconnected() {
472            resp_s
473                .send(DaemonStatus::Shutdown)
474                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
475        } else {
476            self.send_cmd(Command::GetStatus(resp_s))?;
477        }
478
479        Ok(resp_r)
480    }
481
482    /// Returns a channel receiver for the metrics, e.g. input/output counters.
483    ///
484    /// The metrics returned is a snapshot. Hence the caller should call
485    /// this method repeatedly if they want to monitor the metrics continuously.
486    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
487        let (resp_s, resp_r) = bounded(1);
488        self.send_cmd(Command::GetMetrics(resp_s))?;
489        Ok(resp_r)
490    }
491
492    /// Change the max length allowed for a service name.
493    ///
494    /// As RFC 6763 defines a length max for a service name, a user should not call
495    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
496    ///
497    /// `len_max` is capped at an internal limit, which is currently 30.
498    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
499        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
500
501        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
502            return Err(Error::Msg(format!(
503                "service name length max {len_max} is too large"
504            )));
505        }
506
507        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
508    }
509
510    /// Change the interval for checking IP changes automatically.
511    ///
512    /// Setting the interval to 0 disables the IP check.
513    ///
514    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
515    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
516        let interval_in_millis = interval_in_secs as u64 * 1000;
517        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
518            interval_in_millis,
519        )))
520    }
521
522    /// Get the current interval in seconds for checking IP changes automatically.
523    pub fn get_ip_check_interval(&self) -> Result<u32> {
524        let (resp_s, resp_r) = bounded(1);
525        self.send_cmd(Command::GetOption(resp_s))?;
526
527        let option = resp_r
528            .recv_timeout(Duration::from_secs(10))
529            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
530        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
531        Ok(ip_check_interval_in_secs as u32)
532    }
533
534    /// Include interfaces that match `if_kind` for this service daemon.
535    ///
536    /// For example:
537    /// ```ignore
538    ///     daemon.enable_interface("en0")?;
539    /// ```
540    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
541        let if_kind_vec = if_kind.into_vec();
542        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
543            if_kind_vec.kinds,
544        )))
545    }
546
547    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
548    ///
549    /// For example:
550    /// ```ignore
551    ///     daemon.disable_interface(IfKind::IPv6)?;
552    /// ```
553    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
554        let if_kind_vec = if_kind.into_vec();
555        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
556            if_kind_vec.kinds,
557        )))
558    }
559
560    /// If `accept` is true, accept and cache all responses, even if there is no active querier
561    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
562    /// [browse_cache](Self::browse_cache).
563    ///
564    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
565    ///
566    /// For example:
567    /// ```ignore
568    ///     daemon.accept_unsolicited(true)?;
569    /// ```
570    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
571        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
572    }
573
574    /// Include or exclude Apple P2P interfaces, e.g. "awdl0", "llw0".
575    /// By default, they are excluded.
576    pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
577        self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
578    }
579
580    #[cfg(test)]
581    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
582        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
583            ifname.to_string(),
584        )))
585    }
586
587    #[cfg(test)]
588    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
589        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
590            ifname.to_string(),
591        )))
592    }
593
594    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
595    ///
596    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
597    /// receive announcements from a responder on the same host.
598    ///
599    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
600    ///
601    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
602    /// the UNIX version of the IP_MULTICAST_LOOP option:
603    ///
604    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
605    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
606    ///
607    /// Which means, in order NOT to receive localhost announcements, you want to call
608    /// this API on the querier side on Windows, but on the responder side on Unix.
609    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
610        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
611    }
612
613    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
614    ///
615    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
616    /// receive announcements from a responder on the same host.
617    ///
618    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
619    ///
620    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
621    /// the UNIX version of the IP_MULTICAST_LOOP option:
622    ///
623    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
624    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
625    ///
626    /// Which means, in order NOT to receive localhost announcements, you want to call
627    /// this API on the querier side on Windows, but on the responder side on Unix.
628    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
629        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
630    }
631
632    /// Proactively confirms whether a service instance still valid.
633    ///
634    /// This call will issue queries for a service instance's SRV record and Address records.
635    ///
636    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
637    /// unless there is a reason not to follow RFC.
638    ///
639    /// If no response is received within `timeout`, the current resource
640    /// records will be flushed, and if needed, `ServiceRemoved` event will be
641    /// sent to active queriers.
642    ///
643    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
644    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
645        self.send_cmd(Command::Verify(instance_fullname, timeout))
646    }
647
648    fn daemon_thread(
649        signal_sock: MioUdpSocket,
650        poller: Poll,
651        receiver: Receiver<Command>,
652        port: u16,
653        cmd_sender: Sender<Command>,
654        signal_addr: SocketAddr,
655    ) {
656        let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
657
658        if let Some(cmd) = zc.run(receiver) {
659            match cmd {
660                Command::Exit(resp_s) => {
661                    // It is guaranteed that the receiver already dropped,
662                    // i.e. the daemon command channel closed.
663                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
664                        debug!("exit: failed to send response of shutdown: {}", e);
665                    }
666                }
667                _ => {
668                    debug!("Unexpected command: {:?}", cmd);
669                }
670            }
671        }
672    }
673}
674
/// Creates a new UDP socket that uses `intf` to send and recv multicast.
///
/// The socket is bound to the wildcard address of the interface's IP family
/// on [`MDNS_PORT`], joins the mDNS multicast group on `intf`, and selects
/// `intf` for outgoing multicast. For IPv4 it additionally sets the multicast
/// TTL to 255, disables multicast loopback when `should_loop` is false, and
/// sends the packets of a `DnsOutgoing::new(0)` message to verify sending works.
///
/// NOTE(review): `should_loop` is only consulted in the IPv4 branch; the IPv6
/// branch never changes the multicast loopback setting — confirm this is intended.
///
/// NOTE(review): the port is always `MDNS_PORT`; a custom port passed to
/// `ServiceDaemon::new_with_port` is not visible to this function.
///
/// # Errors
///
/// Returns a formatted error as soon as any of the socket operations fails.
fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
    // Use the same socket for receiving and sending multicast packets.
    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
    let intf_ip = &intf.ip();
    match intf_ip {
        IpAddr::V4(ip) => {
            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
            let sock = new_socket(addr.into(), true)?;

            // Join mDNS group to receive packets.
            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;

            // Set IP_MULTICAST_IF to send packets.
            sock.set_multicast_if_v4(ip)
                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;

            // Per RFC 6762 section 11:
            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
            // be sent with IP TTL set to 255."
            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
            sock.set_multicast_ttl_v4(255)
                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;

            // Loopback is only touched when the caller asks to turn it off.
            if !should_loop {
                sock.set_multicast_loop_v4(false)
                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
            }

            // Test if we can send packets successfully.
            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
            for packet in test_packets {
                sock.send_to(&packet, &multicast_addr)
                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
            }
            MyUdpSocket::new(sock)
                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
        }
        IpAddr::V6(ip) => {
            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
            let sock = new_socket(addr.into(), true)?;

            // Falls back to index 0 when the interface has no index.
            let if_index = intf.index.unwrap_or(0);

            // Join mDNS group to receive packets.
            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;

            // Set IPV6_MULTICAST_IF to send packets.
            sock.set_multicast_if_v6(if_index)
                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;

            // We are not sending multicast packets to test this socket as there might
            // be many IPv6 interfaces on a host and could cause such send error:
            // "No buffer space available (os error 55)".

            MyUdpSocket::new(sock)
                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
        }
    }
}
738
739/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
740/// `non_block` indicates whether to set O_NONBLOCK for the socket.
741fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
742    let domain = match addr {
743        SocketAddr::V4(_) => socket2::Domain::IPV4,
744        SocketAddr::V6(_) => socket2::Domain::IPV6,
745    };
746
747    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
748
749    fd.set_reuse_address(true)
750        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
751    #[cfg(unix)]
752    if let Err(e) = fd.set_reuse_port(true) {
753        debug!(
754            "SO_REUSEPORT is not supported, continuing without it: {}",
755            e
756        );
757    }
758
759    if non_block {
760        fd.set_nonblocking(true)
761            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
762    }
763
764    fd.bind(&addr.into())
765        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
766
767    trace!("new socket bind to {}", &addr);
768    Ok(fd)
769}
770
/// A command scheduled to be executed again by the daemon's run loop.
///
/// The run loop executes `command` once the current time reaches `next_time`.
struct ReRun {
    /// UNIX timestamp in millis at which `command` becomes due.
    next_time: u64,
    /// The command to execute when `next_time` is reached.
    command: Command,
}
777
/// Specifies kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>`, `From<&String>` and `From<IpAddr>`
/// are implemented.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    /// This is used to look up the interface. The intent is to identify an interface of
    /// IPv4 or IPv6, not a specific address on the interface.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), enabled by default.
    ///
    /// Loopback interfaces are required by some use cases (e.g., OSCQuery) for publishing.
    LoopbackV4,

    /// ::1/128, enabled by default.
    LoopbackV6,

    /// By interface index, IPv4 only.
    IndexV4(u32),

    /// By interface index, IPv6 only.
    IndexV6(u32),
}
815
816impl IfKind {
817    /// Checks if `intf` matches with this interface kind.
818    fn matches(&self, intf: &Interface) -> bool {
819        match self {
820            Self::All => true,
821            Self::IPv4 => intf.ip().is_ipv4(),
822            Self::IPv6 => intf.ip().is_ipv6(),
823            Self::Name(ifname) => ifname == &intf.name,
824            Self::Addr(addr) => addr == &intf.ip(),
825            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
826            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
827            Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
828            Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
829        }
830    }
831}
832
833/// The first use case of specifying an interface was to
834/// use an interface name. Hence adding this for ergonomic reasons.
835impl From<&str> for IfKind {
836    fn from(val: &str) -> Self {
837        Self::Name(val.to_string())
838    }
839}
840
841impl From<&String> for IfKind {
842    fn from(val: &String) -> Self {
843        Self::Name(val.to_string())
844    }
845}
846
847/// Still for ergonomic reasons.
848impl From<IpAddr> for IfKind {
849    fn from(val: IpAddr) -> Self {
850        Self::Addr(val)
851    }
852}
853
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values are produced through the [`IntoIfKindVec`] trait.
pub struct IfKindVec {
    /// The interface kinds held by this list.
    kinds: Vec<IfKind>,
}
858
/// A trait that converts a type into a Vec of `IfKind`.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the corresponding [`IfKindVec`].
    fn into_vec(self) -> IfKindVec;
}
863
864impl<T: Into<IfKind>> IntoIfKindVec for T {
865    fn into_vec(self) -> IfKindVec {
866        let if_kind: IfKind = self.into();
867        IfKindVec {
868            kinds: vec![if_kind],
869        }
870    }
871}
872
873impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
874    fn into_vec(self) -> IfKindVec {
875        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
876        IfKindVec { kinds }
877    }
878}
879
/// Selection of interfaces.
///
/// Records one enable or disable request for a kind of interface.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled (`true`) or disabled (`false`).
    selected: bool,
}
888
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
struct Zeroconf {
    /// The mDNS port number to use for socket binding.
    /// Typically MDNS_PORT (5353), but can be customized for development/testing.
    port: u16,

    /// Local interfaces keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
    ipv4_sock: Option<MyUdpSocket>,

    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
    ipv6_sock: Option<MyUdpSocket>,

    /// Local registered services, keyed by service full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Daemon counters, updated via `increase_counter`.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Options
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon; set to `Shutdown` when the run loop exits.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// The last requested multicast-loop setting for the IPv4 socket. Starts as `true`.
    multicast_loop_v4: bool,

    /// The last requested multicast-loop setting for the IPv6 socket. Starts as `true`.
    multicast_loop_v6: bool,

    /// Set through `DaemonOption::AcceptUnsolicited`. Starts as `false`.
    accept_unsolicited: bool,

    /// Passed to `my_ip_interfaces_inner` when listing local interfaces.
    /// Set through `DaemonOption::IncludeAppleP2P`. Starts as `false`.
    include_apple_p2p: bool,

    /// Sender side of the daemon's own command channel, used by `send_cmd_to_self`.
    cmd_sender: Sender<Command>,

    /// Address that `send_cmd_to_self` sends a datagram to in order to wake the poll loop.
    signal_addr: SocketAddr,

    /// Names of interfaces that tests pretend are down; they are filtered out
    /// of the interface list when checking for IP changes.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
975
976/// Join the multicast group for the given interface.
977fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
978    let intf_ip = &intf.ip();
979    match intf_ip {
980        IpAddr::V4(ip) => {
981            // Join mDNS group to receive packets.
982            debug!("join multicast group V4 on {} addr {ip}", intf.name);
983            my_sock
984                .join_multicast_v4(&GROUP_ADDR_V4, ip)
985                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
986        }
987        IpAddr::V6(ip) => {
988            let if_index = intf.index.unwrap_or(0);
989            // Join mDNS group to receive packets.
990            debug!(
991                "join multicast group V6 on {} addr {ip} with index {if_index}",
992                intf.name
993            );
994            my_sock
995                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
996                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
997        }
998    }
999    Ok(())
1000}
1001
1002impl Zeroconf {
1003    fn new(
1004        signal_sock: MioUdpSocket,
1005        poller: Poll,
1006        port: u16,
1007        cmd_sender: Sender<Command>,
1008        signal_addr: SocketAddr,
1009    ) -> Self {
1010        // Get interfaces.
1011        let my_ifaddrs = my_ip_interfaces(true);
1012
1013        // Create a socket for every IP addr.
1014        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
1015        // or the same interface name with different IP addrs.
1016        let mut my_intfs = HashMap::new();
1017        let mut dns_registry_map = HashMap::new();
1018
1019        // Use the same socket for receiving and sending multicast packets.
1020        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
1021        let mut ipv4_sock = None;
1022        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1023        match new_socket(addr.into(), true) {
1024            Ok(sock) => {
1025                // Per RFC 6762 section 11:
1026                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1027                // be sent with IP TTL set to 255."
1028                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
1029                sock.set_multicast_ttl_v4(255)
1030                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1031                    .ok();
1032
1033                // This clones a socket.
1034                ipv4_sock = match MyUdpSocket::new(sock) {
1035                    Ok(s) => Some(s),
1036                    Err(e) => {
1037                        debug!("failed to create IPv4 MyUdpSocket: {e}");
1038                        None
1039                    }
1040                };
1041            }
1042            // Per RFC 6762 section 11:}
1043            Err(e) => debug!("failed to create IPv4 socket: {e}"),
1044        }
1045
1046        let mut ipv6_sock = None;
1047        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1048        match new_socket(addr.into(), true) {
1049            Ok(sock) => {
1050                // Per RFC 6762 section 11:
1051                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1052                // be sent with IP TTL set to 255."
1053                sock.set_multicast_hops_v6(255)
1054                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1055                    .ok();
1056
1057                // This clones the ipv6 socket.
1058                ipv6_sock = match MyUdpSocket::new(sock) {
1059                    Ok(s) => Some(s),
1060                    Err(e) => {
1061                        debug!("failed to create IPv6 MyUdpSocket: {e}");
1062                        None
1063                    }
1064                };
1065            }
1066            Err(e) => debug!("failed to create IPv6 socket: {e}"),
1067        }
1068
1069        // Configure sockets to join multicast groups.
1070        for intf in my_ifaddrs {
1071            let sock_opt = if intf.ip().is_ipv4() {
1072                &ipv4_sock
1073            } else {
1074                &ipv6_sock
1075            };
1076            let Some(sock) = sock_opt else {
1077                debug!(
1078                    "no socket available for interface {} with addr {}. Skipped.",
1079                    intf.name,
1080                    intf.ip()
1081                );
1082                continue;
1083            };
1084
1085            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1086                debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1087            }
1088
1089            let if_index = intf.index.unwrap_or(0);
1090
1091            // Add this interface address if not already present.
1092            dns_registry_map
1093                .entry(if_index)
1094                .or_insert_with(DnsRegistry::new);
1095
1096            my_intfs
1097                .entry(if_index)
1098                .and_modify(|v: &mut MyIntf| {
1099                    v.addrs.insert(intf.addr.clone());
1100                })
1101                .or_insert(MyIntf {
1102                    name: intf.name.clone(),
1103                    index: if_index,
1104                    addrs: HashSet::from([intf.addr]),
1105                });
1106        }
1107
1108        let monitors = Vec::new();
1109        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1110        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1111
1112        let timers = BinaryHeap::new();
1113
1114        // Enable everything, including loopback interfaces.
1115        let if_selections = vec![];
1116
1117        let status = DaemonStatus::Running;
1118
1119        Self {
1120            port,
1121            my_intfs,
1122            ipv4_sock,
1123            ipv6_sock,
1124            my_services: HashMap::new(),
1125            cache: DnsCache::new(),
1126            dns_registry_map,
1127            hostname_resolvers: HashMap::new(),
1128            service_queriers: HashMap::new(),
1129            retransmissions: Vec::new(),
1130            counters: HashMap::new(),
1131            poller,
1132            monitors,
1133            service_name_len_max,
1134            ip_check_interval,
1135            if_selections,
1136            signal_sock,
1137            timers,
1138            status,
1139            pending_resolves: HashSet::new(),
1140            resolved: HashSet::new(),
1141            multicast_loop_v4: true,
1142            multicast_loop_v6: true,
1143            accept_unsolicited: false,
1144            include_apple_p2p: false,
1145            cmd_sender,
1146            signal_addr,
1147
1148            #[cfg(test)]
1149            test_down_interfaces: HashSet::new(),
1150        }
1151    }
1152
1153    /// Send a Command into the daemon channel and poke the signal socket to wake the poll loop.
1154    fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1155        let cmd_name = cmd.to_string();
1156
1157        self.cmd_sender.try_send(cmd).map_err(|e| match e {
1158            TrySendError::Full(_) => Error::Again,
1159            e => e_fmt!("flume::channel::send failed: {}", e),
1160        })?;
1161
1162        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1163        let socket = UdpSocket::bind(addr)
1164            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1165        socket
1166            .send_to(cmd_name.as_bytes(), self.signal_addr)
1167            .map_err(|e| {
1168                e_fmt!(
1169                    "signal socket send_to {} ({}) failed: {}",
1170                    self.signal_addr,
1171                    cmd_name,
1172                    e
1173                )
1174            })?;
1175
1176        Ok(())
1177    }
1178
1179    /// Clean up all resources before shutdown.
1180    ///
1181    /// This method:
1182    /// 1. Unregisters all registered services (sends goodbye packets)
1183    /// 2. Stops all active browse operations
1184    /// 3. Stops all active hostname resolution operations
1185    /// 4. Clears all retransmissions
1186    fn cleanup(&mut self) {
1187        debug!("Starting cleanup for shutdown");
1188
1189        // 1. Unregister all services - send goodbye packets
1190        let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1191        for fullname in service_names {
1192            if let Some(info) = self.my_services.get(&fullname) {
1193                debug!("Unregistering service during shutdown: {}", &fullname);
1194
1195                for intf in self.my_intfs.values() {
1196                    if let Some(sock) = self.ipv4_sock.as_ref() {
1197                        self.unregister_service(info, intf, &sock.pktinfo);
1198                    }
1199
1200                    if let Some(sock) = self.ipv6_sock.as_ref() {
1201                        self.unregister_service(info, intf, &sock.pktinfo);
1202                    }
1203                }
1204            }
1205        }
1206        self.my_services.clear();
1207
1208        // 2. Stop all browse operations
1209        let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1210        for ty_domain in browse_types {
1211            debug!("Stopping browse during shutdown: {}", &ty_domain);
1212            if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1213                // Notify the client
1214                if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1215                    debug!("Failed to send SearchStopped during shutdown: {}", e);
1216                }
1217            }
1218        }
1219
1220        // 3. Stop all hostname resolution operations
1221        let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1222        for hostname in hostnames {
1223            debug!(
1224                "Stopping hostname resolution during shutdown: {}",
1225                &hostname
1226            );
1227            if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1228                // Notify the client
1229                if let Err(e) =
1230                    sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1231                {
1232                    debug!(
1233                        "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1234                        e
1235                    );
1236                }
1237            }
1238        }
1239
1240        // 4. Clear all retransmissions
1241        self.retransmissions.clear();
1242
1243        debug!("Cleanup completed");
1244    }
1245
1246    /// The main event loop of the daemon thread
1247    ///
1248    /// In each round, it will:
1249    /// 1. select the listening sockets with a timeout.
1250    /// 2. process the incoming packets if any.
1251    /// 3. try_recv on its channel and execute commands.
1252    /// 4. announce its registered services.
1253    /// 5. process retransmissions if any.
1254    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1255        // Add the daemon's signal socket to the poller.
1256        if let Err(e) = self.poller.registry().register(
1257            &mut self.signal_sock,
1258            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1259            mio::Interest::READABLE,
1260        ) {
1261            debug!("failed to add signal socket to the poller: {}", e);
1262            return None;
1263        }
1264
1265        if let Some(sock) = self.ipv4_sock.as_mut() {
1266            if let Err(e) = self.poller.registry().register(
1267                sock,
1268                mio::Token(IPV4_SOCK_EVENT_KEY),
1269                mio::Interest::READABLE,
1270            ) {
1271                debug!("failed to register ipv4 socket: {}", e);
1272                return None;
1273            }
1274        }
1275
1276        if let Some(sock) = self.ipv6_sock.as_mut() {
1277            if let Err(e) = self.poller.registry().register(
1278                sock,
1279                mio::Token(IPV6_SOCK_EVENT_KEY),
1280                mio::Interest::READABLE,
1281            ) {
1282                debug!("failed to register ipv6 socket: {}", e);
1283                return None;
1284            }
1285        }
1286
1287        // Setup timer for IP checks.
1288        let mut next_ip_check = if self.ip_check_interval > 0 {
1289            current_time_millis() + self.ip_check_interval
1290        } else {
1291            0
1292        };
1293
1294        if next_ip_check > 0 {
1295            self.add_timer(next_ip_check);
1296        }
1297
1298        // Start the run loop.
1299
1300        let mut events = mio::Events::with_capacity(1024);
1301        loop {
1302            let now = current_time_millis();
1303
1304            let earliest_timer = self.peek_earliest_timer();
1305            let timeout = earliest_timer.map(|timer| {
1306                // If `timer` already passed, set `timeout` to be 1ms.
1307                let millis = if timer > now { timer - now } else { 1 };
1308                Duration::from_millis(millis)
1309            });
1310
1311            // Process incoming packets, command events and optional timeout.
1312            events.clear();
1313            match self.poller.poll(&mut events, timeout) {
1314                Ok(_) => self.handle_poller_events(&events),
1315                Err(e) => debug!("failed to select from sockets: {}", e),
1316            }
1317
1318            let now = current_time_millis();
1319
1320            // Remove the timers if already passed.
1321            self.pop_timers_till(now);
1322
1323            // Remove hostname resolvers with expired timeouts.
1324            for hostname in self
1325                .hostname_resolvers
1326                .clone()
1327                .into_iter()
1328                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1329                .map(|(hostname, _)| hostname)
1330            {
1331                trace!("hostname resolver timeout for {}", &hostname);
1332                call_hostname_resolution_listener(
1333                    &self.hostname_resolvers,
1334                    &hostname,
1335                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1336                );
1337                call_hostname_resolution_listener(
1338                    &self.hostname_resolvers,
1339                    &hostname,
1340                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1341                );
1342                self.hostname_resolvers.remove(&hostname);
1343            }
1344
1345            // process commands from the command channel
1346            while let Ok(command) = receiver.try_recv() {
1347                if matches!(command, Command::Exit(_)) {
1348                    debug!("Exit command received, performing cleanup");
1349                    self.cleanup();
1350                    self.status = DaemonStatus::Shutdown;
1351                    return Some(command);
1352                }
1353                self.exec_command(command, false);
1354            }
1355
1356            // check for repeated commands and run them if their time is up.
1357            let mut i = 0;
1358            while i < self.retransmissions.len() {
1359                if now >= self.retransmissions[i].next_time {
1360                    let rerun = self.retransmissions.remove(i);
1361                    self.exec_command(rerun.command, true);
1362                } else {
1363                    i += 1;
1364                }
1365            }
1366
1367            // Refresh cached service records with active queriers
1368            self.refresh_active_services();
1369
1370            // Refresh cached A/AAAA records with active queriers
1371            let mut query_count = 0;
1372            for (hostname, _sender) in self.hostname_resolvers.iter() {
1373                for (hostname, ip_addr) in
1374                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1375                {
1376                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1377                    query_count += 1;
1378                }
1379            }
1380
1381            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1382
1383            // check and evict expired records in our cache
1384            let now = current_time_millis();
1385
1386            // Notify service listeners about the expired records.
1387            let expired_services = self.cache.evict_expired_services(now);
1388            if !expired_services.is_empty() {
1389                debug!(
1390                    "run: send {} service removal to listeners",
1391                    expired_services.len()
1392                );
1393                self.notify_service_removal(expired_services);
1394            }
1395
1396            // Notify hostname listeners about the expired records.
1397            let expired_addrs = self.cache.evict_expired_addr(now);
1398            for (hostname, addrs) in expired_addrs {
1399                call_hostname_resolution_listener(
1400                    &self.hostname_resolvers,
1401                    &hostname,
1402                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1403                );
1404                let instances = self.cache.get_instances_on_host(&hostname);
1405                let instance_set: HashSet<String> = instances.into_iter().collect();
1406                self.resolve_updated_instances(&instance_set);
1407            }
1408
1409            // Send out probing queries.
1410            self.probing_handler();
1411
1412            // check IP changes if next_ip_check is reached.
1413            if now >= next_ip_check && next_ip_check > 0 {
1414                next_ip_check = now + self.ip_check_interval;
1415                self.add_timer(next_ip_check);
1416
1417                self.check_ip_changes();
1418            }
1419        }
1420    }
1421
1422    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1423        match daemon_opt {
1424            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1425            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1426            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1427            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1428            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1429            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1430            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1431            DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1432            #[cfg(test)]
1433            DaemonOption::TestDownInterface(ifname) => {
1434                self.test_down_interfaces.insert(ifname);
1435            }
1436            #[cfg(test)]
1437            DaemonOption::TestUpInterface(ifname) => {
1438                self.test_down_interfaces.remove(&ifname);
1439            }
1440        }
1441    }
1442
1443    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1444        debug!("enable_interface: {:?}", kinds);
1445        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1446
1447        for if_kind in kinds {
1448            self.if_selections.push(IfSelection {
1449                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1450                selected: true,
1451            });
1452        }
1453
1454        self.apply_intf_selections(interfaces);
1455    }
1456
1457    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1458        debug!("disable_interface: {:?}", kinds);
1459        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1460
1461        for if_kind in kinds {
1462            self.if_selections.push(IfSelection {
1463                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1464                selected: false,
1465            });
1466        }
1467
1468        self.apply_intf_selections(interfaces);
1469    }
1470
1471    fn set_multicast_loop_v4(&mut self, on: bool) {
1472        let Some(sock) = self.ipv4_sock.as_mut() else {
1473            return;
1474        };
1475        self.multicast_loop_v4 = on;
1476        sock.pktinfo
1477            .set_multicast_loop_v4(on)
1478            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1479            .unwrap();
1480    }
1481
1482    fn set_multicast_loop_v6(&mut self, on: bool) {
1483        let Some(sock) = self.ipv6_sock.as_mut() else {
1484            return;
1485        };
1486        self.multicast_loop_v6 = on;
1487        sock.pktinfo
1488            .set_multicast_loop_v6(on)
1489            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1490            .unwrap();
1491    }
1492
    /// Stores the `accept_unsolicited` option.
    fn set_accept_unsolicited(&mut self, accept: bool) {
        self.accept_unsolicited = accept;
    }
1496
1497    fn set_apple_p2p(&mut self, include: bool) {
1498        if self.include_apple_p2p != include {
1499            self.include_apple_p2p = include;
1500            self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1501        }
1502    }
1503
1504    fn notify_monitors(&mut self, event: DaemonEvent) {
1505        // Only retain the monitors that are still connected.
1506        self.monitors.retain(|sender| {
1507            if let Err(e) = sender.try_send(event.clone()) {
1508                debug!("notify_monitors: try_send: {}", &e);
1509                if matches!(e, TrySendError::Disconnected(_)) {
1510                    return false; // This monitor is dropped.
1511                }
1512            }
1513            true
1514        });
1515    }
1516
1517    /// Remove `addr` in my services that enabled `addr_auto`.
1518    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1519        for (_, service_info) in self.my_services.iter_mut() {
1520            if service_info.is_addr_auto() {
1521                service_info.remove_ipaddr(addr);
1522            }
1523        }
1524    }
1525
1526    fn add_timer(&mut self, next_time: u64) {
1527        self.timers.push(Reverse(next_time));
1528    }
1529
1530    fn peek_earliest_timer(&self) -> Option<u64> {
1531        self.timers.peek().map(|Reverse(v)| *v)
1532    }
1533
1534    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1535        self.timers.pop().map(|Reverse(v)| v)
1536    }
1537
1538    /// Pop all timers that are already passed till `now`.
1539    fn pop_timers_till(&mut self, now: u64) {
1540        while let Some(Reverse(v)) = self.timers.peek() {
1541            if *v > now {
1542                break;
1543            }
1544            self.timers.pop();
1545        }
1546    }
1547
1548    /// Apply all selections to `interfaces` and return the selected addresses.
1549    fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1550        let intf_count = interfaces.len();
1551        let mut intf_selections = vec![true; intf_count];
1552
1553        // apply if_selections
1554        for selection in self.if_selections.iter() {
1555            // Mark the interfaces for this selection.
1556            for i in 0..intf_count {
1557                if selection.if_kind.matches(&interfaces[i]) {
1558                    intf_selections[i] = selection.selected;
1559                }
1560            }
1561        }
1562
1563        let mut selected_addrs = HashSet::new();
1564        for i in 0..intf_count {
1565            if intf_selections[i] {
1566                selected_addrs.insert(interfaces[i].clone());
1567            }
1568        }
1569
1570        selected_addrs
1571    }
1572
1573    /// Apply all selections to `interfaces`.
1574    ///
1575    /// For any interface, add it if selected but not bound yet,
1576    /// delete it if not selected but still bound.
1577    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1578        // By default, we enable all interfaces.
1579        let intf_count = interfaces.len();
1580        let mut intf_selections = vec![true; intf_count];
1581
1582        // apply if_selections
1583        for selection in self.if_selections.iter() {
1584            // Mark the interfaces for this selection.
1585            for i in 0..intf_count {
1586                if selection.if_kind.matches(&interfaces[i]) {
1587                    intf_selections[i] = selection.selected;
1588                }
1589            }
1590        }
1591
1592        // Update `my_intfs` based on the selections.
1593        for (idx, intf) in interfaces.into_iter().enumerate() {
1594            if intf_selections[idx] {
1595                // Add the interface
1596                self.add_interface(intf);
1597            } else {
1598                // Remove the interface
1599                self.del_interface_addr(&intf);
1600            }
1601        }
1602    }
1603
1604    fn del_ip(&mut self, ip: IpAddr) {
1605        self.del_addr_in_my_services(&ip);
1606        self.notify_monitors(DaemonEvent::IpDel(ip));
1607    }
1608
    /// Check for IP changes and update `my_intfs` as needed.
    ///
    /// Compares the addresses currently reported by `my_ip_interfaces_inner`
    /// with the ones recorded in `self.my_intfs`:
    ///
    /// - recorded addresses that are no longer reported are dropped and
    ///   passed to `del_ip`;
    /// - interfaces that are no longer reported, or that are left without any
    ///   address, are removed from `my_intfs`; their multicast groups are left
    ///   and their cache records are removed;
    /// - finally the reported interfaces go through `apply_intf_selections`,
    ///   which adds the selected ones.
    fn check_ip_changes(&mut self) {
        // Get the current interfaces.
        let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);

        // In test builds, hide the interfaces listed in `test_down_interfaces`.
        #[cfg(test)]
        let my_ifaddrs: Vec<_> = my_ifaddrs
            .into_iter()
            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
            .collect();

        // Group the current addresses by interface index. Interfaces without
        // an index are grouped under index 0.
        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
                let if_index = intf.index.unwrap_or(0);
                acc.entry(if_index).or_default().push(&intf.addr);
                acc
            });

        // Deletions are only collected while `my_intfs` is being iterated;
        // they are applied after the loop.
        let mut deleted_intfs = Vec::new();
        let mut deleted_ips = Vec::new();

        for (if_index, my_intf) in self.my_intfs.iter_mut() {
            // The last removed address of each IP version on this interface.
            let mut last_ipv4 = None;
            let mut last_ipv6 = None;

            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
                // The interface still exists: keep only the addresses that
                // are still reported.
                my_intf.addrs.retain(|addr| {
                    if current_addrs.contains(&addr) {
                        true
                    } else {
                        match addr.ip() {
                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                        }
                        deleted_ips.push(addr.ip());
                        false
                    }
                });
                // An interface without any address left is removed as well.
                if my_intf.addrs.is_empty() {
                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
                }
            } else {
                // The interface is no longer reported: all of its addresses
                // are deleted and the interface itself is removed.
                debug!(
                    "check_ip_changes: interface {} ({}) no longer exists, removing",
                    my_intf.name, if_index
                );
                for addr in my_intf.addrs.iter() {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip())
                }
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
            }
        }

        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
            debug!(
                "check_ip_changes: {} deleted ips {} deleted intfs",
                deleted_ips.len(),
                deleted_intfs.len()
            );
        }

        for ip in deleted_ips {
            self.del_ip(ip);
        }

        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
                continue;
            };

            // Leave the IPv4 multicast group using the last removed IPv4
            // address. A failure is only logged.
            if let Some(ipv4) = last_ipv4 {
                debug!("leave multicast for {ipv4}");
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
                        debug!("leave multicast group for addr {ipv4}: {e}");
                    }
                }
            }

            // The IPv6 group is left by interface index; the removed address
            // only tells us that there was one and is used for logging.
            if let Some(ipv6) = last_ipv6 {
                debug!("leave multicast for {ipv6}");
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    if let Err(e) = sock
                        .pktinfo
                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
                    {
                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
                    }
                }
            }

            // Remove cache records for this interface, then pass the removed
            // and the modified instances on to their handlers.
            let intf_id = InterfaceId {
                name: my_intf.name.to_string(),
                index: my_intf.index,
            };
            let result = self.cache.remove_records_on_intf(intf_id);
            self.notify_service_removal(result.removed_instances);
            self.resolve_updated_instances(&result.modified_instances);
        }

        // Add newly found interfaces only if in our selections.
        self.apply_intf_selections(my_ifaddrs);
    }
1718
1719    /// Remove an interface address when it was down, disabled or removed from the system.
1720    /// If no more addresses on the interface, remove the interface as well.
1721    fn del_interface_addr(&mut self, intf: &Interface) {
1722        let if_index = intf.index.unwrap_or(0);
1723        debug!(
1724            "del_interface_addr: {} ({if_index}) addr {}",
1725            intf.name,
1726            intf.ip()
1727        );
1728
1729        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1730            debug!("del_interface_addr: interface {} not found", intf.name);
1731            return;
1732        };
1733
1734        let mut ip_removed = false;
1735
1736        if my_intf.addrs.remove(&intf.addr) {
1737            ip_removed = true;
1738
1739            match intf.addr.ip() {
1740                IpAddr::V4(ipv4) => {
1741                    if my_intf.next_ifaddr_v4().is_none() {
1742                        if let Some(sock) = self.ipv4_sock.as_mut() {
1743                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1744                                debug!("leave multicast group for addr {ipv4}: {e}");
1745                            } else {
1746                                debug!("leave multicast for {ipv4}");
1747                            }
1748                        }
1749                    }
1750                }
1751
1752                IpAddr::V6(ipv6) => {
1753                    if my_intf.next_ifaddr_v6().is_none() {
1754                        if let Some(sock) = self.ipv6_sock.as_mut() {
1755                            if let Err(e) =
1756                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1757                            {
1758                                debug!("leave multicast group for addr {ipv6}: {e}");
1759                            }
1760                        }
1761                    }
1762                }
1763            }
1764
1765            if my_intf.addrs.is_empty() {
1766                // If no more addresses, remove the interface.
1767                debug!("del_interface_addr: removing interface {}", intf.name);
1768                self.my_intfs.remove(&if_index);
1769                self.dns_registry_map.remove(&if_index);
1770                self.cache
1771                    .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1772            } else {
1773                // Interface still has addresses of the other IP version.
1774                // Remove cached address records for the disabled IP version
1775                // only if no more addresses of that version remain.
1776                let is_v4 = intf.addr.ip().is_ipv4();
1777                let version_gone = if is_v4 {
1778                    my_intf.next_ifaddr_v4().is_none()
1779                } else {
1780                    my_intf.next_ifaddr_v6().is_none()
1781                };
1782                if version_gone {
1783                    let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1784                    self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1785                }
1786            }
1787        }
1788
1789        if ip_removed {
1790            // Notify the monitors.
1791            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1792            // Remove the interface from my services that enabled `addr_auto`.
1793            self.del_addr_in_my_services(&intf.ip());
1794        }
1795    }
1796
    /// Starts using the address of `intf`.
    ///
    /// The address is skipped if there is no socket for its IP version.
    /// Otherwise the multicast group is joined and the address is recorded in
    /// `my_intfs`, creating the entry for the interface index if needed.
    ///
    /// If the address is new:
    ///
    /// - every service with `addr_auto` enabled gets the address and is
    ///   announced on the interface, or set to probing if it was not
    ///   announced;
    /// - a PTR query is sent on the interface for every key of
    ///   `service_queriers`;
    /// - monitors are notified with `DaemonEvent::IpAdd`.
    fn add_interface(&mut self, intf: Interface) {
        // Pick the socket that matches the IP version of the address.
        let sock_opt = if intf.ip().is_ipv4() {
            &self.ipv4_sock
        } else {
            &self.ipv6_sock
        };

        let Some(sock) = sock_opt else {
            debug!(
                "add_interface: no socket available for interface {} with addr {}. Skipped.",
                intf.name,
                intf.ip()
            );
            return;
        };

        // Interfaces without an index are stored under index 0.
        let if_index = intf.index.unwrap_or(0);
        let mut new_addr = false;

        match self.my_intfs.entry(if_index) {
            Entry::Occupied(mut entry) => {
                // If intf has a new address, add it to the existing interface.
                // A failure to join the multicast group is only logged; the
                // address is recorded anyway.
                let my_intf = entry.get_mut();
                if !my_intf.addrs.contains(&intf.addr) {
                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
                        debug!("add_interface: socket_config {}: {e}", &intf.name);
                    }
                    my_intf.addrs.insert(intf.addr.clone());
                    new_addr = true;
                }
            }
            Entry::Vacant(entry) => {
                // A new interface is recorded only if the multicast group
                // could be joined.
                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
                    return;
                }

                new_addr = true;
                let new_intf = MyIntf {
                    name: intf.name.clone(),
                    index: if_index,
                    addrs: HashSet::from([intf.addr.clone()]),
                };
                entry.insert(new_intf);
            }
        }

        // The address was already known: nothing to announce or report.
        if !new_addr {
            trace!("add_interface: interface {} already exists", &intf.name);
            return;
        }

        debug!("add new interface {}: {}", intf.name, intf.ip());

        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            debug!("add_interface: cannot find if_index {if_index}");
            return;
        };

        // Get the DNS registry of this interface, creating it if missing.
        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
            Some(registry) => registry,
            None => self
                .dns_registry_map
                .entry(if_index)
                .or_insert_with(DnsRegistry::new),
        };

        // Only services with `addr_auto` enabled pick up the new address.
        for (_, service_info) in self.my_services.iter_mut() {
            if service_info.is_addr_auto() {
                service_info.insert_ipaddr(&intf);

                if let Ok(true) = announce_service_on_intf(
                    dns_registry,
                    service_info,
                    my_intf,
                    &sock.pktinfo,
                    self.port,
                ) {
                    debug!(
                        "Announce service {} on {}",
                        service_info.get_fullname(),
                        intf.ip()
                    );
                    service_info.set_status(if_index, ServiceStatus::Announced);
                } else {
                    // Not announced: schedule the timers queued in the
                    // registry and mark the service as probing.
                    for timer in dns_registry.new_timers.drain(..) {
                        self.timers.push(Reverse(timer));
                    }
                    service_info.set_status(if_index, ServiceStatus::Probing);
                }
            }
        }

        // Send browse queries on the new interface without known answers.
        // This avoids known-answer suppression (RFC 6762 Section 7.1) that
        // would cause the responder to suppress its response, preventing
        // address records from being attributed to the new interface.
        if let Some(my_intf) = self.my_intfs.get(&if_index) {
            for ty in self.service_queriers.keys() {
                self.send_query_on_intf(ty, RRType::PTR, my_intf);
            }
        }

        // Notify the monitors.
        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
    }
1903
1904    /// Registers a service.
1905    ///
1906    /// RFC 6762 section 8.3.
1907    /// ...the Multicast DNS responder MUST send
1908    ///    an unsolicited Multicast DNS response containing, in the Answer
1909    ///    Section, all of its newly registered resource records
1910    ///
1911    /// Zeroconf will then respond to requests for information about this service.
1912    fn register_service(&mut self, mut info: ServiceInfo) {
1913        // Check the service name length.
1914        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1915            error!("check_service_name_length: {}", &e);
1916            self.notify_monitors(DaemonEvent::Error(e));
1917            return;
1918        }
1919
1920        if info.is_addr_auto() {
1921            let selected_intfs =
1922                self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
1923            for intf in selected_intfs {
1924                info.insert_ipaddr(&intf);
1925            }
1926        }
1927
1928        debug!("register service {:?}", &info);
1929
1930        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1931        if !outgoing_addrs.is_empty() {
1932            self.notify_monitors(DaemonEvent::Announce(
1933                info.get_fullname().to_string(),
1934                format!("{:?}", &outgoing_addrs),
1935            ));
1936        }
1937
1938        // The key has to be lower case letter as DNS record name is case insensitive.
1939        // The info will have the original name.
1940        let service_fullname = info.get_fullname().to_lowercase();
1941        self.my_services.insert(service_fullname, info);
1942    }
1943
    /// Sends out announcement of `info` on every valid interface.
    /// Returns the list of interface IPs that sent out the announcement.
    ///
    /// For every interface in `my_intfs` the announcement is attempted on the
    /// IPv4 socket and on the IPv6 socket (whichever exist). The status of
    /// `info` on that interface becomes `Announced` if at least one attempt
    /// announced it, otherwise `Probing`. For each interface that announced,
    /// a `Command::RegisterResend` is scheduled one second later.
    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
        let mut outgoing_addrs = Vec::new();
        let mut outgoing_intfs = HashSet::new();

        // Interface addresses reported as invalid while sending. They are
        // reported to the daemon itself after the loop.
        let mut invalid_intf_addrs = HashSet::new();

        for (if_index, intf) in self.my_intfs.iter() {
            // Get the DNS registry of this interface, creating it if missing.
            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
                Some(registry) => registry,
                None => self
                    .dns_registry_map
                    .entry(*if_index)
                    .or_insert_with(DnsRegistry::new),
            };

            let mut announced = false;

            // IPv4
            if let Some(sock) = self.ipv4_sock.as_mut() {
                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
                    Ok(true) => {
                        // Report every IPv4 address of the interface.
                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
                            outgoing_addrs.push(addr.ip());
                        }
                        outgoing_intfs.insert(intf.index);

                        debug!(
                            "Announce service IPv4 {} on {}",
                            info.get_fullname(),
                            intf.name
                        );
                        announced = true;
                    }
                    Ok(false) => {}
                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
            }

            // IPv6
            if let Some(sock) = self.ipv6_sock.as_mut() {
                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
                    Ok(true) => {
                        // Report every IPv6 address of the interface.
                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
                            outgoing_addrs.push(addr.ip());
                        }
                        outgoing_intfs.insert(intf.index);

                        debug!(
                            "Announce service IPv6 {} on {}",
                            info.get_fullname(),
                            intf.name
                        );
                        announced = true;
                    }
                    Ok(false) => {}
                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
            }

            if announced {
                info.set_status(intf.index, ServiceStatus::Announced);
            } else {
                // Not announced on either socket: schedule the timers queued
                // in the registry and mark the service as probing.
                for timer in dns_registry.new_timers.drain(..) {
                    self.timers.push(Reverse(timer));
                }
                info.set_status(*if_index, ServiceStatus::Probing);
            }
        }

        // Let the daemon handle the invalid interface addresses. The result
        // of sending the command is deliberately ignored.
        if !invalid_intf_addrs.is_empty() {
            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
        }

        // RFC 6762 section 8.3.
        // ..The Multicast DNS responder MUST send at least two unsolicited
        //    responses, one second apart.
        let next_time = current_time_millis() + 1000;
        for if_index in outgoing_intfs {
            self.add_retransmission(
                next_time,
                Command::RegisterResend(info.get_fullname().to_string(), if_index),
            );
        }

        outgoing_addrs
    }
2035
    /// Send probings or finish them if expired. Notify waiting services.
    ///
    /// For every interface that has a DNS registry:
    ///
    /// 1. `check_probing` produces the outgoing probe message and the list of
    ///    expired probes; a message with questions is sent on both sockets.
    /// 2. `handle_expired_probes` returns the services waiting on the expired
    ///    probes. Each of them that is not yet `Announced` on the interface
    ///    is announced; on success a `Command::RegisterResend` is scheduled
    ///    one second later, monitors get `DaemonEvent::Announce` and the
    ///    status becomes `Announced`.
    fn probing_handler(&mut self) {
        let now = current_time_millis();
        // Interface addresses reported as invalid while sending. They are
        // reported to the daemon itself after the loop.
        let mut invalid_intf_addrs = HashSet::new();

        for (if_index, intf) in self.my_intfs.iter() {
            // Interfaces without a registry have nothing to probe.
            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
                continue;
            };

            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);

            // send probing.
            if !out.questions().is_empty() {
                trace!("sending out probing of questions: {:?}", out.questions());
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
                    {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
                    {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
            }

            // For finished probes, wake up services that are waiting for the probes.
            let waiting_services =
                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);

            for service_name in waiting_services {
                // service names are lowercase
                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                    if info.get_status(*if_index) == ServiceStatus::Announced {
                        debug!("service {} already announced", info.get_fullname());
                        continue;
                    }

                    // Try to announce on each available socket. An invalid
                    // interface address counts as "not announced".
                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
                        match announce_service_on_intf(
                            dns_registry,
                            info,
                            intf,
                            &sock.pktinfo,
                            self.port,
                        ) {
                            Ok(announced) => announced,
                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                                invalid_intf_addrs.insert(intf_addr);
                                false
                            }
                        }
                    } else {
                        false
                    };
                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
                        match announce_service_on_intf(
                            dns_registry,
                            info,
                            intf,
                            &sock.pktinfo,
                            self.port,
                        ) {
                            Ok(announced) => announced,
                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                                invalid_intf_addrs.insert(intf_addr);
                                false
                            }
                        }
                    } else {
                        false
                    };

                    if announced_v4 || announced_v6 {
                        // Schedule a re-send of the announcement one second
                        // later, together with a timer for it.
                        let next_time = now + 1000;
                        let command =
                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
                        self.retransmissions.push(ReRun { next_time, command });
                        self.timers.push(Reverse(next_time));

                        // Report the names as resolved by the registry.
                        let fullname = dns_registry.resolve_name(&service_name).to_string();

                        let hostname = dns_registry.resolve_name(info.get_hostname());

                        debug!("wake up: announce service {} on {}", fullname, intf.name);
                        notify_monitors(
                            &mut self.monitors,
                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
                        );

                        info.set_status(*if_index, ServiceStatus::Announced);
                    }
                }
            }
        }

        // Let the daemon handle the invalid interface addresses. The result
        // of sending the command is deliberately ignored.
        if !invalid_intf_addrs.is_empty() {
            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
        }
    }
2141
2142    fn unregister_service(
2143        &self,
2144        info: &ServiceInfo,
2145        intf: &MyIntf,
2146        sock: &PktInfoUdpSocket,
2147    ) -> Vec<u8> {
2148        let is_ipv4 = sock.domain() == Domain::IPV4;
2149
2150        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2151        out.add_answer_at_time(
2152            DnsPointer::new(
2153                info.get_type(),
2154                RRType::PTR,
2155                CLASS_IN,
2156                0,
2157                info.get_fullname().to_string(),
2158            ),
2159            0,
2160        );
2161
2162        if let Some(sub) = info.get_subtype() {
2163            trace!("Adding subdomain {}", sub);
2164            out.add_answer_at_time(
2165                DnsPointer::new(
2166                    sub,
2167                    RRType::PTR,
2168                    CLASS_IN,
2169                    0,
2170                    info.get_fullname().to_string(),
2171                ),
2172                0,
2173            );
2174        }
2175
2176        out.add_answer_at_time(
2177            DnsSrv::new(
2178                info.get_fullname(),
2179                CLASS_IN | CLASS_CACHE_FLUSH,
2180                0,
2181                info.get_priority(),
2182                info.get_weight(),
2183                info.get_port(),
2184                info.get_hostname().to_string(),
2185            ),
2186            0,
2187        );
2188        out.add_answer_at_time(
2189            DnsTxt::new(
2190                info.get_fullname(),
2191                CLASS_IN | CLASS_CACHE_FLUSH,
2192                0,
2193                info.generate_txt(),
2194            ),
2195            0,
2196        );
2197
2198        let if_addrs = if is_ipv4 {
2199            info.get_addrs_on_my_intf_v4(intf)
2200        } else {
2201            info.get_addrs_on_my_intf_v6(intf)
2202        };
2203
2204        if if_addrs.is_empty() {
2205            return vec![];
2206        }
2207
2208        for address in if_addrs {
2209            out.add_answer_at_time(
2210                DnsAddress::new(
2211                    info.get_hostname(),
2212                    ip_address_rr_type(&address),
2213                    CLASS_IN | CLASS_CACHE_FLUSH,
2214                    0,
2215                    address,
2216                    intf.into(),
2217                ),
2218                0,
2219            );
2220        }
2221
2222        // Only (at most) one packet is expected to be sent out.
2223        let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port) {
2224            Ok(sent_vec) => sent_vec,
2225            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2226                let invalid_intf_addrs = HashSet::from([intf_addr]);
2227                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2228                vec![]
2229            }
2230        };
2231        sent_vec.into_iter().next().unwrap_or_default()
2232    }
2233
2234    /// Binds a channel `listener` to querying mDNS hostnames.
2235    ///
2236    /// If there is already a `listener`, it will be updated, i.e. overwritten.
2237    fn add_hostname_resolver(
2238        &mut self,
2239        hostname: String,
2240        listener: Sender<HostnameResolutionEvent>,
2241        timeout: Option<u64>,
2242    ) {
2243        let real_timeout = timeout.map(|t| current_time_millis() + t);
2244        self.hostname_resolvers
2245            .insert(hostname.to_lowercase(), (listener, real_timeout));
2246        if let Some(t) = real_timeout {
2247            self.add_timer(t);
2248        }
2249    }
2250
2251    /// Sends a multicast query for `name` with `qtype`.
2252    fn send_query(&self, name: &str, qtype: RRType) {
2253        self.send_query_vec(&[(name, qtype)]);
2254    }
2255
2256    /// Sends a query on a specific interface without known answers.
2257    ///
2258    /// Used when a new interface is added so the responder won't suppress
2259    /// its response due to known-answer suppression (RFC 6762 Section 7.1).
2260    fn send_query_on_intf(&self, name: &str, qtype: RRType, intf: &MyIntf) {
2261        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2262        out.add_question(name, qtype);
2263
2264        let mut invalid_intf_addrs = HashSet::new();
2265        if let Some(sock) = self.ipv4_sock.as_ref() {
2266            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2267                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2268            {
2269                invalid_intf_addrs.insert(intf_addr);
2270            }
2271        }
2272        if let Some(sock) = self.ipv6_sock.as_ref() {
2273            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2274                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2275            {
2276                invalid_intf_addrs.insert(intf_addr);
2277            }
2278        }
2279        if !invalid_intf_addrs.is_empty() {
2280            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2281        }
2282    }
2283
2284    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
2285    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2286        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2287        let now = current_time_millis();
2288
2289        for (name, qtype) in questions {
2290            out.add_question(name, *qtype);
2291
2292            for record in self.cache.get_known_answers(name, *qtype, now) {
2293                /*
2294                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
2295                ...
2296                    When a Multicast DNS querier sends a query to which it already knows
2297                    some answers, it populates the Answer Section of the DNS query
2298                    message with those answers.
2299                 */
2300                trace!("add known answer: {:?}", record.record);
2301                let mut new_record = record.record.clone();
2302                new_record.get_record_mut().update_ttl(now);
2303                out.add_answer_box(new_record);
2304            }
2305        }
2306
2307        let mut invalid_intf_addrs = HashSet::new();
2308        for (_, intf) in self.my_intfs.iter() {
2309            if let Some(sock) = self.ipv4_sock.as_ref() {
2310                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2311                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2312                {
2313                    invalid_intf_addrs.insert(intf_addr);
2314                }
2315            }
2316            if let Some(sock) = self.ipv6_sock.as_ref() {
2317                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2318                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2319                {
2320                    invalid_intf_addrs.insert(intf_addr);
2321                }
2322            }
2323        }
2324
2325        if !invalid_intf_addrs.is_empty() {
2326            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2327        }
2328    }
2329
2330    /// Reads one UDP datagram from the socket of `intf`.
2331    ///
2332    /// Returns false if failed to receive a packet,
2333    /// otherwise returns true.
2334    fn handle_read(&mut self, event_key: usize) -> bool {
2335        let sock_opt = match event_key {
2336            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2337            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2338            _ => {
2339                debug!("handle_read: unknown token {}", event_key);
2340                return false;
2341            }
2342        };
2343        let Some(sock) = sock_opt.as_mut() else {
2344            debug!("handle_read: socket not available for token {}", event_key);
2345            return false;
2346        };
2347        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2348
2349        // Read the next mDNS UDP datagram.
2350        //
2351        // If the datagram is larger than `buf`, excess bytes may or may not
2352        // be truncated by the socket layer depending on the platform's libc.
2353        // In any case, such large datagram will not be decoded properly and
2354        // this function should return false but should not crash.
2355        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2356            Ok(sz) => sz,
2357            Err(e) => {
2358                if e.kind() != std::io::ErrorKind::WouldBlock {
2359                    debug!("listening socket read failed: {}", e);
2360                }
2361                return false;
2362            }
2363        };
2364
2365        // Find the interface that received the packet.
2366        let pkt_if_index = pktinfo.if_index as u32;
2367        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2368            debug!(
2369                "handle_read: no interface found for pktinfo if_index: {}",
2370                pktinfo.if_index
2371            );
2372            return true; // We still return true to indicate that we read something.
2373        };
2374
2375        // Drop packets for an IP version that has been disabled on this interface.
2376        // This is needed because some times the socket layer may still receive packets
2377        // for an IP version even after we left the multicast group for that IP version.
2378        // We want to drop such packets to avoid unnecessary processing.
2379        let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2380        if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2381            || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2382        {
2383            debug!(
2384                "handle_read: dropping {} packet on intf {} (disabled)",
2385                if is_ipv4 { "IPv4" } else { "IPv6" },
2386                my_intf.name
2387            );
2388            return true;
2389        }
2390
2391        buf.truncate(sz); // reduce potential processing errors
2392
2393        match DnsIncoming::new(buf, my_intf.into()) {
2394            Ok(msg) => {
2395                if msg.is_query() {
2396                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2397                } else if msg.is_response() {
2398                    self.handle_response(msg, pkt_if_index);
2399                } else {
2400                    debug!("Invalid message: not query and not response");
2401                }
2402            }
2403            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2404        }
2405
2406        true
2407    }
2408
2409    /// Returns true, if sent query. Returns false if SRV already exists.
2410    fn query_unresolved(&mut self, instance: &str) -> bool {
2411        if !valid_instance_name(instance) {
2412            trace!("instance name {} not valid", instance);
2413            return false;
2414        }
2415
2416        if let Some(records) = self.cache.get_srv(instance) {
2417            for record in records {
2418                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2419                    if self.cache.get_addr(srv.host()).is_none() {
2420                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2421                        return true;
2422                    }
2423                }
2424            }
2425        } else {
2426            self.send_query(instance, RRType::ANY);
2427            return true;
2428        }
2429
2430        false
2431    }
2432
2433    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2434    /// cached records via `sender`.
2435    fn query_cache_for_service(
2436        &mut self,
2437        ty_domain: &str,
2438        sender: &Sender<ServiceEvent>,
2439        now: u64,
2440    ) {
2441        let mut resolved: HashSet<String> = HashSet::new();
2442        let mut unresolved: HashSet<String> = HashSet::new();
2443
2444        if let Some(records) = self.cache.get_ptr(ty_domain) {
2445            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2446                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2447                    let mut new_event = None;
2448                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2449                        Ok(resolved_service) => {
2450                            if resolved_service.is_valid() {
2451                                debug!("Resolved service from cache: {}", ptr.alias());
2452                                new_event =
2453                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2454                            } else {
2455                                debug!("Resolved service is not valid: {}", ptr.alias());
2456                            }
2457                        }
2458                        Err(err) => {
2459                            debug!("Error while resolving service from cache: {}", err);
2460                            continue;
2461                        }
2462                    }
2463
2464                    match sender.send(ServiceEvent::ServiceFound(
2465                        ty_domain.to_string(),
2466                        ptr.alias().to_string(),
2467                    )) {
2468                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2469                        Err(e) => {
2470                            debug!("failed to send service found: {}", e);
2471                            continue;
2472                        }
2473                    }
2474
2475                    if let Some(event) = new_event {
2476                        resolved.insert(ptr.alias().to_string());
2477                        match sender.send(event) {
2478                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2479                            Err(e) => debug!("failed to send service resolved: {}", e),
2480                        }
2481                    } else {
2482                        unresolved.insert(ptr.alias().to_string());
2483                    }
2484                }
2485            }
2486        }
2487
2488        for instance in resolved.drain() {
2489            self.pending_resolves.remove(&instance);
2490            self.resolved.insert(instance);
2491        }
2492
2493        for instance in unresolved.drain() {
2494            self.add_pending_resolve(instance);
2495        }
2496    }
2497
2498    /// Checks if `hostname` has records in the cache. If yes, sends the
2499    /// cached records via `sender`.
2500    fn query_cache_for_hostname(
2501        &mut self,
2502        hostname: &str,
2503        sender: Sender<HostnameResolutionEvent>,
2504    ) {
2505        let addresses_map = self.cache.get_addresses_for_host(hostname);
2506        for (name, addresses) in addresses_map {
2507            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2508                Ok(()) => trace!("sent hostname addresses found"),
2509                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2510            }
2511        }
2512    }
2513
2514    fn add_pending_resolve(&mut self, instance: String) {
2515        if !self.pending_resolves.contains(&instance) {
2516            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2517            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2518            self.pending_resolves.insert(instance);
2519        }
2520    }
2521
2522    /// Creates a `ResolvedService` from the cache.
2523    fn resolve_service_from_cache(
2524        &self,
2525        ty_domain: &str,
2526        fullname: &str,
2527    ) -> Result<ResolvedService> {
2528        let now = current_time_millis();
2529        let mut resolved_service = ResolvedService {
2530            ty_domain: ty_domain.to_string(),
2531            sub_ty_domain: None,
2532            fullname: fullname.to_string(),
2533            host: String::new(),
2534            port: 0,
2535            addresses: HashSet::new(),
2536            txt_properties: TxtProperties::new(),
2537        };
2538
2539        // Be sure setting `subtype` if available even when querying for the parent domain.
2540        if let Some(subtype) = self.cache.get_subtype(fullname) {
2541            trace!(
2542                "ty_domain: {} found subtype {} for instance: {}",
2543                ty_domain,
2544                subtype,
2545                fullname
2546            );
2547            if resolved_service.sub_ty_domain.is_none() {
2548                resolved_service.sub_ty_domain = Some(subtype.to_string());
2549            }
2550        }
2551
2552        // resolve SRV record
2553        if let Some(records) = self.cache.get_srv(fullname) {
2554            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2555                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2556                    resolved_service.host = dns_srv.host().to_string();
2557                    resolved_service.port = dns_srv.port();
2558                }
2559            }
2560        }
2561
2562        // resolve TXT record
2563        if let Some(records) = self.cache.get_txt(fullname) {
2564            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2565                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2566                    resolved_service.txt_properties = dns_txt.text().into();
2567                }
2568            }
2569        }
2570
2571        // resolve A and AAAA records
2572        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2573            for answer in records.iter() {
2574                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2575                    if dns_a.expires_soon(now) {
2576                        trace!(
2577                            "Addr expired or expires soon: {}",
2578                            dns_a.address().to_ip_addr()
2579                        );
2580                    } else {
2581                        let scoped = dns_a.address();
2582                        if let ScopedIp::V4(v4) = &scoped {
2583                            // Merge interface_ids if this V4 addr already exists.
2584                            // Linear scan by IP since Eq/Hash include interface_ids.
2585                            let existing = resolved_service
2586                                .addresses
2587                                .iter()
2588                                .find(|a| a.to_ip_addr() == IpAddr::V4(*v4.addr()))
2589                                .cloned();
2590                            if let Some(mut existing) = existing {
2591                                resolved_service.addresses.remove(&existing);
2592                                if let ScopedIp::V4(existing_v4) = &mut existing {
2593                                    for id in v4.interface_ids() {
2594                                        existing_v4.add_interface_id(id.clone());
2595                                    }
2596                                }
2597                                resolved_service.addresses.insert(existing);
2598                            } else {
2599                                resolved_service.addresses.insert(scoped);
2600                            }
2601                        } else {
2602                            resolved_service.addresses.insert(scoped);
2603                        }
2604                    }
2605                }
2606            }
2607        }
2608
2609        Ok(resolved_service)
2610    }
2611
2612    fn handle_poller_events(&mut self, events: &mio::Events) {
2613        for ev in events.iter() {
2614            trace!("event received with key {:?}", ev.token());
2615            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2616                // Drain signals as we will drain commands as well.
2617                self.signal_sock_drain();
2618
2619                if let Err(e) = self.poller.registry().reregister(
2620                    &mut self.signal_sock,
2621                    ev.token(),
2622                    mio::Interest::READABLE,
2623                ) {
2624                    debug!("failed to modify poller for signal socket: {}", e);
2625                }
2626                continue; // Next event.
2627            }
2628
2629            // Read until no more packets available.
2630            while self.handle_read(ev.token().0) {}
2631
2632            // we continue to monitor this socket.
2633            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2634                // Re-register the IPv4 socket for reading.
2635                if let Some(sock) = self.ipv4_sock.as_mut() {
2636                    if let Err(e) =
2637                        self.poller
2638                            .registry()
2639                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2640                    {
2641                        debug!("modify poller for IPv4 socket: {}", e);
2642                    }
2643                }
2644            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2645                // Re-register the IPv6 socket for reading.
2646                if let Some(sock) = self.ipv6_sock.as_mut() {
2647                    if let Err(e) =
2648                        self.poller
2649                            .registry()
2650                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2651                    {
2652                        debug!("modify poller for IPv6 socket: {}", e);
2653                    }
2654                }
2655            }
2656        }
2657    }
2658
    /// Deal with incoming response packets.  All answers
    /// are held in the cache, and listeners are notified.
    ///
    /// `msg` is the decoded response and `if_index` is the index of the
    /// interface that received it. Processing happens in this order:
    ///
    /// 1. Records in `msg` that are already expired are dropped from `msg`
    ///    and removed from the cache; a `ServiceRemoved` event is sent for
    ///    each removed PTR record.
    /// 2. `conflict_handler` is run on the remaining records.
    /// 3. It is decided whether the message is "for us" (see below).
    /// 4. All remaining records are added to / updated in the cache. New
    ///    `ServiceFound` events are sent for PTR records, and timers are
    ///    collected for expire / refresh times.
    /// 5. Hostname listeners are notified for changed address records.
    /// 6. Instances touched by the changes are passed to
    ///    `resolve_updated_instances`.
    ///
    /// If no interface is known for `if_index`, the function returns after
    /// step 3 without touching the cache any further.
    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
        let now = current_time_millis();

        // remove records that are expired.
        // The predicate returns true to keep a record in `msg`. As a side
        // effect, expired records are also removed from the cache.
        let mut record_predicate = |record: &DnsRecordBox| {
            if !record.get_record().is_expired(now) {
                return true;
            }

            debug!("record is expired, removing it from cache.");
            if self.cache.remove(record) {
                // for PTR records, send event to listeners
                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                    call_service_listener(
                        &self.service_queriers,
                        dns_ptr.get_name(),
                        ServiceEvent::ServiceRemoved(
                            dns_ptr.get_name().to_string(),
                            dns_ptr.alias().to_string(),
                        ),
                    );
                }
            }
            false
        };
        // Apply the same predicate to all three record sections.
        msg.answers_mut().retain(&mut record_predicate);
        msg.authorities_mut().retain(&mut record_predicate);
        msg.additionals_mut().retain(&mut record_predicate);

        // check possible conflicts and handle them.
        self.conflict_handler(&msg, if_index);

        // check if the message is for us.
        let mut is_for_us = true; // assume it is for us.

        // If there are any PTR records in the answers, there should be
        // at least one PTR for us. Otherwise, the message is not for us.
        // If there are no PTR records at all, assume this message is for us.
        for answer in msg.answers() {
            if answer.get_type() == RRType::PTR {
                if self.service_queriers.contains_key(answer.get_name()) {
                    is_for_us = true;
                    break; // OK to break: at least one PTR for us.
                } else {
                    is_for_us = false;
                }
            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                // If there is a hostname querier for this address, then it is for us.
                // The lookup key is the lowercased record name.
                let answer_lowercase = answer.get_name().to_lowercase();
                if self.hostname_resolvers.contains_key(&answer_lowercase) {
                    is_for_us = true;
                    break; // OK to break: at least one hostname for us.
                }
            }
        }

        // if we explicitly want to accept unsolicited responses, we should consider all messages as for us.
        if self.accept_unsolicited {
            is_for_us = true;
        }

        /// Represents a DNS record change that involves one service instance.
        struct InstanceChange {
            ty: RRType,   // The type of DNS record for the instance.
            name: String, // The name of the record.
        }

        // Go through all answers to get the new and updated records.
        // For new PTR records, send out ServiceFound immediately. For others,
        // collect them into `changes`.
        //
        // Note: we don't try to identify the update instances based on
        // each record immediately as the answers are likely related to each
        // other.
        let mut changes = Vec::new();
        let mut timers = Vec::new();
        // Without a known interface the records cannot be cached: stop here.
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            return;
        };
        for record in msg.all_records() {
            match self
                .cache
                .add_or_update(my_intf, record, &mut timers, is_for_us)
            {
                // NOTE(review): the `true` flag presumably marks a record that
                // is new or has changed data -- confirm against
                // `DnsCache::add_or_update`.
                Some((dns_record, true)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());

                    let ty = dns_record.record.get_type();
                    let name = dns_record.record.get_name();

                    // Only process PTR that does not expire soon (i.e. TTL > 1).
                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
                        // NOTE(review): the refresh time was already pushed
                        // above, so this adds the same timer a second time
                        // -- confirm whether that is intended.
                        if self.service_queriers.contains_key(name) {
                            timers.push(dns_record.record.get_record().get_refresh_time());
                        }

                        // send ServiceFound
                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
                        {
                            debug!("calling listener with service found: {name}");
                            call_service_listener(
                                &self.service_queriers,
                                name,
                                ServiceEvent::ServiceFound(
                                    name.to_string(),
                                    dns_ptr.alias().to_string(),
                                ),
                            );
                            // For a PTR, the change is recorded under the
                            // instance name it points to, not the PTR name.
                            changes.push(InstanceChange {
                                ty,
                                name: dns_ptr.alias().to_string(),
                            });
                        }
                    } else {
                        changes.push(InstanceChange {
                            ty,
                            name: name.to_string(),
                        });
                    }
                }
                // Flag is `false`: only the timers are scheduled, no change
                // is recorded.
                Some((dns_record, false)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());
                }
                _ => {}
            }
        }

        // Add timers for the new records.
        for t in timers {
            self.add_timer(t);
        }

        // Go through remaining changes to see if any hostname resolutions were found or updated.
        for change in changes
            .iter()
            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
        {
            let addr_map = self.cache.get_addresses_for_host(&change.name);
            for (name, addresses) in addr_map {
                call_hostname_resolution_listener(
                    &self.hostname_resolvers,
                    &change.name,
                    HostnameResolutionEvent::AddressesFound(name, addresses),
                )
            }
        }

        // Identify the instances that need to be "resolved".
        let mut updated_instances = HashSet::new();
        for update in changes {
            match update.ty {
                // For these types, `update.name` is the instance name itself.
                RRType::PTR | RRType::SRV | RRType::TXT => {
                    updated_instances.insert(update.name);
                }
                // For address records, `update.name` is a hostname: every
                // instance the cache reports on that host is affected.
                RRType::A | RRType::AAAA => {
                    let instances = self.cache.get_instances_on_host(&update.name);
                    updated_instances.extend(instances);
                }
                _ => {}
            }
        }

        self.resolve_updated_instances(&updated_instances);
    }
2827
    /// Checks the answers of an incoming response against the records that
    /// are currently being probed on interface `if_index`, and renames the
    /// probed records that conflict.
    ///
    /// An answer is only considered if its name has an entry in the
    /// `probing` map of the interface's DNS registry. A probed record
    /// conflicts with an answer when both have the same type and class but
    /// different rdata. Each conflicting record is removed from its probe,
    /// cloned under a new name (`hostname_change` for A / AAAA records,
    /// `name_change` for all other types), and inserted into the probe of
    /// the new name, which is created if it does not exist yet.
    ///
    /// Does nothing if there is no interface or no DNS registry for
    /// `if_index`.
    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            debug!("handle_response: no intf found for index {if_index}");
            return;
        };

        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
            return;
        };

        for answer in msg.answers().iter() {
            // Renamed copies of the conflicting records for this answer.
            let mut new_records = Vec::new();

            let name = answer.get_name();
            // Answers for names that are not being probed cannot conflict.
            let Some(probe) = dns_registry.probing.get_mut(name) else {
                continue;
            };

            // check against possible multicast forwarding
            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                    // Skip address answers whose interface id does not match
                    // the interface the message was received on.
                    if answer_addr.interface_id.index != if_index {
                        debug!(
                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
                            answer_addr, my_intf.name
                        );
                        continue;
                    }
                }

                // double check if any other address record matches rrdata,
                // as there could be multiple addresses for the same name.
                let any_match = probe.records.iter().any(|r| {
                    r.get_type() == answer.get_type()
                        && r.get_class() == answer.get_class()
                        && r.rrdata_match(answer.as_ref())
                });
                if any_match {
                    continue; // no conflict for this answer.
                }
            }

            // Keep the records that do not conflict with this answer; move a
            // renamed clone of each conflicting one into `new_records`.
            probe.records.retain(|record| {
                if record.get_type() == answer.get_type()
                    && record.get_class() == answer.get_class()
                    && !record.rrdata_match(answer.as_ref())
                {
                    debug!(
                        "found conflict name: '{name}' record: {}: {} PEER: {}",
                        record.get_type(),
                        record.rdata_print(),
                        answer.rdata_print()
                    );

                    // create a new name for this record
                    // then remove the old record in probing.
                    let mut new_record = record.clone();
                    let new_name = match record.get_type() {
                        RRType::A => hostname_change(name),
                        RRType::AAAA => hostname_change(name),
                        _ => name_change(name),
                    };
                    new_record.get_record_mut().set_new_name(new_name);
                    new_records.push(new_record);
                    return false; // old record is dropped from the probe.
                }

                true
            });

            // ?????
            // if probe.records.is_empty() {
            //     dns_registry.probing.remove(name);
            // }

            // Probing again with the new names.
            // The start time is delayed by a random amount below 250 ms.
            let create_time = current_time_millis() + fastrand::u64(0..250);

            // Cloned here because `probe` cannot stay borrowed while
            // `dns_registry` is used again below.
            let waiting_services = probe.waiting_services.clone();

            for record in new_records {
                // Schedule a timer when the registry reports that the
                // hostname was updated.
                if dns_registry.update_hostname(name, record.get_name(), create_time) {
                    self.timers.push(Reverse(create_time));
                }

                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
                dns_registry.name_changes.insert(
                    record.get_record().get_original_name().to_string(),
                    record.get_name().to_string(),
                );

                // Reuse the probe of the new name if there is one, otherwise
                // create it and schedule a timer for its first send.
                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                    Some(p) => p,
                    None => {
                        let new_probe = dns_registry
                            .probing
                            .entry(record.get_name().to_string())
                            .or_insert_with(|| {
                                debug!("conflict handler: new probe of {}", record.get_name());
                                Probe::new(create_time)
                            });
                        self.timers.push(Reverse(new_probe.next_send));
                        new_probe
                    }
                };

                debug!(
                    "insert record with new name '{}' {} into probe",
                    record.get_name(),
                    record.get_type()
                );
                new_probe.insert_record(record);

                // Services waiting on the old name now wait on the new one.
                new_probe.waiting_services.extend(waiting_services.clone());
            }
        }
    }
2945
2946    /// Resolve the updated (including new) instances.
2947    ///
2948    /// Note: it is possible that more than 1 PTR pointing to the same
2949    /// instance. For example, a regular service type PTR and a sub-type
2950    /// service type PTR can both point to the same service instance.
2951    /// This loop automatically handles the sub-type PTRs.
2952    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2953        if updated_instances.is_empty() {
2954            return;
2955        }
2956
2957        let mut resolved: HashSet<String> = HashSet::new();
2958        let mut unresolved: HashSet<String> = HashSet::new();
2959        let mut removed_instances = HashMap::new();
2960
2961        let now = current_time_millis();
2962
2963        for (ty_domain, records) in self.cache.all_ptr().iter() {
2964            if !self.service_queriers.contains_key(ty_domain) {
2965                // No need to resolve if not in our queries.
2966                continue;
2967            }
2968
2969            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2970                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2971                    continue;
2972                };
2973
2974                let instance = dns_ptr.alias();
2975                if !updated_instances.contains(instance) {
2976                    continue;
2977                }
2978
2979                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2980                else {
2981                    continue;
2982                };
2983
2984                debug!("resolve_updated_instances: from cache: {instance}");
2985                if resolved_service.is_valid() {
2986                    debug!("call queriers to resolve {instance}");
2987                    resolved.insert(instance.to_string());
2988                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2989                    call_service_listener(&self.service_queriers, ty_domain, event);
2990                } else {
2991                    debug!("Resolved service is not valid: {instance}");
2992                    if self.resolved.remove(dns_ptr.alias()) {
2993                        removed_instances
2994                            .entry(ty_domain.to_string())
2995                            .or_insert_with(HashSet::new)
2996                            .insert(instance.to_string());
2997                    }
2998                    unresolved.insert(instance.to_string());
2999                }
3000            }
3001        }
3002
3003        for instance in resolved.drain() {
3004            self.pending_resolves.remove(&instance);
3005            self.resolved.insert(instance);
3006        }
3007
3008        for instance in unresolved.drain() {
3009            self.add_pending_resolve(instance);
3010        }
3011
3012        if !removed_instances.is_empty() {
3013            debug!(
3014                "resolve_updated_instances: removed {}",
3015                &removed_instances.len()
3016            );
3017            self.notify_service_removal(removed_instances);
3018        }
3019    }
3020
3021    /// Handle incoming query packets, figure out whether and what to respond.
3022    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
3023        let sock_opt = if is_ipv4 {
3024            &self.ipv4_sock
3025        } else {
3026            &self.ipv6_sock
3027        };
3028        let Some(sock) = sock_opt.as_ref() else {
3029            debug!("handle_query: socket not available for intf {}", if_index);
3030            return;
3031        };
3032        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3033
3034        // Special meta-query "_services._dns-sd._udp.<Domain>".
3035        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
3036        const META_QUERY: &str = "_services._dns-sd._udp.local.";
3037
3038        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3039            debug!("missing dns registry for intf {}", if_index);
3040            return;
3041        };
3042
3043        let Some(intf) = self.my_intfs.get(&if_index) else {
3044            debug!("handle_query: no intf found for index {if_index}");
3045            return;
3046        };
3047
3048        for question in msg.questions().iter() {
3049            let qtype = question.entry_type();
3050            let q_name = question.entry_name();
3051
3052            if qtype == RRType::PTR {
3053                for service in self.my_services.values() {
3054                    if service.get_status(if_index) != ServiceStatus::Announced {
3055                        continue;
3056                    }
3057
3058                    if service.matches_type_or_subtype(q_name) {
3059                        out.add_answer_with_additionals(&msg, service, intf, dns_registry, is_ipv4);
3060                    } else if q_name == META_QUERY {
3061                        let ttl = service.get_other_ttl();
3062                        let alias = service.get_type().to_string();
3063                        let ptr = DnsPointer::new(q_name, RRType::PTR, CLASS_IN, ttl, alias);
3064                        if !out.add_answer(&msg, ptr) {
3065                            trace!("answer was not added for meta-query {:?}", &question);
3066                        }
3067                    }
3068                }
3069            } else {
3070                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
3071                if qtype == RRType::ANY && msg.num_authorities() > 0 {
3072                    if let Some(probe) = dns_registry.probing.get_mut(q_name) {
3073                        probe.tiebreaking(&msg, q_name);
3074                    }
3075                }
3076
3077                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3078                    for service in self.my_services.values() {
3079                        if service.get_status(if_index) != ServiceStatus::Announced {
3080                            continue;
3081                        }
3082
3083                        let service_hostname = dns_registry.resolve_name(service.get_hostname());
3084
3085                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3086                            let intf_addrs = if is_ipv4 {
3087                                service.get_addrs_on_my_intf_v4(intf)
3088                            } else {
3089                                service.get_addrs_on_my_intf_v6(intf)
3090                            };
3091                            if intf_addrs.is_empty()
3092                                && (qtype == RRType::A || qtype == RRType::AAAA)
3093                            {
3094                                let t = match qtype {
3095                                    RRType::A => "TYPE_A",
3096                                    RRType::AAAA => "TYPE_AAAA",
3097                                    _ => "invalid_type",
3098                                };
3099                                trace!(
3100                                    "Cannot find valid addrs for {} response on intf {:?}",
3101                                    t,
3102                                    &intf
3103                                );
3104                                return;
3105                            }
3106                            for address in intf_addrs {
3107                                out.add_answer(
3108                                    &msg,
3109                                    DnsAddress::new(
3110                                        service_hostname,
3111                                        ip_address_rr_type(&address),
3112                                        CLASS_IN | CLASS_CACHE_FLUSH,
3113                                        service.get_host_ttl(),
3114                                        address,
3115                                        intf.into(),
3116                                    ),
3117                                );
3118                            }
3119                        }
3120                    }
3121                }
3122
3123                let query_name = q_name.to_lowercase();
3124                let service_opt = self
3125                    .my_services
3126                    .iter()
3127                    .find(|(k, _v)| dns_registry.resolve_name(k.as_str()) == query_name)
3128                    .map(|(_, v)| v);
3129
3130                let Some(service) = service_opt else {
3131                    continue;
3132                };
3133
3134                if service.get_status(if_index) != ServiceStatus::Announced {
3135                    continue;
3136                }
3137
3138                let intf_addrs = if is_ipv4 {
3139                    service.get_addrs_on_my_intf_v4(intf)
3140                } else {
3141                    service.get_addrs_on_my_intf_v6(intf)
3142                };
3143                if intf_addrs.is_empty() {
3144                    debug!(
3145                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3146                        &intf
3147                    );
3148                    continue;
3149                }
3150
3151                add_answer_of_service(
3152                    &mut out,
3153                    &msg,
3154                    question.entry_name(),
3155                    service,
3156                    qtype,
3157                    intf_addrs,
3158                );
3159            }
3160        }
3161
3162        if out.answers_count() > 0 {
3163            debug!("sending response on intf {}", &intf.name);
3164            out.set_id(msg.id());
3165            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3166                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
3167            {
3168                let invalid_intf_addr = HashSet::from([intf_addr]);
3169                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3170            }
3171
3172            let if_name = intf.name.clone();
3173
3174            self.increase_counter(Counter::Respond, 1);
3175            self.notify_monitors(DaemonEvent::Respond(if_name));
3176        }
3177
3178        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3179    }
3180
3181    /// Increases the value of `counter` by `count`.
3182    fn increase_counter(&mut self, counter: Counter, count: i64) {
3183        let key = counter.to_string();
3184        match self.counters.get_mut(&key) {
3185            Some(v) => *v += count,
3186            None => {
3187                self.counters.insert(key, count);
3188            }
3189        }
3190    }
3191
3192    /// Sets the value of `counter` to `count`.
3193    fn set_counter(&mut self, counter: Counter, count: i64) {
3194        let key = counter.to_string();
3195        self.counters.insert(key, count);
3196    }
3197
3198    fn signal_sock_drain(&self) {
3199        let mut signal_buf = [0; 1024];
3200
3201        // This recv is non-blocking as the socket is non-blocking.
3202        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3203            trace!(
3204                "signal socket recvd: {}",
3205                String::from_utf8_lossy(&signal_buf[0..sz])
3206            );
3207        }
3208    }
3209
3210    fn add_retransmission(&mut self, next_time: u64, command: Command) {
3211        self.retransmissions.push(ReRun { next_time, command });
3212        self.add_timer(next_time);
3213    }
3214
3215    /// Sends service removal event to listeners for expired service records.
3216    /// `expired`: map of service type domain to set of instance names.
3217    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3218        for (ty_domain, sender) in self.service_queriers.iter() {
3219            if let Some(instances) = expired.get(ty_domain) {
3220                for instance_name in instances {
3221                    let event = ServiceEvent::ServiceRemoved(
3222                        ty_domain.to_string(),
3223                        instance_name.to_string(),
3224                    );
3225                    match sender.send(event) {
3226                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3227                        Err(e) => debug!("Failed to send event: {}", e),
3228                    }
3229                }
3230            }
3231        }
3232    }
3233
3234    /// The entry point that executes all commands received by the daemon.
3235    ///
3236    /// `repeating`: whether this is a retransmission.
3237    fn exec_command(&mut self, command: Command, repeating: bool) {
3238        trace!("exec_command: {:?} repeating: {}", &command, repeating);
3239        match command {
3240            Command::Browse(ty, next_delay, cache_only, listener) => {
3241                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3242            }
3243
3244            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3245                self.exec_command_resolve_hostname(
3246                    repeating, hostname, next_delay, listener, timeout,
3247                );
3248            }
3249
3250            Command::Register(service_info) => {
3251                self.register_service(*service_info);
3252                self.increase_counter(Counter::Register, 1);
3253            }
3254
3255            Command::RegisterResend(fullname, intf) => {
3256                trace!("register-resend service: {fullname} on {}", &intf);
3257                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3258                    self.exec_command_register_resend(fullname, intf)
3259                {
3260                    let invalid_intf_addr = HashSet::from([intf_addr]);
3261                    let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3262                }
3263            }
3264
3265            Command::Unregister(fullname, resp_s) => {
3266                trace!("unregister service {} repeat {}", &fullname, &repeating);
3267                self.exec_command_unregister(repeating, fullname, resp_s);
3268            }
3269
3270            Command::UnregisterResend(packet, if_index, is_ipv4) => {
3271                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3272            }
3273
3274            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3275
3276            Command::StopResolveHostname(hostname) => {
3277                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3278            }
3279
3280            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3281
3282            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3283
3284            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3285                Ok(()) => trace!("Sent status to the client"),
3286                Err(e) => debug!("Failed to send status: {}", e),
3287            },
3288
3289            Command::Monitor(resp_s) => {
3290                self.monitors.push(resp_s);
3291            }
3292
3293            Command::SetOption(daemon_opt) => {
3294                self.process_set_option(daemon_opt);
3295            }
3296
3297            Command::GetOption(resp_s) => {
3298                let val = DaemonOptionVal {
3299                    _service_name_len_max: self.service_name_len_max,
3300                    ip_check_interval: self.ip_check_interval,
3301                };
3302                if let Err(e) = resp_s.send(val) {
3303                    debug!("Failed to send options: {}", e);
3304                }
3305            }
3306
3307            Command::Verify(instance_fullname, timeout) => {
3308                self.exec_command_verify(instance_fullname, timeout, repeating);
3309            }
3310
3311            Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3312                for intf_addr in invalid_intf_addrs {
3313                    self.del_interface_addr(&intf_addr);
3314                }
3315
3316                self.check_ip_changes();
3317            }
3318
3319            _ => {
3320                debug!("unexpected command: {:?}", &command);
3321            }
3322        }
3323    }
3324
3325    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3326        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3327        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3328        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3329        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3330        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3331        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3332        self.set_counter(Counter::Timer, self.timers.len() as i64);
3333
3334        let dns_registry_probe_count: usize = self
3335            .dns_registry_map
3336            .values()
3337            .map(|r| r.probing.len())
3338            .sum();
3339        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3340
3341        let dns_registry_active_count: usize = self
3342            .dns_registry_map
3343            .values()
3344            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3345            .sum();
3346        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3347
3348        let dns_registry_timer_count: usize = self
3349            .dns_registry_map
3350            .values()
3351            .map(|r| r.new_timers.len())
3352            .sum();
3353        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3354
3355        let dns_registry_name_change_count: usize = self
3356            .dns_registry_map
3357            .values()
3358            .map(|r| r.name_changes.len())
3359            .sum();
3360        self.set_counter(
3361            Counter::DnsRegistryNameChange,
3362            dns_registry_name_change_count as i64,
3363        );
3364
3365        // Send the metrics to the client.
3366        if let Err(e) = resp_s.send(self.counters.clone()) {
3367            debug!("Failed to send metrics: {}", e);
3368        }
3369    }
3370
3371    fn exec_command_browse(
3372        &mut self,
3373        repeating: bool,
3374        ty: String,
3375        next_delay: u32,
3376        cache_only: bool,
3377        listener: Sender<ServiceEvent>,
3378    ) {
3379        let pretty_addrs: Vec<String> = self
3380            .my_intfs
3381            .iter()
3382            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3383            .collect();
3384
3385        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3386            "{ty} on {} interfaces [{}]",
3387            pretty_addrs.len(),
3388            pretty_addrs.join(", ")
3389        ))) {
3390            debug!(
3391                "Failed to send SearchStarted({})(repeating:{}): {}",
3392                &ty, repeating, e
3393            );
3394            return;
3395        }
3396
3397        let now = current_time_millis();
3398        if !repeating {
3399            // Binds a `listener` to querying mDNS domain type `ty`.
3400            //
3401            // If there is already a `listener`, it will be updated, i.e. overwritten.
3402            self.service_queriers.insert(ty.clone(), listener.clone());
3403
3404            // if we already have the records in our cache, just send them
3405            self.query_cache_for_service(&ty, &listener, now);
3406        }
3407
3408        if cache_only {
3409            // If cache_only is true, we do not send a query.
3410            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3411                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3412                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3413            }
3414            return;
3415        }
3416
3417        self.send_query(&ty, RRType::PTR);
3418        self.increase_counter(Counter::Browse, 1);
3419
3420        let next_time = now + (next_delay * 1000) as u64;
3421        let max_delay = 60 * 60;
3422        let delay = cmp::min(next_delay * 2, max_delay);
3423        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3424    }
3425
3426    fn exec_command_resolve_hostname(
3427        &mut self,
3428        repeating: bool,
3429        hostname: String,
3430        next_delay: u32,
3431        listener: Sender<HostnameResolutionEvent>,
3432        timeout: Option<u64>,
3433    ) {
3434        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3435        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3436            "{} on addrs {:?}",
3437            &hostname, &addr_list
3438        ))) {
3439            debug!(
3440                "Failed to send ResolveStarted({})(repeating:{}): {}",
3441                &hostname, repeating, e
3442            );
3443            return;
3444        }
3445        if !repeating {
3446            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3447            // if we already have the records in our cache, just send them
3448            self.query_cache_for_hostname(&hostname, listener.clone());
3449        }
3450
3451        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3452        self.increase_counter(Counter::ResolveHostname, 1);
3453
3454        let now = current_time_millis();
3455        let next_time = now + u64::from(next_delay) * 1000;
3456        let max_delay = 60 * 60;
3457        let delay = cmp::min(next_delay * 2, max_delay);
3458
3459        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3460        if self
3461            .hostname_resolvers
3462            .get(&hostname)
3463            .and_then(|(_sender, timeout)| *timeout)
3464            .map(|timeout| next_time < timeout)
3465            .unwrap_or(true)
3466        {
3467            self.add_retransmission(
3468                next_time,
3469                Command::ResolveHostname(hostname, delay, listener, None),
3470            );
3471        }
3472    }
3473
3474    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3475        let pending_query = self.query_unresolved(&instance);
3476        let max_try = 3;
3477        if pending_query && try_count < max_try {
3478            // Note that if the current try already succeeds, the next retransmission
3479            // will be no-op as the cache has been updated.
3480            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3481            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3482        }
3483    }
3484
3485    fn exec_command_unregister(
3486        &mut self,
3487        repeating: bool,
3488        fullname: String,
3489        resp_s: Sender<UnregisterStatus>,
3490    ) {
3491        let response = match self.my_services.remove_entry(&fullname) {
3492            None => {
3493                debug!("unregister: cannot find such service {}", &fullname);
3494                UnregisterStatus::NotFound
3495            }
3496            Some((_k, info)) => {
3497                let mut timers = Vec::new();
3498
3499                for (if_index, intf) in self.my_intfs.iter() {
3500                    if let Some(sock) = self.ipv4_sock.as_ref() {
3501                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3502                        // repeat for one time just in case some peers miss the message
3503                        if !repeating && !packet.is_empty() {
3504                            let next_time = current_time_millis() + 120;
3505                            self.retransmissions.push(ReRun {
3506                                next_time,
3507                                command: Command::UnregisterResend(packet, *if_index, true),
3508                            });
3509                            timers.push(next_time);
3510                        }
3511                    }
3512
3513                    // ipv6
3514                    if let Some(sock) = self.ipv6_sock.as_ref() {
3515                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3516                        if !repeating && !packet.is_empty() {
3517                            let next_time = current_time_millis() + 120;
3518                            self.retransmissions.push(ReRun {
3519                                next_time,
3520                                command: Command::UnregisterResend(packet, *if_index, false),
3521                            });
3522                            timers.push(next_time);
3523                        }
3524                    }
3525                }
3526
3527                for t in timers {
3528                    self.add_timer(t);
3529                }
3530
3531                self.increase_counter(Counter::Unregister, 1);
3532                UnregisterStatus::OK
3533            }
3534        };
3535        if let Err(e) = resp_s.send(response) {
3536            debug!("unregister: failed to send response: {}", e);
3537        }
3538    }
3539
3540    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3541        let Some(intf) = self.my_intfs.get(&if_index) else {
3542            return;
3543        };
3544        let sock_opt = if is_ipv4 {
3545            &self.ipv4_sock
3546        } else {
3547            &self.ipv6_sock
3548        };
3549        let Some(sock) = sock_opt else {
3550            return;
3551        };
3552
3553        let if_addr = if is_ipv4 {
3554            match intf.next_ifaddr_v4() {
3555                Some(addr) => addr,
3556                None => return,
3557            }
3558        } else {
3559            match intf.next_ifaddr_v6() {
3560                Some(addr) => addr,
3561                None => return,
3562            }
3563        };
3564
3565        debug!("UnregisterResend from {:?}", if_addr);
3566        multicast_on_intf(
3567            &packet[..],
3568            &intf.name,
3569            intf.index,
3570            if_addr,
3571            &sock.pktinfo,
3572            self.port,
3573        );
3574
3575        self.increase_counter(Counter::UnregisterResend, 1);
3576    }
3577
3578    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3579        match self.service_queriers.remove_entry(&ty_domain) {
3580            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3581            Some((ty, sender)) => {
3582                // Remove pending browse commands in the reruns.
3583                trace!("StopBrowse: removed queryer for {}", &ty);
3584                let mut i = 0;
3585                while i < self.retransmissions.len() {
3586                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3587                        if t == &ty {
3588                            self.retransmissions.remove(i);
3589                            trace!("StopBrowse: removed retransmission for {}", &ty);
3590                            continue;
3591                        }
3592                    }
3593                    i += 1;
3594                }
3595
3596                // Remove cache entries.
3597                self.cache.remove_service_type(&ty_domain);
3598
3599                // Notify the client.
3600                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3601                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3602                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3603                }
3604            }
3605        }
3606    }
3607
3608    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3609        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3610            // Remove pending resolve commands in the reruns.
3611            trace!("StopResolve: removed queryer for {}", &host);
3612            let mut i = 0;
3613            while i < self.retransmissions.len() {
3614                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3615                    if t == &host {
3616                        self.retransmissions.remove(i);
3617                        trace!("StopResolve: removed retransmission for {}", &host);
3618                        continue;
3619                    }
3620                }
3621                i += 1;
3622            }
3623
3624            // Notify the client.
3625            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3626                Ok(()) => trace!("Sent SearchStopped to the listener"),
3627                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3628            }
3629        }
3630    }
3631
3632    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3633        let Some(info) = self.my_services.get_mut(&fullname) else {
3634            trace!("announce: cannot find such service {}", &fullname);
3635            return Ok(());
3636        };
3637
3638        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3639            return Ok(());
3640        };
3641
3642        let Some(intf) = self.my_intfs.get(&if_index) else {
3643            return Ok(());
3644        };
3645
3646        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3647            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3648        } else {
3649            false
3650        };
3651        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3652            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3653        } else {
3654            false
3655        };
3656
3657        if announced_v4 || announced_v6 {
3658            let hostname = dns_registry.resolve_name(info.get_hostname());
3659            let service_name = dns_registry.resolve_name(&fullname).to_string();
3660
3661            debug!("resend: announce service {service_name} on {}", intf.name);
3662
3663            notify_monitors(
3664                &mut self.monitors,
3665                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3666            );
3667            info.set_status(if_index, ServiceStatus::Announced);
3668        } else {
3669            debug!("register-resend should not fail");
3670        }
3671
3672        self.increase_counter(Counter::RegisterResend, 1);
3673        Ok(())
3674    }
3675
3676    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3677        /*
3678        RFC 6762 section 10.4:
3679        ...
3680        When the cache receives this hint that it should reconfirm some
3681        record, it MUST issue two or more queries for the resource record in
3682        dispute.  If no response is received within ten seconds, then, even
3683        though its TTL may indicate that it is not yet due to expire, that
3684        record SHOULD be promptly flushed from the cache.
3685        */
3686        let now = current_time_millis();
3687        let expire_at = if repeating {
3688            None
3689        } else {
3690            Some(now + timeout.as_millis() as u64)
3691        };
3692
3693        // send query for the resource records.
3694        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3695
3696        if !record_vec.is_empty() {
3697            let query_vec: Vec<(&str, RRType)> = record_vec
3698                .iter()
3699                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3700                .collect();
3701            self.send_query_vec(&query_vec);
3702
3703            if let Some(new_expire) = expire_at {
3704                self.add_timer(new_expire); // ensure a check for the new expire time.
3705
3706                // schedule a resend 1 second later
3707                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3708            }
3709        }
3710    }
3711
3712    /// Refresh cached service records with active queriers
3713    fn refresh_active_services(&mut self) {
3714        let mut query_ptr_count = 0;
3715        let mut query_srv_count = 0;
3716        let mut new_timers = HashSet::new();
3717        let mut query_addr_count = 0;
3718
3719        for (ty_domain, _sender) in self.service_queriers.iter() {
3720            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3721            if !refreshed_timers.is_empty() {
3722                trace!("sending refresh query for PTR: {}", ty_domain);
3723                self.send_query(ty_domain, RRType::PTR);
3724                query_ptr_count += 1;
3725                new_timers.extend(refreshed_timers);
3726            }
3727
3728            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3729            for (instance, types) in instances {
3730                trace!("sending refresh query for: {}", &instance);
3731                let query_vec = types
3732                    .into_iter()
3733                    .map(|ty| (instance.as_str(), ty))
3734                    .collect::<Vec<_>>();
3735                self.send_query_vec(&query_vec);
3736                query_srv_count += 1;
3737            }
3738            new_timers.extend(timers);
3739            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3740            for hostname in hostnames.iter() {
3741                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3742                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3743                query_addr_count += 2;
3744            }
3745            new_timers.extend(timers);
3746        }
3747
3748        for timer in new_timers {
3749            self.add_timer(timer);
3750        }
3751
3752        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3753        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3754        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3755    }
3756}
3757
3758/// Adds one or more answers of a service for incoming msg and RR entry name.
3759fn add_answer_of_service(
3760    out: &mut DnsOutgoing,
3761    msg: &DnsIncoming,
3762    entry_name: &str,
3763    service: &ServiceInfo,
3764    qtype: RRType,
3765    intf_addrs: Vec<IpAddr>,
3766) {
3767    if qtype == RRType::SRV || qtype == RRType::ANY {
3768        out.add_answer(
3769            msg,
3770            DnsSrv::new(
3771                entry_name,
3772                CLASS_IN | CLASS_CACHE_FLUSH,
3773                service.get_host_ttl(),
3774                service.get_priority(),
3775                service.get_weight(),
3776                service.get_port(),
3777                service.get_hostname().to_string(),
3778            ),
3779        );
3780    }
3781
3782    if qtype == RRType::TXT || qtype == RRType::ANY {
3783        out.add_answer(
3784            msg,
3785            DnsTxt::new(
3786                entry_name,
3787                CLASS_IN | CLASS_CACHE_FLUSH,
3788                service.get_other_ttl(),
3789                service.generate_txt(),
3790            ),
3791        );
3792    }
3793
3794    if qtype == RRType::SRV {
3795        for address in intf_addrs {
3796            out.add_additional_answer(DnsAddress::new(
3797                service.get_hostname(),
3798                ip_address_rr_type(&address),
3799                CLASS_IN | CLASS_CACHE_FLUSH,
3800                service.get_host_ttl(),
3801                address,
3802                InterfaceId::default(),
3803            ));
3804        }
3805    }
3806}
3807
3808/// All possible events sent to the client from the daemon
3809/// regarding service discovery.
3810#[derive(Clone, Debug)]
3811#[non_exhaustive]
3812pub enum ServiceEvent {
3813    /// Started searching for a service type.
3814    SearchStarted(String),
3815
3816    /// Found a specific (service_type, fullname).
3817    ServiceFound(String, String),
3818
3819    /// Resolved a service instance in a ResolvedService struct.
3820    ServiceResolved(Box<ResolvedService>),
3821
3822    /// A service instance (service_type, fullname) was removed.
3823    ServiceRemoved(String, String),
3824
3825    /// Stopped searching for a service type.
3826    SearchStopped(String),
3827}
3828
/// All possible events sent to the client from the daemon
/// regarding hostname resolution.
///
/// The enum is `#[non_exhaustive]`, so code matching on it outside this
/// crate needs a wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found.
    AddressesFound(String, HashSet<ScopedIp>),
    /// One or more addresses for a hostname have been removed.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname.
    SearchStopped(String),
}
3845
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
///
/// The enum is `#[non_exhaustive]`, so code matching on it outside this
/// crate needs a wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon sent an unsolicited announcement of a service from an interface.
    // NOTE(review): the meaning and order of the two strings are not visible
    // in this part of the file -- confirm against the code that emits it.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address from the host.
    IpAdd(IpAddr),

    /// Daemon detected an IP address removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// See [`DnsNameChange`] for more details.
    NameChange(DnsNameChange),

    /// A multicast response was sent out via an interface.
    Respond(String),
}
3870
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// The new name, created by appending a suffix to the original name.
    ///
    /// - for a service instance name, the suffix is ` (N)`, where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The type of the resource record whose name changed.
    pub rr_type: RRType,

    /// The interface where the name conflict and its change happened.
    pub intf_name: String,
}
3895
/// Commands supported by the daemon.
///
/// Variants carrying a `Sender<_>` hand the daemon a channel; for the
/// request-style variants (`GetMetrics`, `GetStatus`, `GetOption`, ...) the
/// channel's item type is the reply type.
#[derive(Debug)]
enum Command {
    /// Browsing for a service type.
    ///
    /// Fields: (ty_domain, next_time_delay_in_seconds, <bool>, channel::sender).
    // NOTE(review): the meaning of the `bool` field is not documented here --
    // confirm against the code that builds and handles this command.
    Browse(String, u32, bool, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)

    /// Register a service
    Register(Box<ServiceInfo>),

    /// Unregister a service
    Unregister(String, Sender<UnregisterStatus>), // (fullname, sender for the resulting status)

    /// Announce again a service to local network
    // NOTE(review): the `u32` is undocumented; presumably an interface index
    // as in `UnregisterResend` -- confirm.
    RegisterResend(String, u32), // (fullname, <u32>)

    /// Resend unregister packet.
    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)

    /// Stop browsing a service type
    StopBrowse(String), // (ty_domain)

    /// Stop resolving a hostname
    StopResolveHostname(String), // (hostname)

    /// Send query to resolve a service instance.
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    Resolve(String, u16), // (service_instance_fullname, try_count)

    /// Read the current values of the counters
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Set a daemon option; see [`DaemonOption`].
    SetOption(DaemonOption),

    /// Read daemon option values; they are delivered as a [`DaemonOptionVal`]
    /// through the given sender.
    GetOption(Sender<DaemonOptionVal>),

    /// Proactively confirm a DNS resource record.
    ///
    /// The intention is to check whether a service name or IP address is
    /// still valid before its TTL expires.
    // NOTE(review): the exact meaning of the `String` and `Duration` fields is
    // not visible in this part of the file.
    Verify(String, Duration),

    /// Invalidate some interface addresses.
    InvalidIntfAddrs(HashSet<Interface>),

    /// Presumably asks the daemon to exit, reporting a `DaemonStatus` through
    /// the sender -- TODO confirm against the command handler.
    Exit(Sender<DaemonStatus>),
}
3951
3952impl fmt::Display for Command {
3953    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3954        match self {
3955            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3956            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3957            Self::Exit(_) => write!(f, "Command Exit"),
3958            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3959            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3960            Self::Monitor(_) => write!(f, "Command Monitor"),
3961            Self::Register(_) => write!(f, "Command Register"),
3962            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3963            Self::SetOption(_) => write!(f, "Command SetOption"),
3964            Self::GetOption(_) => write!(f, "Command GetOption"),
3965            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3966            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3967            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3968            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3969            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3970            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3971            Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
3972        }
3973    }
3974}
3975
/// Daemon option values, as delivered through the sender carried by
/// `Command::GetOption`.
struct DaemonOptionVal {
    // Leading underscore: presumably not read anywhere yet -- TODO confirm.
    _service_name_len_max: u8,
    // NOTE(review): the unit of this interval is not stated in this part of the file.
    ip_check_interval: u64,
}
3980
/// Options that can be changed on the daemon, carried by `Command::SetOption`.
// NOTE(review): the per-variant semantics below are inferred from the variant
// names only; confirm against the code that applies each option.
#[derive(Debug)]
enum DaemonOption {
    /// Presumably the maximum accepted service name length.
    ServiceNameLenMax(u8),
    /// Presumably the interval between IP address checks (unit not shown here).
    IpCheckInterval(u64),
    /// Presumably enables the given kinds of interfaces.
    EnableInterface(Vec<IfKind>),
    /// Presumably disables the given kinds of interfaces.
    DisableInterface(Vec<IfKind>),
    /// Presumably toggles multicast loopback for IPv4.
    MulticastLoopV4(bool),
    /// Presumably toggles multicast loopback for IPv6.
    MulticastLoopV6(bool),
    /// Presumably toggles acceptance of unsolicited responses.
    AcceptUnsolicited(bool),
    /// Presumably toggles inclusion of Apple peer-to-peer interfaces.
    IncludeAppleP2P(bool),
    /// Only compiled for tests.
    #[cfg(test)]
    TestDownInterface(String),
    /// Only compiled for tests.
    #[cfg(test)]
    TestUpInterface(String),
}
3996
/// Byte length of the service domain suffix supported in this lib.
///
/// Computed from `"._tcp.local."`; the other accepted suffix,
/// `"._udp.local."`, has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3999
4000/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
4001fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4002    if ty_domain.len() <= DOMAIN_LEN + 1 {
4003        // service name cannot be empty or only '_'.
4004        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4005    }
4006
4007    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
4008    if service_name_len > limit as usize {
4009        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4010    }
4011    Ok(())
4012}
4013
4014/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
4015fn check_domain_suffix(name: &str) -> Result<()> {
4016    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4017        return Err(e_fmt!(
4018            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4019            name
4020        ));
4021    }
4022
4023    Ok(())
4024}
4025
4026/// Validate the service name in a fully qualified name.
4027///
4028/// A Full Name = <Instance>.<Service>.<Domain>
4029/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
4030///
4031/// Note: this function does not check for the length of the service name.
4032/// Instead, `register_service` method will check the length.
4033fn check_service_name(fullname: &str) -> Result<()> {
4034    check_domain_suffix(fullname)?;
4035
4036    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4037    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4038
4039    if &name[0..1] != "_" {
4040        return Err(e_fmt!("Service name must start with '_'"));
4041    }
4042
4043    let name = &name[1..];
4044
4045    if name.contains("--") {
4046        return Err(e_fmt!("Service name must not contain '--'"));
4047    }
4048
4049    if name.starts_with('-') || name.ends_with('-') {
4050        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4051    }
4052
4053    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4054    if ascii_count < 1 {
4055        return Err(e_fmt!(
4056            "Service name must contain at least one letter (eg: 'A-Za-z')"
4057        ));
4058    }
4059
4060    Ok(())
4061}
4062
4063/// Validate a hostname.
4064fn check_hostname(hostname: &str) -> Result<()> {
4065    if !hostname.ends_with(".local.") {
4066        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4067    }
4068
4069    if hostname == ".local." {
4070        return Err(e_fmt!(
4071            "The part of the hostname before '.local.' cannot be empty"
4072        ));
4073    }
4074
4075    if hostname.len() > 255 {
4076        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4077    }
4078
4079    Ok(())
4080}
4081
4082fn call_service_listener(
4083    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4084    ty_domain: &str,
4085    event: ServiceEvent,
4086) {
4087    if let Some(listener) = listeners_map.get(ty_domain) {
4088        match listener.send(event) {
4089            Ok(()) => trace!("Sent event to listener successfully"),
4090            Err(e) => debug!("Failed to send event: {}", e),
4091        }
4092    }
4093}
4094
4095fn call_hostname_resolution_listener(
4096    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4097    hostname: &str,
4098    event: HostnameResolutionEvent,
4099) {
4100    let hostname_lower = hostname.to_lowercase();
4101    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4102        match listener.send(event) {
4103            Ok(()) => trace!("Sent event to listener successfully"),
4104            Err(e) => debug!("Failed to send event: {}", e),
4105        }
4106    }
4107}
4108
4109/// Returns valid network interfaces in the host system.
4110/// Operational down interfaces are excluded.
4111/// Loopback interfaces are excluded if `with_loopback` is false.
4112fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4113    my_ip_interfaces_inner(with_loopback, false)
4114}
4115
4116fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4117    if_addrs::get_if_addrs()
4118        .unwrap_or_default()
4119        .into_iter()
4120        .filter(|i| {
4121            i.is_oper_up()
4122                && !i.is_p2p()
4123                && (!i.is_loopback() || with_loopback)
4124                && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4125        })
4126        .collect()
4127}
4128
/// Returns true when the interface name marks an Apple peer-to-peer
/// interface (a name starting with `awdl` or `llw`), which should be ignored
/// by default.
fn is_apple_p2p_by_name(name: &str) -> bool {
    name.starts_with("awdl") || name.starts_with("llw")
}
4135
4136/// Send an outgoing mDNS query or response, and returns the packet bytes.
4137/// Returns empty vec if no valid interface address is found.
4138fn send_dns_outgoing(
4139    out: &DnsOutgoing,
4140    my_intf: &MyIntf,
4141    sock: &PktInfoUdpSocket,
4142    port: u16,
4143) -> MyResult<Vec<Vec<u8>>> {
4144    let if_name = &my_intf.name;
4145
4146    let if_addr = if sock.domain() == Domain::IPV4 {
4147        match my_intf.next_ifaddr_v4() {
4148            Some(addr) => addr,
4149            None => return Ok(vec![]),
4150        }
4151    } else {
4152        match my_intf.next_ifaddr_v6() {
4153            Some(addr) => addr,
4154            None => return Ok(vec![]),
4155        }
4156    };
4157
4158    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
4159}
4160
4161/// Send an outgoing mDNS query or response, and returns the packet bytes.
4162fn send_dns_outgoing_impl(
4163    out: &DnsOutgoing,
4164    if_name: &str,
4165    if_index: u32,
4166    if_addr: &IfAddr,
4167    sock: &PktInfoUdpSocket,
4168    port: u16,
4169) -> MyResult<Vec<Vec<u8>>> {
4170    let qtype = if out.is_query() {
4171        "query"
4172    } else {
4173        if out.answers_count() == 0 && out.additionals().is_empty() {
4174            return Ok(vec![]); // no need to send empty response
4175        }
4176        "response"
4177    };
4178    trace!(
4179        "send {}: {} questions {} answers {} authorities {} additional",
4180        qtype,
4181        out.questions().len(),
4182        out.answers_count(),
4183        out.authorities().len(),
4184        out.additionals().len()
4185    );
4186
4187    match if_addr.ip() {
4188        IpAddr::V4(ipv4) => {
4189            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4190                debug!(
4191                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4192                    ipv4, e
4193                );
4194                // cannot send without a valid interface
4195                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4196                    let intf_addr = Interface {
4197                        name: if_name.to_string(),
4198                        addr: if_addr.clone(),
4199                        index: Some(if_index),
4200                        oper_status: if_addrs::IfOperStatus::Down,
4201                        is_p2p: false,
4202                        #[cfg(windows)]
4203                        adapter_name: String::new(),
4204                    };
4205                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4206                }
4207                return Ok(vec![]); // non-fatal other failure
4208            }
4209        }
4210        IpAddr::V6(ipv6) => {
4211            if let Err(e) = sock.set_multicast_if_v6(if_index) {
4212                debug!(
4213                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4214                    ipv6, e
4215                );
4216                // cannot send without a valid interface
4217                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4218                    let intf_addr = Interface {
4219                        name: if_name.to_string(),
4220                        addr: if_addr.clone(),
4221                        index: Some(if_index),
4222                        oper_status: if_addrs::IfOperStatus::Down,
4223                        is_p2p: false,
4224                        #[cfg(windows)]
4225                        adapter_name: String::new(),
4226                    };
4227                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4228                }
4229                return Ok(vec![]); // non-fatal other failure
4230            }
4231        }
4232    }
4233
4234    let packet_list = out.to_data_on_wire();
4235    for packet in packet_list.iter() {
4236        multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
4237    }
4238    Ok(packet_list)
4239}
4240
4241/// Sends a multicast packet, and returns the packet bytes.
4242fn multicast_on_intf(
4243    packet: &[u8],
4244    if_name: &str,
4245    if_index: u32,
4246    if_addr: &IfAddr,
4247    socket: &PktInfoUdpSocket,
4248    port: u16,
4249) {
4250    if packet.len() > MAX_MSG_ABSOLUTE {
4251        debug!("Drop over-sized packet ({})", packet.len());
4252        return;
4253    }
4254
4255    let addr: SocketAddr = match if_addr {
4256        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4257        if_addrs::IfAddr::V6(_) => {
4258            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4259            sock.set_scope_id(if_index); // Choose iface for multicast
4260            sock.into()
4261        }
4262    };
4263
4264    // Sends out `packet` to `addr` on the socket.
4265    let sock_addr = addr.into();
4266    match socket.send_to(packet, &sock_addr) {
4267        Ok(sz) => trace!(
4268            "sent out {} bytes on interface {} (idx {}) addr {}",
4269            sz,
4270            if_name,
4271            if_index,
4272            if_addr.ip()
4273        ),
4274        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4275    }
4276}
4277
/// Returns true if `name` has the shape of an instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of '.'-separated parts is checked: at least five, which is
/// the same as at least four dots.
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    let dot_count = name.matches('.').count();
    dot_count >= 4
}
4284
4285fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4286    monitors.retain(|sender| {
4287        if let Err(e) = sender.try_send(event.clone()) {
4288            debug!("notify_monitors: try_send: {}", &e);
4289            if matches!(e, TrySendError::Disconnected(_)) {
4290                return false; // This monitor is dropped.
4291            }
4292        }
4293        true
4294    });
4295}
4296
/// Check if all unique records passed "probing", and if yes, create a packet
/// to announce the service.
///
/// The packet is a response (`FLAGS_QR_RESPONSE | FLAGS_AA`) holding the PTR
/// record(s), the SRV and TXT records, and one address record per address of
/// `info` on `intf` for the selected IP version.
///
/// Returns `None` when:
/// - `info` has no address on `intf` for the selected IP version, or
/// - at least one SRV/TXT/address record is reported by `dns_registry` as not
///   done probing yet (only checked when `info.requires_probe()` is true).
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    // Addresses of the service on this interface, for one IP version only.
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = dns_registry.resolve_name(info.get_fullname());

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Number of records that are not done probing yet.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Current time plus a random offset in 0..250 (milliseconds); passed to
    // every `is_probing_done` call below.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR record: service type -> (possibly renamed) instance full name.
    // Unlike the records below, it is added without a probing check.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // Optional second PTR record for the subtype, pointing to the same name.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV records.
    // The SRV target is the hostname as resolved by the registry.
    let hostname = dns_registry.resolve_name(info.get_hostname()).to_string();

    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    // The record is created under the original full name; a recorded name
    // change is attached to it as its new name.
    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // NOTE(review): `is_probing_done` is called on the `&mut DnsRegistry` and
    // may update it -- its body is not visible here. It is skipped entirely
    // when the service does not require probing.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT records.

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records. (A and AAAA)

    // Address records are created under the original hostname; a recorded
    // hostname change is attached as the new name.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        // The service full name (not the hostname) is what is passed to the
        // probing check for address records.
        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Announce only when every record above has finished probing.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
4434
4435/// Send an unsolicited response for owned service via `intf` and `sock`.
4436/// Returns true if sent out successfully for IPv4 or IPv6.
4437fn announce_service_on_intf(
4438    dns_registry: &mut DnsRegistry,
4439    info: &ServiceInfo,
4440    intf: &MyIntf,
4441    sock: &PktInfoUdpSocket,
4442    port: u16,
4443) -> MyResult<bool> {
4444    let is_ipv4 = sock.domain() == Domain::IPV4;
4445    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4446        let _ = send_dns_outgoing(&out, intf, sock, port)?;
4447        return Ok(true);
4448    }
4449
4450    Ok(false)
4451}
4452
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first `.`) is changed. If it
/// already ends with a number in parentheses, ` (<num>)`, that number is
/// incremented; otherwise ` (2)` is appended. If the number cannot be
/// incremented without overflowing `u32`, ` (2)` is appended instead of
/// panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
fn name_change(original: &str) -> String {
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // `<base> (<num>)` -> `<base> (<num + 1>)`, when the label has that shape.
    let incremented = first_label
        .strip_suffix(')')
        .and_then(|label| label.rsplit_once(" ("))
        .and_then(|(base, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base} ({next})"))
        });

    let new_label = incremented.unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4488
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first `.`) is changed. If it
/// already ends with a hyphenated number, `-<num>`, that number is
/// incremented; otherwise `-2` is appended. If the number cannot be
/// incremented without overflowing `u32`, `-2` is appended instead of
/// panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
fn hostname_change(original: &str) -> String {
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // `<base>-<num>` -> `<base>-<num + 1>`, when everything after the last
    // hyphen parses as a number.
    let incremented = first_label.rsplit_once('-').and_then(|(base, digits)| {
        let next = digits.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base}-{next}"))
    });

    let new_label = incremented.unwrap_or_else(|| format!("{first_label}-2"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4516
4517/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4518/// that are finished.
4519fn check_probing(
4520    dns_registry: &mut DnsRegistry,
4521    timers: &mut BinaryHeap<Reverse<u64>>,
4522    now: u64,
4523) -> (DnsOutgoing, Vec<String>) {
4524    let mut expired_probes = Vec::new();
4525    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4526
4527    for (name, probe) in dns_registry.probing.iter_mut() {
4528        if now >= probe.next_send {
4529            if probe.expired(now) {
4530                // move the record to active
4531                expired_probes.push(name.clone());
4532            } else {
4533                out.add_question(name, RRType::ANY);
4534
4535                /*
4536                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4537                ...
4538                for tiebreaking to work correctly in all
4539                cases, the Authority Section must contain *all* the records and
4540                proposed rdata being probed for uniqueness.
4541                    */
4542                for record in probe.records.iter() {
4543                    out.add_authority(record.clone());
4544                }
4545
4546                probe.update_next_send(now);
4547
4548                // add timer
4549                timers.push(Reverse(probe.next_send));
4550            }
4551        }
4552    }
4553
4554    (out, expired_probes)
4555}
4556
4557/// Process expired probes on an interface and return a list of services
4558/// that are waiting for the probe to finish.
4559///
4560/// `DnsNameChange` events are sent to the monitors.
4561fn handle_expired_probes(
4562    expired_probes: Vec<String>,
4563    intf_name: &str,
4564    dns_registry: &mut DnsRegistry,
4565    monitors: &mut Vec<Sender<DaemonEvent>>,
4566) -> HashSet<String> {
4567    let mut waiting_services = HashSet::new();
4568
4569    for name in expired_probes {
4570        let Some(probe) = dns_registry.probing.remove(&name) else {
4571            continue;
4572        };
4573
4574        // send notifications about name changes
4575        for record in probe.records.iter() {
4576            if let Some(new_name) = record.get_record().get_new_name() {
4577                dns_registry
4578                    .name_changes
4579                    .insert(name.clone(), new_name.to_string());
4580
4581                let event = DnsNameChange {
4582                    original: record.get_record().get_original_name().to_string(),
4583                    new_name: new_name.to_string(),
4584                    rr_type: record.get_type(),
4585                    intf_name: intf_name.to_string(),
4586                };
4587                debug!("Name change event: {:?}", &event);
4588                notify_monitors(monitors, DaemonEvent::NameChange(event));
4589            }
4590        }
4591
4592        // move RR from probe to active.
4593        debug!(
4594            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4595            probe.records.len(),
4596            probe.waiting_services.len(),
4597        );
4598
4599        // Move records to active and plan to wake up services if records are not empty.
4600        if !probe.records.is_empty() {
4601            match dns_registry.active.get_mut(&name) {
4602                Some(records) => {
4603                    records.extend(probe.records);
4604                }
4605                None => {
4606                    dns_registry.active.insert(name, probe.records);
4607                }
4608            }
4609
4610            waiting_services.extend(probe.waiting_services);
4611        }
4612    }
4613
4614    waiting_services
4615}
4616
4617/// Resolves `IfKind::Addr(ip)` to `IndexV4(if_index)` or `IndexV6(if_index)`.
4618fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4619    if let IfKind::Addr(addr) = &if_kind {
4620        if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4621            let if_index = intf.index.unwrap_or(0);
4622            return if addr.is_ipv4() {
4623                IfKind::IndexV4(if_index)
4624            } else {
4625                IfKind::IndexV6(if_index)
4626            };
4627        }
4628    }
4629    if_kind
4630}
4631
4632#[cfg(test)]
4633mod tests {
4634    use super::{
4635        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4636        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4637        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, MDNS_PORT,
4638    };
4639    use crate::{
4640        dns_parser::{
4641            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4642            FLAGS_AA, FLAGS_QR_RESPONSE,
4643        },
4644        service_daemon::{add_answer_of_service, check_hostname},
4645    };
4646    use std::time::{Duration, SystemTime};
4647    use test_log::test;
4648
4649    #[test]
4650    fn test_instance_name() {
4651        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4652        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4653        assert!(!valid_instance_name("_printer._tcp.local."));
4654    }
4655
4656    #[test]
4657    fn test_check_service_name_length() {
4658        let result = check_service_name_length("_tcp", 100);
4659        assert!(result.is_err());
4660        if let Err(e) = result {
4661            println!("{}", e);
4662        }
4663    }
4664
4665    #[test]
4666    fn test_check_hostname() {
4667        // valid hostnames
4668        for hostname in &[
4669            "my_host.local.",
4670            &("A".repeat(255 - ".local.".len()) + ".local."),
4671        ] {
4672            let result = check_hostname(hostname);
4673            assert!(result.is_ok());
4674        }
4675
4676        // erroneous hostnames
4677        for hostname in &[
4678            "my_host.local",
4679            ".local.",
4680            &("A".repeat(256 - ".local.".len()) + ".local."),
4681        ] {
4682            let result = check_hostname(hostname);
4683            assert!(result.is_err());
4684            if let Err(e) = result {
4685                println!("{}", e);
4686            }
4687        }
4688    }
4689
4690    #[test]
4691    fn test_check_domain_suffix() {
4692        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4693        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4694        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4695        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4696        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4697        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4698    }
4699
4700    #[test]
4701    fn test_service_with_temporarily_invalidated_ptr() {
4702        // Create a daemon
4703        let d = ServiceDaemon::new().expect("Failed to create daemon");
4704
4705        let service = "_test_inval_ptr._udp.local.";
4706        let host_name = "my_host_tmp_invalidated_ptr.local.";
4707        let intfs: Vec<_> = my_ip_interfaces(false);
4708        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4709        let port = 5201;
4710        let my_service =
4711            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4712                .expect("invalid service info")
4713                .enable_addr_auto();
4714        let result = d.register(my_service.clone());
4715        assert!(result.is_ok());
4716
4717        // Browse for a service
4718        let browse_chan = d.browse(service).unwrap();
4719        let timeout = Duration::from_secs(2);
4720        let mut resolved = false;
4721
4722        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4723            match event {
4724                ServiceEvent::ServiceResolved(info) => {
4725                    resolved = true;
4726                    println!("Resolved a service of {}", &info.fullname);
4727                    break;
4728                }
4729                e => {
4730                    println!("Received event {:?}", e);
4731                }
4732            }
4733        }
4734
4735        assert!(resolved);
4736
4737        println!("Stopping browse of {}", service);
4738        // Pause browsing so restarting will cause a new immediate query.
4739        // Unregistering will not work here, it will invalidate all the records.
4740        d.stop_browse(service).unwrap();
4741
4742        // Ensure the search is stopped.
4743        // Reduces the chance of receiving an answer adding the ptr back to the
4744        // cache causing the later browse to return directly from the cache.
4745        // (which invalidates what this test is trying to test for.)
4746        let mut stopped = false;
4747        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4748            match event {
4749                ServiceEvent::SearchStopped(_) => {
4750                    stopped = true;
4751                    println!("Stopped browsing service");
4752                    break;
4753                }
4754                // Other `ServiceResolved` messages may be received
4755                // here as they come from different interfaces.
4756                // That's fine for this test.
4757                e => {
4758                    println!("Received event {:?}", e);
4759                }
4760            }
4761        }
4762
4763        assert!(stopped);
4764
4765        // Invalidate the ptr from the service to the host.
4766        let invalidate_ptr_packet = DnsPointer::new(
4767            my_service.get_type(),
4768            RRType::PTR,
4769            CLASS_IN,
4770            0,
4771            my_service.get_fullname().to_string(),
4772        );
4773
4774        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4775        packet_buffer.add_additional_answer(invalidate_ptr_packet);
4776
4777        for intf in intfs {
4778            let sock = _new_socket_bind(&intf, true).unwrap();
4779            send_dns_outgoing_impl(
4780                &packet_buffer,
4781                &intf.name,
4782                intf.index.unwrap_or(0),
4783                &intf.addr,
4784                &sock.pktinfo,
4785                MDNS_PORT,
4786            )
4787            .unwrap();
4788        }
4789
4790        println!(
4791            "Sent PTR record invalidation. Starting second browse for {}",
4792            service
4793        );
4794
4795        // Restart the browse to force the sender to re-send the announcements.
4796        let browse_chan = d.browse(service).unwrap();
4797
4798        resolved = false;
4799        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4800            match event {
4801                ServiceEvent::ServiceResolved(info) => {
4802                    resolved = true;
4803                    println!("Resolved a service of {}", &info.fullname);
4804                    break;
4805                }
4806                e => {
4807                    println!("Received event {:?}", e);
4808                }
4809            }
4810        }
4811
4812        assert!(resolved);
4813        d.shutdown().unwrap();
4814    }
4815
4816    #[test]
4817    fn test_expired_srv() {
4818        // construct service info
4819        let service_type = "_expired-srv._udp.local.";
4820        let instance = "test_instance";
4821        let host_name = "expired_srv_host.local.";
4822        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4823            .unwrap()
4824            .enable_addr_auto();
4825        // let fullname = my_service.get_fullname().to_string();
4826
4827        // set SRV to expire soon.
4828        let new_ttl = 3; // for testing only.
4829        my_service._set_host_ttl(new_ttl);
4830
4831        // register my service
4832        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4833        let result = mdns_server.register(my_service);
4834        assert!(result.is_ok());
4835
4836        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4837        let browse_chan = mdns_client.browse(service_type).unwrap();
4838        let timeout = Duration::from_secs(2);
4839        let mut resolved = false;
4840
4841        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4842            if let ServiceEvent::ServiceResolved(info) = event {
4843                resolved = true;
4844                println!("Resolved a service of {}", &info.fullname);
4845                break;
4846            }
4847        }
4848
4849        assert!(resolved);
4850
4851        // Exit the server so that no more responses.
4852        mdns_server.shutdown().unwrap();
4853
4854        // SRV record in the client cache will expire.
4855        let expire_timeout = Duration::from_secs(new_ttl as u64);
4856        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4857            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
4858                println!("Service removed: {}: {}", &service_type, &full_name);
4859                break;
4860            }
4861        }
4862    }
4863
4864    #[test]
4865    fn test_hostname_resolution_address_removed() {
4866        // Create a mDNS server
4867        let server = ServiceDaemon::new().expect("Failed to create server");
4868        let hostname = "addr_remove_host._tcp.local.";
4869        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4870            .iter()
4871            .find(|iface| iface.ip().is_ipv4())
4872            .map(|iface| iface.into())
4873            .unwrap();
4874
4875        let mut my_service = ServiceInfo::new(
4876            "_host_res_test._tcp.local.",
4877            "my_instance",
4878            hostname,
4879            service_ip_addr.to_ip_addr(),
4880            1234,
4881            None,
4882        )
4883        .expect("invalid service info");
4884
4885        // Set a short TTL for addresses for testing.
4886        let addr_ttl = 2;
4887        my_service._set_host_ttl(addr_ttl); // Expire soon
4888
4889        server.register(my_service).unwrap();
4890
4891        // Create a mDNS client for resolving the hostname.
4892        let client = ServiceDaemon::new().expect("Failed to create client");
4893        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4894        let resolved = loop {
4895            match event_receiver.recv() {
4896                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4897                    assert!(found_hostname == hostname);
4898                    assert!(addresses.contains(&service_ip_addr));
4899                    println!("address found: {:?}", &addresses);
4900                    break true;
4901                }
4902                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4903                Ok(_event) => {}
4904                Err(_) => break false,
4905            }
4906        };
4907
4908        assert!(resolved);
4909
4910        // Shutdown the server so no more responses / refreshes for addresses.
4911        server.shutdown().unwrap();
4912
4913        // Wait till hostname address record expires, with 1 second grace period.
4914        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4915        let removed = loop {
4916            match event_receiver.recv_timeout(timeout) {
4917                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4918                    assert!(removed_host == hostname);
4919                    assert!(addresses.contains(&service_ip_addr));
4920
4921                    println!(
4922                        "address removed: hostname: {} addresses: {:?}",
4923                        &hostname, &addresses
4924                    );
4925                    break true;
4926                }
4927                Ok(_event) => {}
4928                Err(_) => {
4929                    break false;
4930                }
4931            }
4932        };
4933
4934        assert!(removed);
4935
4936        client.shutdown().unwrap();
4937    }
4938
4939    #[test]
4940    fn test_refresh_ptr() {
4941        // construct service info
4942        let service_type = "_refresh-ptr._udp.local.";
4943        let instance = "test_instance";
4944        let host_name = "refresh_ptr_host.local.";
4945        let service_ip_addr = my_ip_interfaces(false)
4946            .iter()
4947            .find(|iface| iface.ip().is_ipv4())
4948            .map(|iface| iface.ip())
4949            .unwrap();
4950
4951        let mut my_service = ServiceInfo::new(
4952            service_type,
4953            instance,
4954            host_name,
4955            service_ip_addr,
4956            5023,
4957            None,
4958        )
4959        .unwrap();
4960
4961        let new_ttl = 3; // for testing only.
4962        my_service._set_other_ttl(new_ttl);
4963
4964        // register my service
4965        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4966        let result = mdns_server.register(my_service);
4967        assert!(result.is_ok());
4968
4969        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4970        let browse_chan = mdns_client.browse(service_type).unwrap();
4971        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4972        let mut resolved = false;
4973
4974        // resolve the service first.
4975        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4976            if let ServiceEvent::ServiceResolved(info) = event {
4977                resolved = true;
4978                println!("Resolved a service of {}", &info.fullname);
4979                break;
4980            }
4981        }
4982
4983        assert!(resolved);
4984
4985        // wait over 80% of TTL, and refresh PTR should be sent out.
4986        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4987        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4988            println!("event: {:?}", &event);
4989        }
4990
4991        // verify refresh counter.
4992        let metrics_chan = mdns_client.get_metrics().unwrap();
4993        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4994        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4995        assert_eq!(ptr_refresh_counter, 1);
4996        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4997        assert_eq!(srvtxt_refresh_counter, 1);
4998
4999        // Exit the server so that no more responses.
5000        mdns_server.shutdown().unwrap();
5001        mdns_client.shutdown().unwrap();
5002    }
5003
5004    #[test]
5005    fn test_name_change() {
5006        assert_eq!(name_change("foo.local."), "foo (2).local.");
5007        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5008        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5009        assert_eq!(name_change("foo"), "foo (2)");
5010        assert_eq!(name_change("foo (2)"), "foo (3)");
5011        assert_eq!(name_change(""), " (2)");
5012
5013        // Additional edge cases
5014        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
5015        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
5016        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
5017    }
5018
5019    #[test]
5020    fn test_hostname_change() {
5021        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5022        assert_eq!(hostname_change("foo"), "foo-2");
5023        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5024        assert_eq!(hostname_change("foo-9"), "foo-10");
5025        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5026    }
5027
5028    #[test]
5029    fn test_add_answer_txt_ttl() {
5030        // construct a simple service info
5031        let service_type = "_test_add_answer._udp.local.";
5032        let instance = "test_instance";
5033        let host_name = "add_answer_host.local.";
5034        let service_intf = my_ip_interfaces(false)
5035            .into_iter()
5036            .find(|iface| iface.ip().is_ipv4())
5037            .unwrap();
5038        let service_ip_addr = service_intf.ip();
5039        let my_service = ServiceInfo::new(
5040            service_type,
5041            instance,
5042            host_name,
5043            service_ip_addr,
5044            5023,
5045            None,
5046        )
5047        .unwrap();
5048
5049        // construct a DnsOutgoing message
5050        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5051
5052        // Construct a dummy DnsIncoming message
5053        let mut dummy_data = out.to_data_on_wire();
5054        let interface_id = InterfaceId::from(&service_intf);
5055        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
5056
5057        // Add an answer of TXT type for the service.
5058        let if_addrs = vec![service_intf.ip()];
5059        add_answer_of_service(
5060            &mut out,
5061            &incoming,
5062            instance,
5063            &my_service,
5064            RRType::TXT,
5065            if_addrs,
5066        );
5067
5068        // Check if the answer was added correctly
5069        assert!(
5070            out.answers_count() > 0,
5071            "No answers added to the outgoing message"
5072        );
5073
5074        // Check if the first answer is of type TXT
5075        let answer = out._answers().first().unwrap();
5076        assert_eq!(answer.0.get_type(), RRType::TXT);
5077
5078        // Check TTL is set properly for the TXT record
5079        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
5080    }
5081
5082    #[test]
5083    fn test_interface_flip() {
5084        // start a server
5085        let ty_domain = "_intf-flip._udp.local.";
5086        let host_name = "intf_flip.local.";
5087        let now = SystemTime::now()
5088            .duration_since(SystemTime::UNIX_EPOCH)
5089            .unwrap();
5090        let instance_name = now.as_micros().to_string(); // Create a unique name.
5091        let port = 5200;
5092
5093        // Get a single IPv4 address
5094        let (ip_addr1, intf_name) = my_ip_interfaces(false)
5095            .iter()
5096            .find(|iface| iface.ip().is_ipv4())
5097            .map(|iface| (iface.ip(), iface.name.clone()))
5098            .unwrap();
5099
5100        println!("Using interface {} with IP {}", intf_name, ip_addr1);
5101
5102        // Register the service.
5103        let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
5104            .expect("valid service info");
5105        let server1 = ServiceDaemon::new().expect("failed to start server");
5106        server1
5107            .register(service1)
5108            .expect("Failed to register service1");
5109
5110        // wait for the service announced.
5111        std::thread::sleep(Duration::from_secs(2));
5112
5113        // start a client
5114        let client = ServiceDaemon::new().expect("failed to start client");
5115
5116        let receiver = client.browse(ty_domain).unwrap();
5117
5118        let timeout = Duration::from_secs(3);
5119        let mut got_data = false;
5120
5121        while let Ok(event) = receiver.recv_timeout(timeout) {
5122            if let ServiceEvent::ServiceResolved(_) = event {
5123                println!("Received ServiceResolved event");
5124                got_data = true;
5125                break;
5126            }
5127        }
5128
5129        assert!(got_data, "Should receive ServiceResolved event");
5130
5131        // Set a short IP check interval to detect interface changes quickly.
5132        client.set_ip_check_interval(1).unwrap();
5133
5134        // Now shutdown the interface and expect the client to lose the service.
5135        println!("Shutting down interface {}", &intf_name);
5136        client.test_down_interface(&intf_name).unwrap();
5137
5138        let mut got_removed = false;
5139
5140        while let Ok(event) = receiver.recv_timeout(timeout) {
5141            if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
5142                got_removed = true;
5143                println!("removed: {ty_domain} : {instance}");
5144                break;
5145            }
5146        }
5147        assert!(got_removed, "Should receive ServiceRemoved event");
5148
5149        println!("Bringing up interface {}", &intf_name);
5150        client.test_up_interface(&intf_name).unwrap();
5151        let mut got_data = false;
5152        while let Ok(event) = receiver.recv_timeout(timeout) {
5153            if let ServiceEvent::ServiceResolved(resolved) = event {
5154                got_data = true;
5155                println!("Received ServiceResolved: {:?}", resolved);
5156                break;
5157            }
5158        }
5159        assert!(
5160            got_data,
5161            "Should receive ServiceResolved event after interface is back up"
5162        );
5163
5164        server1.shutdown().unwrap();
5165        client.shutdown().unwrap();
5166    }
5167
5168    #[test]
5169    fn test_cache_only() {
5170        // construct service info
5171        let service_type = "_cache_only._udp.local.";
5172        let instance = "test_instance";
5173        let host_name = "cache_only_host.local.";
5174        let service_ip_addr = my_ip_interfaces(false)
5175            .iter()
5176            .find(|iface| iface.ip().is_ipv4())
5177            .map(|iface| iface.ip())
5178            .unwrap();
5179
5180        let mut my_service = ServiceInfo::new(
5181            service_type,
5182            instance,
5183            host_name,
5184            service_ip_addr,
5185            5023,
5186            None,
5187        )
5188        .unwrap();
5189
5190        let new_ttl = 3; // for testing only.
5191        my_service._set_other_ttl(new_ttl);
5192
5193        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5194
5195        // make a single browse request to record that we are interested in the service.  This ensures that
5196        // subsequent announcements are cached.
5197        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5198        std::thread::sleep(Duration::from_secs(2));
5199
5200        // register my service
5201        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5202        let result = mdns_server.register(my_service);
5203        assert!(result.is_ok());
5204
5205        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5206        let mut resolved = false;
5207
5208        // resolve the service.
5209        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5210            if let ServiceEvent::ServiceResolved(info) = event {
5211                resolved = true;
5212                println!("Resolved a service of {}", &info.get_fullname());
5213                break;
5214            }
5215        }
5216
5217        assert!(resolved);
5218
5219        // Exit the server so that no more responses.
5220        mdns_server.shutdown().unwrap();
5221        mdns_client.shutdown().unwrap();
5222    }
5223
5224    #[test]
5225    fn test_cache_only_unsolicited() {
5226        let service_type = "_c_unsolicit._udp.local.";
5227        let instance = "test_instance";
5228        let host_name = "c_unsolicit_host.local.";
5229        let service_ip_addr = my_ip_interfaces(false)
5230            .iter()
5231            .find(|iface| iface.ip().is_ipv4())
5232            .map(|iface| iface.ip())
5233            .unwrap();
5234
5235        let my_service = ServiceInfo::new(
5236            service_type,
5237            instance,
5238            host_name,
5239            service_ip_addr,
5240            5023,
5241            None,
5242        )
5243        .unwrap();
5244
5245        // register my service
5246        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5247        let result = mdns_server.register(my_service);
5248        assert!(result.is_ok());
5249
5250        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5251        mdns_client.accept_unsolicited(true).unwrap();
5252
5253        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
5254        // that the announcements are treated as unsolicited
5255        std::thread::sleep(Duration::from_secs(2));
5256        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5257        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5258        let mut resolved = false;
5259
5260        // resolve the service.
5261        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5262            if let ServiceEvent::ServiceResolved(info) = event {
5263                resolved = true;
5264                println!("Resolved a service of {}", &info.get_fullname());
5265                break;
5266            }
5267        }
5268
5269        assert!(resolved);
5270
5271        // Exit the server so that no more responses.
5272        mdns_server.shutdown().unwrap();
5273        mdns_client.shutdown().unwrap();
5274    }
5275
5276    #[test]
5277    fn test_custom_port_isolation() {
5278        // This test verifies:
5279        // 1. Daemons on a custom port can communicate with each other
5280        // 2. Daemons on different ports are isolated (no cross-talk)
5281
5282        let service_type = "_custom_port._udp.local.";
5283        let instance_custom = "custom_port_instance";
5284        let instance_default = "default_port_instance";
5285        let host_name = "custom_port_host.local.";
5286
5287        let service_ip_addr = my_ip_interfaces(false)
5288            .iter()
5289            .find(|iface| iface.ip().is_ipv4())
5290            .map(|iface| iface.ip())
5291            .expect("Test requires an IPv4 interface");
5292
5293        // Create service info for custom port (5454)
5294        let service_custom = ServiceInfo::new(
5295            service_type,
5296            instance_custom,
5297            host_name,
5298            service_ip_addr,
5299            8080,
5300            None,
5301        )
5302        .unwrap();
5303
5304        // Create service info for default port (5353)
5305        let service_default = ServiceInfo::new(
5306            service_type,
5307            instance_default,
5308            host_name,
5309            service_ip_addr,
5310            8081,
5311            None,
5312        )
5313        .unwrap();
5314
5315        // Create two daemons on custom port 5454
5316        let custom_port = 5454u16;
5317        let server_custom =
5318            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5319        let client_custom =
5320            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5321
5322        // Create daemon on default port (5353)
5323        let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5324
5325        // Register service on custom port
5326        server_custom
5327            .register(service_custom.clone())
5328            .expect("Failed to register custom port service");
5329
5330        // Register service on default port
5331        server_default
5332            .register(service_default.clone())
5333            .expect("Failed to register default port service");
5334
5335        // Browse from custom port client
5336        let browse_custom = client_custom
5337            .browse(service_type)
5338            .expect("Failed to browse on custom port");
5339
5340        let timeout = Duration::from_secs(3);
5341        let mut found_custom = false;
5342        let mut found_default_on_custom = false;
5343
5344        // Custom port client should find the custom port service
5345        while let Ok(event) = browse_custom.recv_timeout(timeout) {
5346            if let ServiceEvent::ServiceResolved(info) = event {
5347                println!(
5348                    "Custom port client resolved: {} on port {}",
5349                    info.get_fullname(),
5350                    info.get_port()
5351                );
5352                if info.get_fullname().starts_with(instance_custom) {
5353                    found_custom = true;
5354                    assert_eq!(info.get_port(), 8080);
5355                }
5356                if info.get_fullname().starts_with(instance_default) {
5357                    found_default_on_custom = true;
5358                }
5359            }
5360        }
5361
5362        assert!(
5363            found_custom,
5364            "Custom port client should find service on custom port"
5365        );
5366        assert!(
5367            !found_default_on_custom,
5368            "Custom port client should NOT find service on default port"
5369        );
5370
5371        // Now verify the default port daemon can find its own services
5372        // but not the custom port services
5373        let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5374        let browse_default = client_default
5375            .browse(service_type)
5376            .expect("Failed to browse on default port");
5377
5378        let mut found_default = false;
5379        let mut found_custom_on_default = false;
5380
5381        while let Ok(event) = browse_default.recv_timeout(timeout) {
5382            if let ServiceEvent::ServiceResolved(info) = event {
5383                println!(
5384                    "Default port client resolved: {} on port {}",
5385                    info.get_fullname(),
5386                    info.get_port()
5387                );
5388                if info.get_fullname().starts_with(instance_default) {
5389                    found_default = true;
5390                    assert_eq!(info.get_port(), 8081);
5391                }
5392                if info.get_fullname().starts_with(instance_custom) {
5393                    found_custom_on_default = true;
5394                }
5395            }
5396        }
5397
5398        assert!(
5399            found_default,
5400            "Default port client should find service on default port"
5401        );
5402        assert!(
5403            !found_custom_on_default,
5404            "Default port client should NOT find service on custom port"
5405        );
5406
5407        // Cleanup
5408        server_custom.shutdown().unwrap();
5409        client_custom.shutdown().unwrap();
5410        server_default.shutdown().unwrap();
5411        client_default.shutdown().unwrap();
5412    }
5413}