Skip to main content

mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself, followed by `_udp` or `_tcp`.
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is the "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is the "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34    current_time_millis,
35    dns_cache::{DnsCache, IpType},
36    dns_parser::{
37        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
38        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
39        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
40    },
41    error::{e_fmt, Error, Result},
42    service_info::{valid_ip_on_intf, DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
43    Receiver, ResolvedService, TxtProperties,
44};
45use flume::{bounded, Sender, TrySendError};
46use if_addrs::{IfAddr, Interface};
47use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
48use socket2::Domain;
49use socket_pktinfo::PktInfoUdpSocket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
53    fmt, io,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
/// The default max length of the service name without domain, not including the
/// leading underscore (`_`). It is set to 15 per
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// The default interval, in seconds, for checking IP changes automatically.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// The default timeout for [ServiceDaemon::verify] is 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The mDNS port number per RFC 6762.
pub const MDNS_PORT: u16 = 5353;

/// The IPv4 multicast group (224.0.0.251) that mDNS sockets join and send to.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// The IPv6 multicast group (ff02::fb) that mDNS sockets join.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// The IPv4 loopback address used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

// NOTE(review): not referenced in the visible part of this file; presumably a
// wait time in milliseconds used while resolving -- confirm at the use site.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
80
/// Response status code for the service `unregister` call.
///
/// Derives the same traits as `DaemonStatus` (`Debug`, `PartialEq`, `Clone`,
/// `Eq`) so that a received status can be compared and cloned directly.
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
89
/// Status code for the service daemon.
///
/// Marked `#[non_exhaustive]`: more variants may be added, so matches outside
/// this crate need a wildcard arm.
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is running as normal.
    Running,

    /// The daemon has been shutdown.
    ///
    /// This is the status sent in response to a shutdown request, and also the
    /// status reported when the command channel to the daemon is disconnected.
    Shutdown,
}
100
/// Different counters included in the metrics.
/// Currently all counters are for outgoing packets.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the fixed lowercase, hyphen-separated name of this counter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the static name first, then emit it with a single write.
        let name = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(name)
    }
}
157
158#[derive(Debug)]
159enum InternalError {
160    IntfAddrInvalid(Interface),
161}
162
163impl fmt::Display for InternalError {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        match self {
166            InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
167        }
168    }
169}
170
171type MyResult<T> = core::result::Result<T, InternalError>;
172
173/// A wrapper around UDP socket used by the mDNS daemon.
174///
175/// We do this because `mio` does not support PKTINFO and
176/// does not provide a way to implement `Source` trait directly and safely.
177struct MyUdpSocket {
178    /// The underlying socket that supports control messages like
179    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
180    pktinfo: PktInfoUdpSocket,
181
182    /// The mio UDP socket that is a clone of `pktinfo` and
183    /// is used for event polling.
184    mio: MioUdpSocket,
185}
186
187impl MyUdpSocket {
188    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
189        let std_sock = pktinfo.try_clone_std()?;
190        let mio = MioUdpSocket::from_std(std_sock);
191
192        Ok(Self { pktinfo, mio })
193    }
194}
195
196/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
197impl Source for MyUdpSocket {
198    fn register(
199        &mut self,
200        registry: &Registry,
201        token: Token,
202        interests: Interest,
203    ) -> io::Result<()> {
204        self.mio.register(registry, token, interests)
205    }
206
207    fn reregister(
208        &mut self,
209        registry: &Registry,
210        token: Token,
211        interests: Interest,
212    ) -> io::Result<()> {
213        self.mio.reregister(registry, token, interests)
214    }
215
216    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
217        self.mio.deregister(registry)
218    }
219}
220
/// The metrics is a HashMap of (name_key, i64_value).
/// The main purpose is to help monitoring the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;

const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // Chosen to avoid overlapping with zc.poll_ids.
228
/// A daemon thread for mDNS
///
/// This struct provides a handle and an API to the daemon. It is cloneable.
///
/// The handle itself holds only the command channel sender and the address
/// of the daemon's signal socket.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sender handle of the channel to the daemon.
    sender: Sender<Command>,

    /// Send to this addr to signal that a `Command` is coming.
    ///
    /// The daemon listens on this addr together with other mDNS sockets,
    /// to avoid busy polling the flume channel. If there is a way to poll
    /// the channel and mDNS sockets together, then this can be removed.
    signal_addr: SocketAddr,
}
244
245impl ServiceDaemon {
246    /// Creates a new daemon and spawns a thread to run the daemon.
247    ///
248    /// Creates a new mDNS service daemon using the default port (5353).
249    ///
250    /// For development/testing with custom ports, use [`ServiceDaemon::new_with_port`].
251    pub fn new() -> Result<Self> {
252        Self::new_with_port(MDNS_PORT)
253    }
254
255    /// Creates a new mDNS service daemon using a custom port.
256    ///
257    /// # Arguments
258    ///
259    /// * `port` - The UDP port to bind for mDNS communication.
260    ///   - In production, this should be `MDNS_PORT` (5353) per RFC 6762.
261    ///   - For development/testing, you can use a non-standard port (e.g., 5454)
262    ///     to avoid conflicts with system mDNS services.
263    ///   - Both publisher and browser must use the same port to communicate.
264    ///
265    /// # Example
266    ///
267    /// ```no_run
268    /// use mdns_sd::ServiceDaemon;
269    ///
270    /// // Use standard mDNS port (production)
271    /// let daemon = ServiceDaemon::new_with_port(5353)?;
272    ///
273    /// // Use custom port for development (avoids macOS Bonjour conflict)
274    /// let daemon_dev = ServiceDaemon::new_with_port(5454)?;
275    /// # Ok::<(), mdns_sd::Error>(())
276    /// ```
277    pub fn new_with_port(port: u16) -> Result<Self> {
278        // Use port 0 to allow the system assign a random available port,
279        // no need for a pre-defined port number.
280        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
281
282        let signal_sock = UdpSocket::bind(signal_addr)
283            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
284
285        // Get the socket with the OS chosen port
286        let signal_addr = signal_sock
287            .local_addr()
288            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
289
290        // Must be nonblocking so we can listen to it together with mDNS sockets.
291        signal_sock
292            .set_nonblocking(true)
293            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
294
295        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
296
297        let (sender, receiver) = bounded(100);
298
299        // Spawn the daemon thread
300        let mio_sock = MioUdpSocket::from_std(signal_sock);
301        let cmd_sender = sender.clone();
302        thread::Builder::new()
303            .name("mDNS_daemon".to_string())
304            .spawn(move || {
305                Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
306            })
307            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
308
309        Ok(Self {
310            sender,
311            signal_addr,
312        })
313    }
314
315    /// Sends `cmd` to the daemon via its channel, and sends a signal
316    /// to its sock addr to notify.
317    fn send_cmd(&self, cmd: Command) -> Result<()> {
318        let cmd_name = cmd.to_string();
319
320        // First, send to the flume channel.
321        self.sender.try_send(cmd).map_err(|e| match e {
322            TrySendError::Full(_) => Error::Again,
323            e => e_fmt!("flume::channel::send failed: {}", e),
324        })?;
325
326        // Second, send a signal to notify the daemon.
327        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
328        let socket = UdpSocket::bind(addr)
329            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
330        socket
331            .send_to(cmd_name.as_bytes(), self.signal_addr)
332            .map_err(|e| {
333                e_fmt!(
334                    "signal socket send_to {} ({}) failed: {}",
335                    self.signal_addr,
336                    cmd_name,
337                    e
338                )
339            })?;
340
341        Ok(())
342    }
343
344    /// Starts browsing for a specific service type.
345    ///
346    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
347    ///
348    /// Returns a channel `Receiver` to receive events about the service. The caller
349    /// can call `.recv_async().await` on this receiver to handle events in an
350    /// async environment or call `.recv()` in a sync environment.
351    ///
352    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
353    /// finding more details, i.e. SRV records and TXT records.
354    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
355        check_domain_suffix(service_type)?;
356
357        let (resp_s, resp_r) = bounded(10);
358        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
359        Ok(resp_r)
360    }
361
362    /// Preforms a "cache-only" browse.
363    ///
364    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
365    ///
366    /// The functionality is identical to 'browse', but the service events are based solely on the contents
367    /// of the daemon's cache. No actual mDNS query is sent to the network.
368    ///
369    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
370    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
371        check_domain_suffix(service_type)?;
372
373        let (resp_s, resp_r) = bounded(10);
374        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
375        Ok(resp_r)
376    }
377
378    /// Stops searching for a specific service type.
379    ///
380    /// When an error is returned, the caller should retry only when
381    /// the error is `Error::Again`, otherwise should log and move on.
382    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
383        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
384    }
385
386    /// Starts querying for the ip addresses of a hostname.
387    ///
388    /// Returns a channel `Receiver` to receive events about the hostname.
389    /// The caller can call `.recv_async().await` on this receiver to handle events in an
390    /// async environment or call `.recv()` in a sync environment.
391    ///
392    /// The `timeout` is specified in milliseconds.
393    pub fn resolve_hostname(
394        &self,
395        hostname: &str,
396        timeout: Option<u64>,
397    ) -> Result<Receiver<HostnameResolutionEvent>> {
398        check_hostname(hostname)?;
399        let (resp_s, resp_r) = bounded(10);
400        self.send_cmd(Command::ResolveHostname(
401            hostname.to_string(),
402            1,
403            resp_s,
404            timeout,
405        ))?;
406        Ok(resp_r)
407    }
408
409    /// Stops querying for the ip addresses of a hostname.
410    ///
411    /// When an error is returned, the caller should retry only when
412    /// the error is `Error::Again`, otherwise should log and move on.
413    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
414        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
415    }
416
417    /// Registers a service provided by this host.
418    ///
419    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
420    /// this method will automatically fill in addresses from the host.
421    ///
422    /// To re-announce a service with an updated `service_info`, just call
423    /// this `register` function again. No need to call `unregister` first.
424    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
425        check_service_name(service_info.get_fullname())?;
426        check_hostname(service_info.get_hostname())?;
427
428        self.send_cmd(Command::Register(service_info.into()))
429    }
430
431    /// Unregisters a service. This is a graceful shutdown of a service.
432    ///
433    /// Returns a channel receiver that is used to receive the status code
434    /// of the unregister.
435    ///
436    /// When an error is returned, the caller should retry only when
437    /// the error is `Error::Again`, otherwise should log and move on.
438    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
439        let (resp_s, resp_r) = bounded(1);
440        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
441        Ok(resp_r)
442    }
443
444    /// Starts to monitor events from the daemon.
445    ///
446    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
447    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
448        let (resp_s, resp_r) = bounded(100);
449        self.send_cmd(Command::Monitor(resp_s))?;
450        Ok(resp_r)
451    }
452
453    /// Shuts down the daemon thread and returns a channel to receive the status.
454    ///
455    /// When an error is returned, the caller should retry only when
456    /// the error is `Error::Again`, otherwise should log and move on.
457    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
458        let (resp_s, resp_r) = bounded(1);
459        self.send_cmd(Command::Exit(resp_s))?;
460        Ok(resp_r)
461    }
462
463    /// Returns the status of the daemon.
464    ///
465    /// When an error is returned, the caller should retry only when
466    /// the error is `Error::Again`, otherwise should consider the daemon
467    /// stopped working and move on.
468    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
469        let (resp_s, resp_r) = bounded(1);
470
471        if self.sender.is_disconnected() {
472            resp_s
473                .send(DaemonStatus::Shutdown)
474                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
475        } else {
476            self.send_cmd(Command::GetStatus(resp_s))?;
477        }
478
479        Ok(resp_r)
480    }
481
482    /// Returns a channel receiver for the metrics, e.g. input/output counters.
483    ///
484    /// The metrics returned is a snapshot. Hence the caller should call
485    /// this method repeatedly if they want to monitor the metrics continuously.
486    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
487        let (resp_s, resp_r) = bounded(1);
488        self.send_cmd(Command::GetMetrics(resp_s))?;
489        Ok(resp_r)
490    }
491
492    /// Change the max length allowed for a service name.
493    ///
494    /// As RFC 6763 defines a length max for a service name, a user should not call
495    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
496    ///
497    /// `len_max` is capped at an internal limit, which is currently 30.
498    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
499        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
500
501        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
502            return Err(Error::Msg(format!(
503                "service name length max {len_max} is too large"
504            )));
505        }
506
507        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
508    }
509
510    /// Change the interval for checking IP changes automatically.
511    ///
512    /// Setting the interval to 0 disables the IP check.
513    ///
514    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
515    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
516        let interval_in_millis = interval_in_secs as u64 * 1000;
517        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
518            interval_in_millis,
519        )))
520    }
521
522    /// Get the current interval in seconds for checking IP changes automatically.
523    pub fn get_ip_check_interval(&self) -> Result<u32> {
524        let (resp_s, resp_r) = bounded(1);
525        self.send_cmd(Command::GetOption(resp_s))?;
526
527        let option = resp_r
528            .recv_timeout(Duration::from_secs(10))
529            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
530        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
531        Ok(ip_check_interval_in_secs as u32)
532    }
533
534    /// Include interfaces that match `if_kind` for this service daemon.
535    ///
536    /// For example:
537    /// ```ignore
538    ///     daemon.enable_interface("en0")?;
539    /// ```
540    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
541        let if_kind_vec = if_kind.into_vec();
542        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
543            if_kind_vec.kinds,
544        )))
545    }
546
547    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
548    ///
549    /// For example:
550    /// ```ignore
551    ///     daemon.disable_interface(IfKind::IPv6)?;
552    /// ```
553    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
554        let if_kind_vec = if_kind.into_vec();
555        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
556            if_kind_vec.kinds,
557        )))
558    }
559
560    /// If `accept` is true, accept and cache all responses, even if there is no active querier
561    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
562    /// [browse_cache](Self::browse_cache).
563    ///
564    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
565    ///
566    /// For example:
567    /// ```ignore
568    ///     daemon.accept_unsolicited(true)?;
569    /// ```
570    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
571        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
572    }
573
574    /// Include or exclude Apple P2P interfaces, e.g. "awdl0", "llw0".
575    /// By default, they are excluded.
576    pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
577        self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
578    }
579
580    #[cfg(test)]
581    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
582        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
583            ifname.to_string(),
584        )))
585    }
586
587    #[cfg(test)]
588    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
589        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
590            ifname.to_string(),
591        )))
592    }
593
594    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
595    ///
596    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
597    /// receive announcements from a responder on the same host.
598    ///
599    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
600    ///
601    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
602    /// the UNIX version of the IP_MULTICAST_LOOP option:
603    ///
604    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
605    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
606    ///
607    /// Which means, in order NOT to receive localhost announcements, you want to call
608    /// this API on the querier side on Windows, but on the responder side on Unix.
609    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
610        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
611    }
612
613    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
614    ///
615    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
616    /// receive announcements from a responder on the same host.
617    ///
618    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
619    ///
620    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
621    /// the UNIX version of the IP_MULTICAST_LOOP option:
622    ///
623    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
624    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
625    ///
626    /// Which means, in order NOT to receive localhost announcements, you want to call
627    /// this API on the querier side on Windows, but on the responder side on Unix.
628    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
629        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
630    }
631
632    /// Proactively confirms whether a service instance still valid.
633    ///
634    /// This call will issue queries for a service instance's SRV record and Address records.
635    ///
636    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
637    /// unless there is a reason not to follow RFC.
638    ///
639    /// If no response is received within `timeout`, the current resource
640    /// records will be flushed, and if needed, `ServiceRemoved` event will be
641    /// sent to active queriers.
642    ///
643    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
644    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
645        self.send_cmd(Command::Verify(instance_fullname, timeout))
646    }
647
648    fn daemon_thread(
649        signal_sock: MioUdpSocket,
650        poller: Poll,
651        receiver: Receiver<Command>,
652        port: u16,
653        cmd_sender: Sender<Command>,
654        signal_addr: SocketAddr,
655    ) {
656        let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
657
658        if let Some(cmd) = zc.run(receiver) {
659            match cmd {
660                Command::Exit(resp_s) => {
661                    // It is guaranteed that the receiver already dropped,
662                    // i.e. the daemon command channel closed.
663                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
664                        debug!("exit: failed to send response of shutdown: {}", e);
665                    }
666                }
667                _ => {
668                    debug!("Unexpected command: {:?}", cmd);
669                }
670            }
671        }
672    }
673}
674
675/// Creates a new UDP socket that uses `intf` to send and recv multicast.
676fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
677    // Use the same socket for receiving and sending multicast packets.
678    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
679    let intf_ip = &intf.ip();
680    match intf_ip {
681        IpAddr::V4(ip) => {
682            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
683            let sock = new_socket(addr.into(), true)?;
684
685            // Join mDNS group to receive packets.
686            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
687                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
688
689            // Set IP_MULTICAST_IF to send packets.
690            sock.set_multicast_if_v4(ip)
691                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
692
693            // Per RFC 6762 section 11:
694            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
695            // be sent with IP TTL set to 255."
696            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
697            sock.set_multicast_ttl_v4(255)
698                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
699
700            if !should_loop {
701                sock.set_multicast_loop_v4(false)
702                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
703            }
704
705            // Test if we can send packets successfully.
706            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
707            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
708            for packet in test_packets {
709                sock.send_to(&packet, &multicast_addr)
710                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
711            }
712            MyUdpSocket::new(sock)
713                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
714        }
715        IpAddr::V6(ip) => {
716            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717            let sock = new_socket(addr.into(), true)?;
718
719            let if_index = intf.index.unwrap_or(0);
720
721            // Join mDNS group to receive packets.
722            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
723                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
724
725            // Set IPV6_MULTICAST_IF to send packets.
726            sock.set_multicast_if_v6(if_index)
727                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
728
729            // We are not sending multicast packets to test this socket as there might
730            // be many IPv6 interfaces on a host and could cause such send error:
731            // "No buffer space available (os error 55)".
732
733            MyUdpSocket::new(sock)
734                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
735        }
736    }
737}
738
739/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
740/// `non_block` indicates whether to set O_NONBLOCK for the socket.
741fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
742    let domain = match addr {
743        SocketAddr::V4(_) => socket2::Domain::IPV4,
744        SocketAddr::V6(_) => socket2::Domain::IPV6,
745    };
746
747    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
748
749    fd.set_reuse_address(true)
750        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
751    #[cfg(unix)]
752    if let Err(e) = fd.set_reuse_port(true) {
753        debug!(
754            "SO_REUSEPORT is not supported, continuing without it: {}",
755            e
756        );
757    }
758
759    if non_block {
760        fd.set_nonblocking(true)
761            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
762    }
763
764    fd.bind(&addr.into())
765        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
766
767    trace!("new socket bind to {}", &addr);
768    Ok(fd)
769}
770
771/// Specify a UNIX timestamp in millis to run `command` for the next time.
772struct ReRun {
773    /// UNIX timestamp in millis.
774    next_time: u64,
775    command: Command,
776}
777
/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
///
/// Kinds can be compared and hashed, which allows callers to deduplicate
/// or look up selections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    /// This is used to look up the interface. The semantics is to identify an interface of
    /// IPv4 or IPv6, not a specific address on the interface.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), enabled by default.
    ///
    /// Loopback interfaces are required by some use cases (e.g., OSCQuery) for publishing.
    LoopbackV4,

    /// ::1/128, enabled by default.
    LoopbackV6,

    /// By interface index, IPv4 only.
    IndexV4(u32),

    /// By interface index, IPv6 only.
    IndexV6(u32),
}
815
816impl IfKind {
817    /// Checks if `intf` matches with this interface kind.
818    fn matches(&self, intf: &Interface) -> bool {
819        match self {
820            Self::All => true,
821            Self::IPv4 => intf.ip().is_ipv4(),
822            Self::IPv6 => intf.ip().is_ipv6(),
823            Self::Name(ifname) => ifname == &intf.name,
824            Self::Addr(addr) => addr == &intf.ip(),
825            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
826            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
827            Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
828            Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
829        }
830    }
831}
832
833/// The first use case of specifying an interface was to
834/// use an interface name. Hence adding this for ergonomic reasons.
835impl From<&str> for IfKind {
836    fn from(val: &str) -> Self {
837        Self::Name(val.to_string())
838    }
839}
840
841impl From<&String> for IfKind {
842    fn from(val: &String) -> Self {
843        Self::Name(val.to_string())
844    }
845}
846
847/// Still for ergonomic reasons.
848impl From<IpAddr> for IfKind {
849    fn from(val: IpAddr) -> Self {
850        Self::Addr(val)
851    }
852}
853
854/// A list of `IfKind` that can be used to match interfaces.
855pub struct IfKindVec {
856    kinds: Vec<IfKind>,
857}
858
859/// A trait that converts a type into a Vec of `IfKind`.
860pub trait IntoIfKindVec {
861    fn into_vec(self) -> IfKindVec;
862}
863
864impl<T: Into<IfKind>> IntoIfKindVec for T {
865    fn into_vec(self) -> IfKindVec {
866        let if_kind: IfKind = self.into();
867        IfKindVec {
868            kinds: vec![if_kind],
869        }
870    }
871}
872
873impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
874    fn into_vec(self) -> IfKindVec {
875        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
876        IfKindVec { kinds }
877    }
878}
879
880/// Selection of interfaces.
881struct IfSelection {
882    /// The interfaces to be selected.
883    if_kind: IfKind,
884
885    /// Whether the `if_kind` should be enabled or not.
886    selected: bool,
887}
888
889/// A struct holding the state. It was inspired by `zeroconf` package in Python.
890struct Zeroconf {
891    /// The mDNS port number to use for socket binding.
892    /// Typically MDNS_PORT (5353), but can be customized for development/testing.
893    port: u16,
894
895    /// Local interfaces keyed by interface index.
896    my_intfs: HashMap<u32, MyIntf>,
897
898    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
899    ipv4_sock: Option<MyUdpSocket>,
900
901    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
902    ipv6_sock: Option<MyUdpSocket>,
903
904    /// Local registered services, keyed by service full names.
905    my_services: HashMap<String, ServiceInfo>,
906
907    /// Received DNS records.
908    cache: DnsCache,
909
910    /// Registered service records, keyed by interface index.
911    dns_registry_map: HashMap<u32, DnsRegistry>,
912
913    /// Active "Browse" commands.
914    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
915
916    /// Active "ResolveHostname" commands.
917    ///
918    /// The timestamps are set at the future timestamp when the command should timeout.
919    /// `hostname` is case-insensitive and stored in lowercase.
920    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
921
922    /// All repeating transmissions.
923    retransmissions: Vec<ReRun>,
924
925    counters: Metrics,
926
927    /// Waits for incoming packets.
928    poller: Poll,
929
930    /// Channels to notify events.
931    monitors: Vec<Sender<DaemonEvent>>,
932
933    /// Options
934    service_name_len_max: u8,
935
936    /// Interval in millis to check IP address changes.
937    ip_check_interval: u64,
938
939    /// All interface selections called to the daemon.
940    if_selections: Vec<IfSelection>,
941
942    /// Socket for signaling.
943    signal_sock: MioUdpSocket,
944
945    /// Timestamps marking where we need another iteration of the run loop,
946    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
947    ///
948    /// When the run loop goes through a single iteration, it will
949    /// set its timeout to the earliest timer in this list.
950    timers: BinaryHeap<Reverse<u64>>,
951
952    status: DaemonStatus,
953
954    /// Service instances that are pending for resolving SRV and TXT.
955    pending_resolves: HashSet<String>,
956
957    /// Service instances that are already resolved.
958    resolved: HashSet<String>,
959
960    multicast_loop_v4: bool,
961
962    multicast_loop_v6: bool,
963
964    accept_unsolicited: bool,
965
966    include_apple_p2p: bool,
967
968    cmd_sender: Sender<Command>,
969
970    signal_addr: SocketAddr,
971
972    #[cfg(test)]
973    test_down_interfaces: HashSet<String>,
974}
975
976/// Join the multicast group for the given interface.
977fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
978    let intf_ip = &intf.ip();
979    match intf_ip {
980        IpAddr::V4(ip) => {
981            // Join mDNS group to receive packets.
982            debug!("join multicast group V4 on {} addr {ip}", intf.name);
983            my_sock
984                .join_multicast_v4(&GROUP_ADDR_V4, ip)
985                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
986        }
987        IpAddr::V6(ip) => {
988            let if_index = intf.index.unwrap_or(0);
989            // Join mDNS group to receive packets.
990            debug!(
991                "join multicast group V6 on {} addr {ip} with index {if_index}",
992                intf.name
993            );
994            my_sock
995                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
996                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
997        }
998    }
999    Ok(())
1000}
1001
1002impl Zeroconf {
1003    fn new(
1004        signal_sock: MioUdpSocket,
1005        poller: Poll,
1006        port: u16,
1007        cmd_sender: Sender<Command>,
1008        signal_addr: SocketAddr,
1009    ) -> Self {
1010        // Get interfaces.
1011        let my_ifaddrs = my_ip_interfaces(true);
1012
1013        // Create a socket for every IP addr.
1014        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
1015        // or the same interface name with different IP addrs.
1016        let mut my_intfs = HashMap::new();
1017        let mut dns_registry_map = HashMap::new();
1018
1019        // Use the same socket for receiving and sending multicast packets.
1020        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
1021        let mut ipv4_sock = None;
1022        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1023        match new_socket(addr.into(), true) {
1024            Ok(sock) => {
1025                // Per RFC 6762 section 11:
1026                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1027                // be sent with IP TTL set to 255."
1028                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
1029                sock.set_multicast_ttl_v4(255)
1030                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1031                    .ok();
1032
1033                // This clones a socket.
1034                ipv4_sock = match MyUdpSocket::new(sock) {
1035                    Ok(s) => Some(s),
1036                    Err(e) => {
1037                        debug!("failed to create IPv4 MyUdpSocket: {e}");
1038                        None
1039                    }
1040                };
1041            }
1042            // Per RFC 6762 section 11:}
1043            Err(e) => debug!("failed to create IPv4 socket: {e}"),
1044        }
1045
1046        let mut ipv6_sock = None;
1047        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1048        match new_socket(addr.into(), true) {
1049            Ok(sock) => {
1050                // Per RFC 6762 section 11:
1051                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1052                // be sent with IP TTL set to 255."
1053                sock.set_multicast_hops_v6(255)
1054                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1055                    .ok();
1056
1057                // This clones the ipv6 socket.
1058                ipv6_sock = match MyUdpSocket::new(sock) {
1059                    Ok(s) => Some(s),
1060                    Err(e) => {
1061                        debug!("failed to create IPv6 MyUdpSocket: {e}");
1062                        None
1063                    }
1064                };
1065            }
1066            Err(e) => debug!("failed to create IPv6 socket: {e}"),
1067        }
1068
1069        // Configure sockets to join multicast groups.
1070        for intf in my_ifaddrs {
1071            let sock_opt = if intf.ip().is_ipv4() {
1072                &ipv4_sock
1073            } else {
1074                &ipv6_sock
1075            };
1076            let Some(sock) = sock_opt else {
1077                debug!(
1078                    "no socket available for interface {} with addr {}. Skipped.",
1079                    intf.name,
1080                    intf.ip()
1081                );
1082                continue;
1083            };
1084
1085            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1086                debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1087            }
1088
1089            let if_index = intf.index.unwrap_or(0);
1090
1091            // Add this interface address if not already present.
1092            dns_registry_map
1093                .entry(if_index)
1094                .or_insert_with(DnsRegistry::new);
1095
1096            my_intfs
1097                .entry(if_index)
1098                .and_modify(|v: &mut MyIntf| {
1099                    v.addrs.insert(intf.addr.clone());
1100                })
1101                .or_insert(MyIntf {
1102                    name: intf.name.clone(),
1103                    index: if_index,
1104                    addrs: HashSet::from([intf.addr]),
1105                });
1106        }
1107
1108        let monitors = Vec::new();
1109        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1110        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1111
1112        let timers = BinaryHeap::new();
1113
1114        // Enable everything, including loopback interfaces.
1115        let if_selections = vec![];
1116
1117        let status = DaemonStatus::Running;
1118
1119        Self {
1120            port,
1121            my_intfs,
1122            ipv4_sock,
1123            ipv6_sock,
1124            my_services: HashMap::new(),
1125            cache: DnsCache::new(),
1126            dns_registry_map,
1127            hostname_resolvers: HashMap::new(),
1128            service_queriers: HashMap::new(),
1129            retransmissions: Vec::new(),
1130            counters: HashMap::new(),
1131            poller,
1132            monitors,
1133            service_name_len_max,
1134            ip_check_interval,
1135            if_selections,
1136            signal_sock,
1137            timers,
1138            status,
1139            pending_resolves: HashSet::new(),
1140            resolved: HashSet::new(),
1141            multicast_loop_v4: true,
1142            multicast_loop_v6: true,
1143            accept_unsolicited: false,
1144            include_apple_p2p: false,
1145            cmd_sender,
1146            signal_addr,
1147
1148            #[cfg(test)]
1149            test_down_interfaces: HashSet::new(),
1150        }
1151    }
1152
1153    /// Send a Command into the daemon channel and poke the signal socket to wake the poll loop.
1154    fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1155        let cmd_name = cmd.to_string();
1156
1157        self.cmd_sender.try_send(cmd).map_err(|e| match e {
1158            TrySendError::Full(_) => Error::Again,
1159            e => e_fmt!("flume::channel::send failed: {}", e),
1160        })?;
1161
1162        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1163        let socket = UdpSocket::bind(addr)
1164            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1165        socket
1166            .send_to(cmd_name.as_bytes(), self.signal_addr)
1167            .map_err(|e| {
1168                e_fmt!(
1169                    "signal socket send_to {} ({}) failed: {}",
1170                    self.signal_addr,
1171                    cmd_name,
1172                    e
1173                )
1174            })?;
1175
1176        Ok(())
1177    }
1178
1179    /// Clean up all resources before shutdown.
1180    ///
1181    /// This method:
1182    /// 1. Unregisters all registered services (sends goodbye packets)
1183    /// 2. Stops all active browse operations
1184    /// 3. Stops all active hostname resolution operations
1185    /// 4. Clears all retransmissions
1186    fn cleanup(&mut self) {
1187        debug!("Starting cleanup for shutdown");
1188
1189        // 1. Unregister all services - send goodbye packets
1190        let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1191        for fullname in service_names {
1192            if let Some(info) = self.my_services.get(&fullname) {
1193                debug!("Unregistering service during shutdown: {}", &fullname);
1194
1195                for intf in self.my_intfs.values() {
1196                    if let Some(sock) = self.ipv4_sock.as_ref() {
1197                        self.unregister_service(info, intf, &sock.pktinfo);
1198                    }
1199
1200                    if let Some(sock) = self.ipv6_sock.as_ref() {
1201                        self.unregister_service(info, intf, &sock.pktinfo);
1202                    }
1203                }
1204            }
1205        }
1206        self.my_services.clear();
1207
1208        // 2. Stop all browse operations
1209        let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1210        for ty_domain in browse_types {
1211            debug!("Stopping browse during shutdown: {}", &ty_domain);
1212            if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1213                // Notify the client
1214                if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1215                    debug!("Failed to send SearchStopped during shutdown: {}", e);
1216                }
1217            }
1218        }
1219
1220        // 3. Stop all hostname resolution operations
1221        let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1222        for hostname in hostnames {
1223            debug!(
1224                "Stopping hostname resolution during shutdown: {}",
1225                &hostname
1226            );
1227            if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1228                // Notify the client
1229                if let Err(e) =
1230                    sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1231                {
1232                    debug!(
1233                        "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1234                        e
1235                    );
1236                }
1237            }
1238        }
1239
1240        // 4. Clear all retransmissions
1241        self.retransmissions.clear();
1242
1243        debug!("Cleanup completed");
1244    }
1245
1246    /// The main event loop of the daemon thread
1247    ///
1248    /// In each round, it will:
1249    /// 1. select the listening sockets with a timeout.
1250    /// 2. process the incoming packets if any.
1251    /// 3. try_recv on its channel and execute commands.
1252    /// 4. announce its registered services.
1253    /// 5. process retransmissions if any.
1254    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1255        // Add the daemon's signal socket to the poller.
1256        if let Err(e) = self.poller.registry().register(
1257            &mut self.signal_sock,
1258            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1259            mio::Interest::READABLE,
1260        ) {
1261            debug!("failed to add signal socket to the poller: {}", e);
1262            return None;
1263        }
1264
1265        if let Some(sock) = self.ipv4_sock.as_mut() {
1266            if let Err(e) = self.poller.registry().register(
1267                sock,
1268                mio::Token(IPV4_SOCK_EVENT_KEY),
1269                mio::Interest::READABLE,
1270            ) {
1271                debug!("failed to register ipv4 socket: {}", e);
1272                return None;
1273            }
1274        }
1275
1276        if let Some(sock) = self.ipv6_sock.as_mut() {
1277            if let Err(e) = self.poller.registry().register(
1278                sock,
1279                mio::Token(IPV6_SOCK_EVENT_KEY),
1280                mio::Interest::READABLE,
1281            ) {
1282                debug!("failed to register ipv6 socket: {}", e);
1283                return None;
1284            }
1285        }
1286
1287        // Setup timer for IP checks.
1288        let mut next_ip_check = if self.ip_check_interval > 0 {
1289            current_time_millis() + self.ip_check_interval
1290        } else {
1291            0
1292        };
1293
1294        if next_ip_check > 0 {
1295            self.add_timer(next_ip_check);
1296        }
1297
1298        // Start the run loop.
1299
1300        let mut events = mio::Events::with_capacity(1024);
1301        loop {
1302            let now = current_time_millis();
1303
1304            let earliest_timer = self.peek_earliest_timer();
1305            let timeout = earliest_timer.map(|timer| {
1306                // If `timer` already passed, set `timeout` to be 1ms.
1307                let millis = if timer > now { timer - now } else { 1 };
1308                Duration::from_millis(millis)
1309            });
1310
1311            // Process incoming packets, command events and optional timeout.
1312            events.clear();
1313            match self.poller.poll(&mut events, timeout) {
1314                Ok(_) => self.handle_poller_events(&events),
1315                Err(e) => debug!("failed to select from sockets: {}", e),
1316            }
1317
1318            let now = current_time_millis();
1319
1320            // Remove the timers if already passed.
1321            self.pop_timers_till(now);
1322
1323            // Remove hostname resolvers with expired timeouts.
1324            for hostname in self
1325                .hostname_resolvers
1326                .clone()
1327                .into_iter()
1328                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1329                .map(|(hostname, _)| hostname)
1330            {
1331                trace!("hostname resolver timeout for {}", &hostname);
1332                call_hostname_resolution_listener(
1333                    &self.hostname_resolvers,
1334                    &hostname,
1335                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1336                );
1337                call_hostname_resolution_listener(
1338                    &self.hostname_resolvers,
1339                    &hostname,
1340                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1341                );
1342                self.hostname_resolvers.remove(&hostname);
1343            }
1344
1345            // process commands from the command channel
1346            while let Ok(command) = receiver.try_recv() {
1347                if matches!(command, Command::Exit(_)) {
1348                    debug!("Exit command received, performing cleanup");
1349                    self.cleanup();
1350                    self.status = DaemonStatus::Shutdown;
1351                    return Some(command);
1352                }
1353                self.exec_command(command, false);
1354            }
1355
1356            // check for repeated commands and run them if their time is up.
1357            let mut i = 0;
1358            while i < self.retransmissions.len() {
1359                if now >= self.retransmissions[i].next_time {
1360                    let rerun = self.retransmissions.remove(i);
1361                    self.exec_command(rerun.command, true);
1362                } else {
1363                    i += 1;
1364                }
1365            }
1366
1367            // Refresh cached service records with active queriers
1368            self.refresh_active_services();
1369
1370            // Refresh cached A/AAAA records with active queriers
1371            let mut query_count = 0;
1372            for (hostname, _sender) in self.hostname_resolvers.iter() {
1373                for (hostname, ip_addr) in
1374                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1375                {
1376                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1377                    query_count += 1;
1378                }
1379            }
1380
1381            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1382
1383            // check and evict expired records in our cache
1384            let now = current_time_millis();
1385
1386            // Notify service listeners about the expired records.
1387            let expired_services = self.cache.evict_expired_services(now);
1388            if !expired_services.is_empty() {
1389                debug!(
1390                    "run: send {} service removal to listeners",
1391                    expired_services.len()
1392                );
1393                self.notify_service_removal(expired_services);
1394            }
1395
1396            // Notify hostname listeners about the expired records.
1397            let expired_addrs = self.cache.evict_expired_addr(now);
1398            for (hostname, addrs) in expired_addrs {
1399                call_hostname_resolution_listener(
1400                    &self.hostname_resolvers,
1401                    &hostname,
1402                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1403                );
1404                let instances = self.cache.get_instances_on_host(&hostname);
1405                let instance_set: HashSet<String> = instances.into_iter().collect();
1406                self.resolve_updated_instances(&instance_set);
1407            }
1408
1409            // Send out probing queries.
1410            self.probing_handler();
1411
1412            // check IP changes if next_ip_check is reached.
1413            if now >= next_ip_check && next_ip_check > 0 {
1414                next_ip_check = now + self.ip_check_interval;
1415                self.add_timer(next_ip_check);
1416
1417                self.check_ip_changes();
1418            }
1419        }
1420    }
1421
1422    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1423        match daemon_opt {
1424            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1425            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1426            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1427            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1428            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1429            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1430            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1431            DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1432            #[cfg(test)]
1433            DaemonOption::TestDownInterface(ifname) => {
1434                self.test_down_interfaces.insert(ifname);
1435            }
1436            #[cfg(test)]
1437            DaemonOption::TestUpInterface(ifname) => {
1438                self.test_down_interfaces.remove(&ifname);
1439            }
1440        }
1441    }
1442
1443    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1444        debug!("enable_interface: {:?}", kinds);
1445        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1446
1447        for if_kind in kinds {
1448            self.if_selections.push(IfSelection {
1449                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1450                selected: true,
1451            });
1452        }
1453
1454        self.apply_intf_selections(interfaces);
1455    }
1456
1457    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1458        debug!("disable_interface: {:?}", kinds);
1459        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1460
1461        for if_kind in kinds {
1462            self.if_selections.push(IfSelection {
1463                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1464                selected: false,
1465            });
1466        }
1467
1468        self.apply_intf_selections(interfaces);
1469    }
1470
1471    fn set_multicast_loop_v4(&mut self, on: bool) {
1472        let Some(sock) = self.ipv4_sock.as_mut() else {
1473            return;
1474        };
1475        self.multicast_loop_v4 = on;
1476        sock.pktinfo
1477            .set_multicast_loop_v4(on)
1478            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1479            .unwrap();
1480    }
1481
1482    fn set_multicast_loop_v6(&mut self, on: bool) {
1483        let Some(sock) = self.ipv6_sock.as_mut() else {
1484            return;
1485        };
1486        self.multicast_loop_v6 = on;
1487        sock.pktinfo
1488            .set_multicast_loop_v6(on)
1489            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1490            .unwrap();
1491    }
1492
1493    fn set_accept_unsolicited(&mut self, accept: bool) {
1494        self.accept_unsolicited = accept;
1495    }
1496
1497    fn set_apple_p2p(&mut self, include: bool) {
1498        if self.include_apple_p2p != include {
1499            self.include_apple_p2p = include;
1500            self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1501        }
1502    }
1503
1504    fn notify_monitors(&mut self, event: DaemonEvent) {
1505        // Only retain the monitors that are still connected.
1506        self.monitors.retain(|sender| {
1507            if let Err(e) = sender.try_send(event.clone()) {
1508                debug!("notify_monitors: try_send: {}", &e);
1509                if matches!(e, TrySendError::Disconnected(_)) {
1510                    return false; // This monitor is dropped.
1511                }
1512            }
1513            true
1514        });
1515    }
1516
1517    /// Remove `addr` in my services that enabled `addr_auto`.
1518    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1519        for (_, service_info) in self.my_services.iter_mut() {
1520            if service_info.is_addr_auto() {
1521                service_info.remove_ipaddr(addr);
1522            }
1523        }
1524    }
1525
1526    fn add_timer(&mut self, next_time: u64) {
1527        self.timers.push(Reverse(next_time));
1528    }
1529
1530    fn peek_earliest_timer(&self) -> Option<u64> {
1531        self.timers.peek().map(|Reverse(v)| *v)
1532    }
1533
1534    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1535        self.timers.pop().map(|Reverse(v)| v)
1536    }
1537
1538    /// Pop all timers that are already passed till `now`.
1539    fn pop_timers_till(&mut self, now: u64) {
1540        while let Some(Reverse(v)) = self.timers.peek() {
1541            if *v > now {
1542                break;
1543            }
1544            self.timers.pop();
1545        }
1546    }
1547
1548    /// Apply all selections to `interfaces` and return the selected addresses.
1549    fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1550        let intf_count = interfaces.len();
1551        let mut intf_selections = vec![true; intf_count];
1552
1553        // apply if_selections
1554        for selection in self.if_selections.iter() {
1555            // Mark the interfaces for this selection.
1556            for i in 0..intf_count {
1557                if selection.if_kind.matches(&interfaces[i]) {
1558                    intf_selections[i] = selection.selected;
1559                }
1560            }
1561        }
1562
1563        let mut selected_addrs = HashSet::new();
1564        for i in 0..intf_count {
1565            if intf_selections[i] {
1566                selected_addrs.insert(interfaces[i].clone());
1567            }
1568        }
1569
1570        selected_addrs
1571    }
1572
1573    /// Apply all selections to `interfaces`.
1574    ///
1575    /// For any interface, add it if selected but not bound yet,
1576    /// delete it if not selected but still bound.
1577    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1578        // By default, we enable all interfaces.
1579        let intf_count = interfaces.len();
1580        let mut intf_selections = vec![true; intf_count];
1581
1582        // apply if_selections
1583        for selection in self.if_selections.iter() {
1584            // Mark the interfaces for this selection.
1585            for i in 0..intf_count {
1586                if selection.if_kind.matches(&interfaces[i]) {
1587                    intf_selections[i] = selection.selected;
1588                }
1589            }
1590        }
1591
1592        // Update `my_intfs` based on the selections.
1593        for (idx, intf) in interfaces.into_iter().enumerate() {
1594            if intf_selections[idx] {
1595                // Add the interface
1596                self.add_interface(intf);
1597            } else {
1598                // Remove the interface
1599                self.del_interface_addr(&intf);
1600            }
1601        }
1602    }
1603
    /// Processes the removal of `ip`: passes it to `del_addr_in_my_services`
    /// first, then notifies the monitors with `DaemonEvent::IpDel`.
1604    fn del_ip(&mut self, ip: IpAddr) {
1605        self.del_addr_in_my_services(&ip);
1606        self.notify_monitors(DaemonEvent::IpDel(ip));
1607    }
1608
1609    /// Check for IP changes and update [my_intfs] as needed.
1610    fn check_ip_changes(&mut self) {
1611        // Get the current interfaces.
1612        let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1613
1614        #[cfg(test)]
1615        let my_ifaddrs: Vec<_> = my_ifaddrs
1616            .into_iter()
1617            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1618            .collect();
1619
1620        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1621            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1622                let if_index = intf.index.unwrap_or(0);
1623                acc.entry(if_index).or_default().push(&intf.addr);
1624                acc
1625            });
1626
1627        let mut deleted_intfs = Vec::new();
1628        let mut deleted_ips = Vec::new();
1629
1630        for (if_index, my_intf) in self.my_intfs.iter_mut() {
1631            let mut last_ipv4 = None;
1632            let mut last_ipv6 = None;
1633
1634            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1635                my_intf.addrs.retain(|addr| {
1636                    if current_addrs.contains(&addr) {
1637                        true
1638                    } else {
1639                        match addr.ip() {
1640                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1641                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1642                        }
1643                        deleted_ips.push(addr.ip());
1644                        false
1645                    }
1646                });
1647                if my_intf.addrs.is_empty() {
1648                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1649                }
1650            } else {
1651                // If it does not exist, remove the interface.
1652                debug!(
1653                    "check_ip_changes: interface {} ({}) no longer exists, removing",
1654                    my_intf.name, if_index
1655                );
1656                for addr in my_intf.addrs.iter() {
1657                    match addr.ip() {
1658                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1659                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1660                    }
1661                    deleted_ips.push(addr.ip())
1662                }
1663                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1664            }
1665        }
1666
1667        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1668            debug!(
1669                "check_ip_changes: {} deleted ips {} deleted intfs",
1670                deleted_ips.len(),
1671                deleted_intfs.len()
1672            );
1673        }
1674
1675        for ip in deleted_ips {
1676            self.del_ip(ip);
1677        }
1678
1679        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1680            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1681                continue;
1682            };
1683
1684            if let Some(ipv4) = last_ipv4 {
1685                debug!("leave multicast for {ipv4}");
1686                if let Some(sock) = self.ipv4_sock.as_mut() {
1687                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1688                        debug!("leave multicast group for addr {ipv4}: {e}");
1689                    }
1690                }
1691            }
1692
1693            if let Some(ipv6) = last_ipv6 {
1694                debug!("leave multicast for {ipv6}");
1695                if let Some(sock) = self.ipv6_sock.as_mut() {
1696                    if let Err(e) = sock
1697                        .pktinfo
1698                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1699                    {
1700                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
1701                    }
1702                }
1703            }
1704
1705            // Remove cache records for this interface.
1706            let intf_id = InterfaceId {
1707                name: my_intf.name.to_string(),
1708                index: my_intf.index,
1709            };
1710            let result = self.cache.remove_records_on_intf(intf_id);
1711            self.notify_service_removal(result.removed_instances);
1712            self.resolve_updated_instances(&result.modified_instances);
1713        }
1714
1715        // Add newly found interfaces only if in our selections.
1716        self.apply_intf_selections(my_ifaddrs);
1717    }
1718
1719    /// Remove an interface address when it was down, disabled or removed from the system.
1720    /// If no more addresses on the interface, remove the interface as well.
1721    fn del_interface_addr(&mut self, intf: &Interface) {
1722        let if_index = intf.index.unwrap_or(0);
1723        debug!(
1724            "del_interface_addr: {} ({if_index}) addr {}",
1725            intf.name,
1726            intf.ip()
1727        );
1728
1729        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1730            debug!("del_interface_addr: interface {} not found", intf.name);
1731            return;
1732        };
1733
1734        let mut ip_removed = false;
1735
1736        if my_intf.addrs.remove(&intf.addr) {
1737            ip_removed = true;
1738
1739            match intf.addr.ip() {
1740                IpAddr::V4(ipv4) => {
1741                    if my_intf.next_ifaddr_v4().is_none() {
1742                        if let Some(sock) = self.ipv4_sock.as_mut() {
1743                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1744                                debug!("leave multicast group for addr {ipv4}: {e}");
1745                            } else {
1746                                debug!("leave multicast for {ipv4}");
1747                            }
1748                        }
1749                    }
1750                }
1751
1752                IpAddr::V6(ipv6) => {
1753                    if my_intf.next_ifaddr_v6().is_none() {
1754                        if let Some(sock) = self.ipv6_sock.as_mut() {
1755                            if let Err(e) =
1756                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1757                            {
1758                                debug!("leave multicast group for addr {ipv6}: {e}");
1759                            }
1760                        }
1761                    }
1762                }
1763            }
1764
1765            if my_intf.addrs.is_empty() {
1766                // If no more addresses, remove the interface.
1767                debug!("del_interface_addr: removing interface {}", intf.name);
1768                self.my_intfs.remove(&if_index);
1769                self.dns_registry_map.remove(&if_index);
1770                self.cache
1771                    .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1772            } else {
1773                // Interface still has addresses of the other IP version.
1774                // Remove cached address records for the disabled IP version
1775                // only if no more addresses of that version remain.
1776                let is_v4 = intf.addr.ip().is_ipv4();
1777                let version_gone = if is_v4 {
1778                    my_intf.next_ifaddr_v4().is_none()
1779                } else {
1780                    my_intf.next_ifaddr_v6().is_none()
1781                };
1782                if version_gone {
1783                    let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1784                    self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1785                }
1786            }
1787        }
1788
1789        if ip_removed {
1790            // Notify the monitors.
1791            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1792            // Remove the interface from my services that enabled `addr_auto`.
1793            self.del_addr_in_my_services(&intf.ip());
1794        }
1795    }
1796
1797    fn add_interface(&mut self, intf: Interface) {
1798        let sock_opt = if intf.ip().is_ipv4() {
1799            &self.ipv4_sock
1800        } else {
1801            &self.ipv6_sock
1802        };
1803
1804        let Some(sock) = sock_opt else {
1805            debug!(
1806                "add_interface: no socket available for interface {} with addr {}. Skipped.",
1807                intf.name,
1808                intf.ip()
1809            );
1810            return;
1811        };
1812
1813        let if_index = intf.index.unwrap_or(0);
1814        let mut new_addr = false;
1815
1816        match self.my_intfs.entry(if_index) {
1817            Entry::Occupied(mut entry) => {
1818                // If intf has a new address, add it to the existing interface.
1819                let my_intf = entry.get_mut();
1820                if !my_intf.addrs.contains(&intf.addr) {
1821                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1822                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1823                    }
1824                    my_intf.addrs.insert(intf.addr.clone());
1825                    new_addr = true;
1826                }
1827            }
1828            Entry::Vacant(entry) => {
1829                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1830                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1831                    return;
1832                }
1833
1834                new_addr = true;
1835                let new_intf = MyIntf {
1836                    name: intf.name.clone(),
1837                    index: if_index,
1838                    addrs: HashSet::from([intf.addr.clone()]),
1839                };
1840                entry.insert(new_intf);
1841            }
1842        }
1843
1844        if !new_addr {
1845            trace!("add_interface: interface {} already exists", &intf.name);
1846            return;
1847        }
1848
1849        debug!("add new interface {}: {}", intf.name, intf.ip());
1850
1851        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1852            debug!("add_interface: cannot find if_index {if_index}");
1853            return;
1854        };
1855
1856        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1857            Some(registry) => registry,
1858            None => self
1859                .dns_registry_map
1860                .entry(if_index)
1861                .or_insert_with(DnsRegistry::new),
1862        };
1863
1864        for (_, service_info) in self.my_services.iter_mut() {
1865            if service_info.is_addr_auto() {
1866                service_info.insert_ipaddr(&intf);
1867
1868                if let Ok(true) = announce_service_on_intf(
1869                    dns_registry,
1870                    service_info,
1871                    my_intf,
1872                    &sock.pktinfo,
1873                    self.port,
1874                ) {
1875                    debug!(
1876                        "Announce service {} on {}",
1877                        service_info.get_fullname(),
1878                        intf.ip()
1879                    );
1880                    service_info.set_status(if_index, ServiceStatus::Announced);
1881                } else {
1882                    for timer in dns_registry.new_timers.drain(..) {
1883                        self.timers.push(Reverse(timer));
1884                    }
1885                    service_info.set_status(if_index, ServiceStatus::Probing);
1886                }
1887            }
1888        }
1889
1890        // Send browse queries on the new interface without known answers.
1891        // This avoids known-answer suppression (RFC 6762 Section 7.1) that
1892        // would cause the responder to suppress its response, preventing
1893        // address records from being attributed to the new interface.
1894        if let Some(my_intf) = self.my_intfs.get(&if_index) {
1895            for ty in self.service_queriers.keys() {
1896                self.send_query_on_intf(ty, RRType::PTR, my_intf);
1897            }
1898        }
1899
1900        // Notify the monitors.
1901        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1902    }
1903
1904    /// Registers a service.
1905    ///
1906    /// RFC 6762 section 8.3.
1907    /// ...the Multicast DNS responder MUST send
1908    ///    an unsolicited Multicast DNS response containing, in the Answer
1909    ///    Section, all of its newly registered resource records
1910    ///
1911    /// Zeroconf will then respond to requests for information about this service.
1912    fn register_service(&mut self, mut info: ServiceInfo) {
1913        // Check the service name length.
1914        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1915            error!("check_service_name_length: {}", &e);
1916            self.notify_monitors(DaemonEvent::Error(e));
1917            return;
1918        }
1919
1920        if info.is_addr_auto() {
1921            let selected_intfs =
1922                self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
1923            for intf in selected_intfs {
1924                info.insert_ipaddr(&intf);
1925            }
1926        }
1927
1928        debug!("register service {:?}", &info);
1929
1930        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1931        if !outgoing_addrs.is_empty() {
1932            self.notify_monitors(DaemonEvent::Announce(
1933                info.get_fullname().to_string(),
1934                format!("{:?}", &outgoing_addrs),
1935            ));
1936        }
1937
1938        // The key has to be lower case letter as DNS record name is case insensitive.
1939        // The info will have the original name.
1940        let service_fullname = info.get_fullname().to_lowercase();
1941        self.my_services.insert(service_fullname, info);
1942    }
1943
1944    /// Sends out announcement of `info` on every valid interface.
1945    /// Returns the list of interface IPs that sent out the announcement.
1946    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1947        let mut outgoing_addrs = Vec::new();
1948        let mut outgoing_intfs = HashSet::new();
1949
1950        let mut invalid_intf_addrs = HashSet::new();
1951
1952        for (if_index, intf) in self.my_intfs.iter() {
1953            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1954                Some(registry) => registry,
1955                None => self
1956                    .dns_registry_map
1957                    .entry(*if_index)
1958                    .or_insert_with(DnsRegistry::new),
1959            };
1960
1961            let mut announced = false;
1962
1963            // IPv4
1964            if let Some(sock) = self.ipv4_sock.as_mut() {
1965                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1966                    Ok(true) => {
1967                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1968                            outgoing_addrs.push(addr.ip());
1969                        }
1970                        outgoing_intfs.insert(intf.index);
1971
1972                        debug!(
1973                            "Announce service IPv4 {} on {}",
1974                            info.get_fullname(),
1975                            intf.name
1976                        );
1977                        announced = true;
1978                    }
1979                    Ok(false) => {}
1980                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1981                        invalid_intf_addrs.insert(intf_addr);
1982                    }
1983                }
1984            }
1985
1986            if let Some(sock) = self.ipv6_sock.as_mut() {
1987                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1988                    Ok(true) => {
1989                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1990                            outgoing_addrs.push(addr.ip());
1991                        }
1992                        outgoing_intfs.insert(intf.index);
1993
1994                        debug!(
1995                            "Announce service IPv6 {} on {}",
1996                            info.get_fullname(),
1997                            intf.name
1998                        );
1999                        announced = true;
2000                    }
2001                    Ok(false) => {}
2002                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2003                        invalid_intf_addrs.insert(intf_addr);
2004                    }
2005                }
2006            }
2007
2008            if announced {
2009                info.set_status(intf.index, ServiceStatus::Announced);
2010            } else {
2011                for timer in dns_registry.new_timers.drain(..) {
2012                    self.timers.push(Reverse(timer));
2013                }
2014                info.set_status(*if_index, ServiceStatus::Probing);
2015            }
2016        }
2017
2018        if !invalid_intf_addrs.is_empty() {
2019            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2020        }
2021
2022        // RFC 6762 section 8.3.
2023        // ..The Multicast DNS responder MUST send at least two unsolicited
2024        //    responses, one second apart.
2025        let next_time = current_time_millis() + 1000;
2026        for if_index in outgoing_intfs {
2027            self.add_retransmission(
2028                next_time,
2029                Command::RegisterResend(info.get_fullname().to_string(), if_index),
2030            );
2031        }
2032
2033        outgoing_addrs
2034    }
2035
2036    /// Send probings or finish them if expired. Notify waiting services.
2037    fn probing_handler(&mut self) {
2038        let now = current_time_millis();
2039        let mut invalid_intf_addrs = HashSet::new();
2040
2041        for (if_index, intf) in self.my_intfs.iter() {
2042            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2043                continue;
2044            };
2045
2046            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2047
2048            // send probing.
2049            if !out.questions().is_empty() {
2050                trace!("sending out probing of questions: {:?}", out.questions());
2051                if let Some(sock) = self.ipv4_sock.as_mut() {
2052                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2053                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2054                    {
2055                        invalid_intf_addrs.insert(intf_addr);
2056                    }
2057                }
2058                if let Some(sock) = self.ipv6_sock.as_mut() {
2059                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2060                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2061                    {
2062                        invalid_intf_addrs.insert(intf_addr);
2063                    }
2064                }
2065            }
2066
2067            // For finished probes, wake up services that are waiting for the probes.
2068            let waiting_services =
2069                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2070
2071            for service_name in waiting_services {
2072                // service names are lowercase
2073                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2074                    if info.get_status(*if_index) == ServiceStatus::Announced {
2075                        debug!("service {} already announced", info.get_fullname());
2076                        continue;
2077                    }
2078
2079                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2080                        match announce_service_on_intf(
2081                            dns_registry,
2082                            info,
2083                            intf,
2084                            &sock.pktinfo,
2085                            self.port,
2086                        ) {
2087                            Ok(announced) => announced,
2088                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2089                                invalid_intf_addrs.insert(intf_addr);
2090                                false
2091                            }
2092                        }
2093                    } else {
2094                        false
2095                    };
2096                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2097                        match announce_service_on_intf(
2098                            dns_registry,
2099                            info,
2100                            intf,
2101                            &sock.pktinfo,
2102                            self.port,
2103                        ) {
2104                            Ok(announced) => announced,
2105                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2106                                invalid_intf_addrs.insert(intf_addr);
2107                                false
2108                            }
2109                        }
2110                    } else {
2111                        false
2112                    };
2113
2114                    if announced_v4 || announced_v6 {
2115                        let next_time = now + 1000;
2116                        let command =
2117                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2118                        self.retransmissions.push(ReRun { next_time, command });
2119                        self.timers.push(Reverse(next_time));
2120
2121                        let fullname = dns_registry.resolve_name(&service_name).to_string();
2122
2123                        let hostname = dns_registry.resolve_name(info.get_hostname());
2124
2125                        debug!("wake up: announce service {} on {}", fullname, intf.name);
2126                        notify_monitors(
2127                            &mut self.monitors,
2128                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2129                        );
2130
2131                        info.set_status(*if_index, ServiceStatus::Announced);
2132                    }
2133                }
2134            }
2135        }
2136
2137        if !invalid_intf_addrs.is_empty() {
2138            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2139        }
2140    }
2141
2142    fn unregister_service(
2143        &self,
2144        info: &ServiceInfo,
2145        intf: &MyIntf,
2146        sock: &PktInfoUdpSocket,
2147    ) -> Vec<u8> {
2148        let is_ipv4 = sock.domain() == Domain::IPV4;
2149
2150        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2151        out.add_answer_at_time(
2152            DnsPointer::new(
2153                info.get_type(),
2154                RRType::PTR,
2155                CLASS_IN,
2156                0,
2157                info.get_fullname().to_string(),
2158            ),
2159            0,
2160        );
2161
2162        if let Some(sub) = info.get_subtype() {
2163            trace!("Adding subdomain {}", sub);
2164            out.add_answer_at_time(
2165                DnsPointer::new(
2166                    sub,
2167                    RRType::PTR,
2168                    CLASS_IN,
2169                    0,
2170                    info.get_fullname().to_string(),
2171                ),
2172                0,
2173            );
2174        }
2175
2176        out.add_answer_at_time(
2177            DnsSrv::new(
2178                info.get_fullname(),
2179                CLASS_IN | CLASS_CACHE_FLUSH,
2180                0,
2181                info.get_priority(),
2182                info.get_weight(),
2183                info.get_port(),
2184                info.get_hostname().to_string(),
2185            ),
2186            0,
2187        );
2188        out.add_answer_at_time(
2189            DnsTxt::new(
2190                info.get_fullname(),
2191                CLASS_IN | CLASS_CACHE_FLUSH,
2192                0,
2193                info.generate_txt(),
2194            ),
2195            0,
2196        );
2197
2198        let if_addrs = if is_ipv4 {
2199            info.get_addrs_on_my_intf_v4(intf)
2200        } else {
2201            info.get_addrs_on_my_intf_v6(intf)
2202        };
2203
2204        if if_addrs.is_empty() {
2205            return vec![];
2206        }
2207
2208        for address in if_addrs {
2209            out.add_answer_at_time(
2210                DnsAddress::new(
2211                    info.get_hostname(),
2212                    ip_address_rr_type(&address),
2213                    CLASS_IN | CLASS_CACHE_FLUSH,
2214                    0,
2215                    address,
2216                    intf.into(),
2217                ),
2218                0,
2219            );
2220        }
2221
2222        // Only (at most) one packet is expected to be sent out.
2223        let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port, None) {
2224            Ok(sent_vec) => sent_vec,
2225            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2226                let invalid_intf_addrs = HashSet::from([intf_addr]);
2227                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2228                vec![]
2229            }
2230        };
2231        sent_vec.into_iter().next().unwrap_or_default()
2232    }
2233
2234    /// Binds a channel `listener` to querying mDNS hostnames.
2235    ///
2236    /// If there is already a `listener`, it will be updated, i.e. overwritten.
2237    fn add_hostname_resolver(
2238        &mut self,
2239        hostname: String,
2240        listener: Sender<HostnameResolutionEvent>,
2241        timeout: Option<u64>,
2242    ) {
2243        let real_timeout = timeout.map(|t| current_time_millis() + t);
2244        self.hostname_resolvers
2245            .insert(hostname.to_lowercase(), (listener, real_timeout));
2246        if let Some(t) = real_timeout {
2247            self.add_timer(t);
2248        }
2249    }
2250
2251    /// Sends a multicast query for `name` with `qtype`, as a single-question call to `send_query_vec`.
2252    fn send_query(&self, name: &str, qtype: RRType) {
2253        self.send_query_vec(&[(name, qtype)]);
2254    }
2255
2256    /// Sends a query on a specific interface without known answers.
2257    ///
2258    /// Used when a new interface is added so the responder won't suppress
2259    /// its response due to known-answer suppression (RFC 6762 Section 7.1).
2260    fn send_query_on_intf(&self, name: &str, qtype: RRType, intf: &MyIntf) {
2261        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2262        out.add_question(name, qtype);
2263
2264        let mut invalid_intf_addrs = HashSet::new();
2265        if let Some(sock) = self.ipv4_sock.as_ref() {
2266            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2267                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2268            {
2269                invalid_intf_addrs.insert(intf_addr);
2270            }
2271        }
2272        if let Some(sock) = self.ipv6_sock.as_ref() {
2273            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2274                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2275            {
2276                invalid_intf_addrs.insert(intf_addr);
2277            }
2278        }
2279        if !invalid_intf_addrs.is_empty() {
2280            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2281        }
2282    }
2283
2284    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
2285    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2286        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2287        let now = current_time_millis();
2288
2289        for (name, qtype) in questions {
2290            out.add_question(name, *qtype);
2291
2292            for record in self.cache.get_known_answers(name, *qtype, now) {
2293                /*
2294                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
2295                ...
2296                    When a Multicast DNS querier sends a query to which it already knows
2297                    some answers, it populates the Answer Section of the DNS query
2298                    message with those answers.
2299                 */
2300                trace!("add known answer: {:?}", record.record);
2301                let mut new_record = record.record.clone();
2302                new_record.get_record_mut().update_ttl(now);
2303                out.add_answer_box(new_record);
2304            }
2305        }
2306
2307        let mut invalid_intf_addrs = HashSet::new();
2308        for (_, intf) in self.my_intfs.iter() {
2309            if let Some(sock) = self.ipv4_sock.as_ref() {
2310                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2311                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2312                {
2313                    invalid_intf_addrs.insert(intf_addr);
2314                }
2315            }
2316            if let Some(sock) = self.ipv6_sock.as_ref() {
2317                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2318                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2319                {
2320                    invalid_intf_addrs.insert(intf_addr);
2321                }
2322            }
2323        }
2324
2325        if !invalid_intf_addrs.is_empty() {
2326            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2327        }
2328    }
2329
2330    /// Reads one UDP datagram from the socket of `intf`.
2331    ///
2332    /// Returns false if failed to receive a packet,
2333    /// otherwise returns true.
2334    fn handle_read(&mut self, event_key: usize) -> bool {
2335        let sock_opt = match event_key {
2336            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2337            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2338            _ => {
2339                debug!("handle_read: unknown token {}", event_key);
2340                return false;
2341            }
2342        };
2343        let Some(sock) = sock_opt.as_mut() else {
2344            debug!("handle_read: socket not available for token {}", event_key);
2345            return false;
2346        };
2347        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2348
2349        // Read the next mDNS UDP datagram.
2350        //
2351        // If the datagram is larger than `buf`, excess bytes may or may not
2352        // be truncated by the socket layer depending on the platform's libc.
2353        // In any case, such large datagram will not be decoded properly and
2354        // this function should return false but should not crash.
2355        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2356            Ok(sz) => sz,
2357            Err(e) => {
2358                if e.kind() != std::io::ErrorKind::WouldBlock {
2359                    debug!("listening socket read failed: {}", e);
2360                }
2361                return false;
2362            }
2363        };
2364
2365        // Find the interface that received the packet.
2366        let pkt_if_index = pktinfo.if_index as u32;
2367        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2368            debug!(
2369                "handle_read: no interface found for pktinfo if_index: {}",
2370                pktinfo.if_index
2371            );
2372            return true; // We still return true to indicate that we read something.
2373        };
2374
2375        // Drop packets for an IP version that has been disabled on this interface.
2376        // This is needed because some times the socket layer may still receive packets
2377        // for an IP version even after we left the multicast group for that IP version.
2378        // We want to drop such packets to avoid unnecessary processing.
2379        let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2380        if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2381            || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2382        {
2383            debug!(
2384                "handle_read: dropping {} packet on intf {} (disabled)",
2385                if is_ipv4 { "IPv4" } else { "IPv6" },
2386                my_intf.name
2387            );
2388            return true;
2389        }
2390
2391        buf.truncate(sz); // reduce potential processing errors
2392
2393        match DnsIncoming::new(buf, my_intf.into()) {
2394            Ok(msg) => {
2395                if msg.is_query() {
2396                    let querier_ip = pktinfo.addr_src.ip();
2397                    self.handle_query(msg, pkt_if_index, querier_ip);
2398                } else if msg.is_response() {
2399                    self.handle_response(msg, pkt_if_index);
2400                } else {
2401                    debug!("Invalid message: not query and not response");
2402                }
2403            }
2404            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2405        }
2406
2407        true
2408    }
2409
2410    /// Returns true, if sent query. Returns false if SRV already exists.
2411    fn query_unresolved(&mut self, instance: &str) -> bool {
2412        if !valid_instance_name(instance) {
2413            trace!("instance name {} not valid", instance);
2414            return false;
2415        }
2416
2417        if let Some(records) = self.cache.get_srv(instance) {
2418            for record in records {
2419                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2420                    if self.cache.get_addr(srv.host()).is_none() {
2421                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2422                        return true;
2423                    }
2424                }
2425            }
2426        } else {
2427            self.send_query(instance, RRType::ANY);
2428            return true;
2429        }
2430
2431        false
2432    }
2433
2434    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2435    /// cached records via `sender`.
2436    fn query_cache_for_service(
2437        &mut self,
2438        ty_domain: &str,
2439        sender: &Sender<ServiceEvent>,
2440        now: u64,
2441    ) {
2442        let mut resolved: HashSet<String> = HashSet::new();
2443        let mut unresolved: HashSet<String> = HashSet::new();
2444
2445        if let Some(records) = self.cache.get_ptr(ty_domain) {
2446            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2447                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2448                    let mut new_event = None;
2449                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2450                        Ok(resolved_service) => {
2451                            if resolved_service.is_valid() {
2452                                debug!("Resolved service from cache: {}", ptr.alias());
2453                                new_event =
2454                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2455                            } else {
2456                                debug!("Resolved service is not valid: {}", ptr.alias());
2457                            }
2458                        }
2459                        Err(err) => {
2460                            debug!("Error while resolving service from cache: {}", err);
2461                            continue;
2462                        }
2463                    }
2464
2465                    match sender.send(ServiceEvent::ServiceFound(
2466                        ty_domain.to_string(),
2467                        ptr.alias().to_string(),
2468                    )) {
2469                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2470                        Err(e) => {
2471                            debug!("failed to send service found: {}", e);
2472                            continue;
2473                        }
2474                    }
2475
2476                    if let Some(event) = new_event {
2477                        resolved.insert(ptr.alias().to_string());
2478                        match sender.send(event) {
2479                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2480                            Err(e) => debug!("failed to send service resolved: {}", e),
2481                        }
2482                    } else {
2483                        unresolved.insert(ptr.alias().to_string());
2484                    }
2485                }
2486            }
2487        }
2488
2489        for instance in resolved.drain() {
2490            self.pending_resolves.remove(&instance);
2491            self.resolved.insert(instance);
2492        }
2493
2494        for instance in unresolved.drain() {
2495            self.add_pending_resolve(instance);
2496        }
2497    }
2498
2499    /// Checks if `hostname` has records in the cache. If yes, sends the
2500    /// cached records via `sender`.
2501    fn query_cache_for_hostname(
2502        &mut self,
2503        hostname: &str,
2504        sender: Sender<HostnameResolutionEvent>,
2505    ) {
2506        let addresses_map = self.cache.get_addresses_for_host(hostname);
2507        for (name, addresses) in addresses_map {
2508            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2509                Ok(()) => trace!("sent hostname addresses found"),
2510                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2511            }
2512        }
2513    }
2514
2515    fn add_pending_resolve(&mut self, instance: String) {
2516        if !self.pending_resolves.contains(&instance) {
2517            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2518            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2519            self.pending_resolves.insert(instance);
2520        }
2521    }
2522
2523    /// Creates a `ResolvedService` from the cache.
2524    fn resolve_service_from_cache(
2525        &self,
2526        ty_domain: &str,
2527        fullname: &str,
2528    ) -> Result<ResolvedService> {
2529        let now = current_time_millis();
2530        let mut resolved_service = ResolvedService {
2531            ty_domain: ty_domain.to_string(),
2532            sub_ty_domain: None,
2533            fullname: fullname.to_string(),
2534            host: String::new(),
2535            port: 0,
2536            addresses: HashSet::new(),
2537            txt_properties: TxtProperties::new(),
2538        };
2539
2540        // Be sure setting `subtype` if available even when querying for the parent domain.
2541        if let Some(subtype) = self.cache.get_subtype(fullname) {
2542            trace!(
2543                "ty_domain: {} found subtype {} for instance: {}",
2544                ty_domain,
2545                subtype,
2546                fullname
2547            );
2548            if resolved_service.sub_ty_domain.is_none() {
2549                resolved_service.sub_ty_domain = Some(subtype.to_string());
2550            }
2551        }
2552
2553        // resolve SRV record
2554        if let Some(records) = self.cache.get_srv(fullname) {
2555            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2556                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2557                    resolved_service.host = dns_srv.host().to_string();
2558                    resolved_service.port = dns_srv.port();
2559                }
2560            }
2561        }
2562
2563        // resolve TXT record
2564        if let Some(records) = self.cache.get_txt(fullname) {
2565            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2566                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2567                    resolved_service.txt_properties = dns_txt.text().into();
2568                }
2569            }
2570        }
2571
2572        // resolve A and AAAA records
2573        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2574            for answer in records.iter() {
2575                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2576                    if dns_a.expires_soon(now) {
2577                        trace!(
2578                            "Addr expired or expires soon: {}",
2579                            dns_a.address().to_ip_addr()
2580                        );
2581                    } else {
2582                        let scoped = dns_a.address();
2583                        if let ScopedIp::V4(v4) = &scoped {
2584                            // Merge interface_ids if this V4 addr already exists.
2585                            // Linear scan by IP since Eq/Hash include interface_ids.
2586                            let existing = resolved_service
2587                                .addresses
2588                                .iter()
2589                                .find(|a| a.to_ip_addr() == IpAddr::V4(*v4.addr()))
2590                                .cloned();
2591                            if let Some(mut existing) = existing {
2592                                resolved_service.addresses.remove(&existing);
2593                                if let ScopedIp::V4(existing_v4) = &mut existing {
2594                                    for id in v4.interface_ids() {
2595                                        existing_v4.add_interface_id(id.clone());
2596                                    }
2597                                }
2598                                resolved_service.addresses.insert(existing);
2599                            } else {
2600                                resolved_service.addresses.insert(scoped);
2601                            }
2602                        } else {
2603                            resolved_service.addresses.insert(scoped);
2604                        }
2605                    }
2606                }
2607            }
2608        }
2609
2610        Ok(resolved_service)
2611    }
2612
2613    fn handle_poller_events(&mut self, events: &mio::Events) {
2614        for ev in events.iter() {
2615            trace!("event received with key {:?}", ev.token());
2616            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2617                // Drain signals as we will drain commands as well.
2618                self.signal_sock_drain();
2619
2620                if let Err(e) = self.poller.registry().reregister(
2621                    &mut self.signal_sock,
2622                    ev.token(),
2623                    mio::Interest::READABLE,
2624                ) {
2625                    debug!("failed to modify poller for signal socket: {}", e);
2626                }
2627                continue; // Next event.
2628            }
2629
2630            // Read until no more packets available.
2631            while self.handle_read(ev.token().0) {}
2632
2633            // we continue to monitor this socket.
2634            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2635                // Re-register the IPv4 socket for reading.
2636                if let Some(sock) = self.ipv4_sock.as_mut() {
2637                    if let Err(e) =
2638                        self.poller
2639                            .registry()
2640                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2641                    {
2642                        debug!("modify poller for IPv4 socket: {}", e);
2643                    }
2644                }
2645            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2646                // Re-register the IPv6 socket for reading.
2647                if let Some(sock) = self.ipv6_sock.as_mut() {
2648                    if let Err(e) =
2649                        self.poller
2650                            .registry()
2651                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2652                    {
2653                        debug!("modify poller for IPv6 socket: {}", e);
2654                    }
2655                }
2656            }
2657        }
2658    }
2659
    /// Deal with incoming response packets received on interface `if_index`.
    ///
    /// The steps, in order:
    /// 1. Records in `msg` that are already expired are dropped from the
    ///    message and removed from the cache (with a `ServiceRemoved` event
    ///    for removed PTR records).
    /// 2. The answers are checked for conflicts with our own probes.
    /// 3. The remaining records are added to (or updated in) the cache and
    ///    timers are scheduled for them.
    /// 4. Listeners are notified: `ServiceFound` for PTR records,
    ///    `AddressesFound` for hostname resolvers, and finally the affected
    ///    service instances are (re-)resolved.
    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
        let now = current_time_millis();

        // remove records that are expired.
        // The predicate returns true to keep a record in the message and
        // false to drop it; dropping also evicts the record from the cache.
        let mut record_predicate = |record: &DnsRecordBox| {
            if !record.get_record().is_expired(now) {
                return true;
            }

            debug!("record is expired, removing it from cache.");
            if self.cache.remove(record) {
                // for PTR records, send event to listeners
                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                    call_service_listener(
                        &self.service_queriers,
                        dns_ptr.get_name(),
                        ServiceEvent::ServiceRemoved(
                            dns_ptr.get_name().to_string(),
                            dns_ptr.alias().to_string(),
                        ),
                    );
                }
            }
            false
        };
        // Apply the same predicate to all three record sections.
        msg.answers_mut().retain(&mut record_predicate);
        msg.authorities_mut().retain(&mut record_predicate);
        msg.additionals_mut().retain(&mut record_predicate);

        // check possible conflicts and handle them.
        self.conflict_handler(&msg, if_index);

        // check if the message is for us.
        let mut is_for_us = true; // assume it is for us.

        // If there are any PTR records in the answers, there should be
        // at least one PTR for us. Otherwise, the message is not for us.
        // If there are no PTR records at all, assume this message is for us.
        for answer in msg.answers() {
            if answer.get_type() == RRType::PTR {
                if self.service_queriers.contains_key(answer.get_name()) {
                    is_for_us = true;
                    break; // OK to break: at least one PTR for us.
                } else {
                    is_for_us = false;
                }
            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                // If there is a hostname querier for this address, then it is for us.
                // The lookup uses the lowercased record name.
                let answer_lowercase = answer.get_name().to_lowercase();
                if self.hostname_resolvers.contains_key(&answer_lowercase) {
                    is_for_us = true;
                    break; // OK to break: at least one hostname for us.
                }
            }
        }

        // if we explicitly want to accept unsolicited responses, we should consider all messages as for us.
        if self.accept_unsolicited {
            is_for_us = true;
        }

        /// Represents a DNS record change that involves one service instance.
        struct InstanceChange {
            ty: RRType,   // The type of DNS record for the instance.
            name: String, // The name of the record.
        }

        // Go through all answers to get the new and updated records.
        // For new PTR records, send out ServiceFound immediately. For others,
        // collect them into `changes`.
        //
        // Note: we don't try to identify the update instances based on
        // each record immediately as the answers are likely related to each
        // other.
        let mut changes = Vec::new();
        let mut timers = Vec::new();
        // Without a known interface nothing is cached. Note that the expired
        // record removal and the conflict check above have already run.
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            return;
        };
        for record in msg.all_records() {
            match self
                .cache
                .add_or_update(my_intf, record, &mut timers, is_for_us)
            {
                // NOTE(review): the `true` flag presumably marks a new or
                // updated record — confirm against `DnsCache::add_or_update`.
                Some((dns_record, true)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());

                    let ty = dns_record.record.get_type();
                    let name = dns_record.record.get_name();

                    // Only process PTR that does not expire soon (i.e. TTL > 1).
                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
                        // NOTE(review): the refresh time was already pushed
                        // unconditionally above, so this push looks
                        // redundant — confirm before removing.
                        if self.service_queriers.contains_key(name) {
                            timers.push(dns_record.record.get_record().get_refresh_time());
                        }

                        // send ServiceFound
                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
                        {
                            debug!("calling listener with service found: {name}");
                            call_service_listener(
                                &self.service_queriers,
                                name,
                                ServiceEvent::ServiceFound(
                                    name.to_string(),
                                    dns_ptr.alias().to_string(),
                                ),
                            );
                            // For a PTR, the change is recorded under the
                            // instance name (the PTR alias), not the PTR name.
                            changes.push(InstanceChange {
                                ty,
                                name: dns_ptr.alias().to_string(),
                            });
                        }
                    } else {
                        // Non-PTR records, and PTR records with TTL <= 1, are
                        // recorded under the record's own name.
                        changes.push(InstanceChange {
                            ty,
                            name: name.to_string(),
                        });
                    }
                }
                // Flag is `false`: only the timers are scheduled, no change
                // is recorded.
                Some((dns_record, false)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());
                }
                _ => {}
            }
        }

        // Add timers for the new records.
        for t in timers {
            self.add_timer(t);
        }

        // Go through remaining changes to see if any hostname resolutions were found or updated.
        for change in changes
            .iter()
            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
        {
            let addr_map = self.cache.get_addresses_for_host(&change.name);
            for (name, addresses) in addr_map {
                call_hostname_resolution_listener(
                    &self.hostname_resolvers,
                    &change.name,
                    HostnameResolutionEvent::AddressesFound(name, addresses),
                )
            }
        }

        // Identify the instances that need to be "resolved".
        let mut updated_instances = HashSet::new();
        for update in changes {
            match update.ty {
                // For these types `update.name` is already the instance name.
                RRType::PTR | RRType::SRV | RRType::TXT => {
                    updated_instances.insert(update.name);
                }
                // For address records `update.name` is a hostname: include
                // every instance the cache reports on that host.
                RRType::A | RRType::AAAA => {
                    let instances = self.cache.get_instances_on_host(&update.name);
                    updated_instances.extend(instances);
                }
                _ => {}
            }
        }

        self.resolve_updated_instances(&updated_instances);
    }
2828
    /// Checks the answers in `msg` against the records currently being
    /// probed on interface `if_index`, and renames our records when a
    /// conflicting answer is found.
    ///
    /// A record conflicts with an answer of the same name when type and
    /// class are equal but the rdata differs. Each conflicting record is
    /// removed from its probe, copied under a new name, remembered in
    /// `name_changes`, and inserted into the probe for the new name (created
    /// if needed) so that probing starts again.
    ///
    /// Does nothing if the interface or its DNS registry is unknown.
    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            debug!("handle_response: no intf found for index {if_index}");
            return;
        };

        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
            return;
        };

        for answer in msg.answers().iter() {
            // Renamed copies of our records that conflict with this answer.
            let mut new_records = Vec::new();

            // Only names we are currently probing can be in conflict.
            let name = answer.get_name();
            let Some(probe) = dns_registry.probing.get_mut(name) else {
                continue;
            };

            // check against possible multicast forwarding
            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                    // Skip address answers tagged with a different interface
                    // index than the one the message was received on.
                    if answer_addr.interface_id.index != if_index {
                        debug!(
                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
                            answer_addr, my_intf.name
                        );
                        continue;
                    }
                }

                // double check if any other address record matches rrdata,
                // as there could be multiple addresses for the same name.
                let any_match = probe.records.iter().any(|r| {
                    r.get_type() == answer.get_type()
                        && r.get_class() == answer.get_class()
                        && r.rrdata_match(answer.as_ref())
                });
                if any_match {
                    continue; // no conflict for this answer.
                }
            }

            // Drop every probed record that conflicts with the answer (same
            // type and class, different rdata) and queue a renamed copy.
            probe.records.retain(|record| {
                if record.get_type() == answer.get_type()
                    && record.get_class() == answer.get_class()
                    && !record.rrdata_match(answer.as_ref())
                {
                    debug!(
                        "found conflict name: '{name}' record: {}: {} PEER: {}",
                        record.get_type(),
                        record.rdata_print(),
                        answer.rdata_print()
                    );

                    // create a new name for this record
                    // then remove the old record in probing.
                    let mut new_record = record.clone();
                    // Address records are renamed with `hostname_change`,
                    // all other record types with `name_change`.
                    let new_name = match record.get_type() {
                        RRType::A => hostname_change(name),
                        RRType::AAAA => hostname_change(name),
                        _ => name_change(name),
                    };
                    new_record.get_record_mut().set_new_name(new_name);
                    new_records.push(new_record);
                    return false; // old record is dropped from the probe.
                }

                true
            });

            // NOTE(review): open question left in the code — should a probe
            // whose records are now empty be removed here? Kept disabled.
            // if probe.records.is_empty() {
            //     dns_registry.probing.remove(name);
            // }

            // Probing again with the new names.
            // The start time is the current time plus a random offset in the
            // range 0..250 ms.
            let create_time = current_time_millis() + fastrand::u64(0..250);

            // Copied out here because `probe` cannot be used while the
            // registry is modified in the loop below.
            let waiting_services = probe.waiting_services.clone();

            for record in new_records {
                // NOTE(review): presumably returns true when the hostname
                // was actually updated and a timer is needed — confirm
                // against `update_hostname`.
                if dns_registry.update_hostname(name, record.get_name(), create_time) {
                    self.timers.push(Reverse(create_time));
                }

                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
                dns_registry.name_changes.insert(
                    record.get_record().get_original_name().to_string(),
                    record.get_name().to_string(),
                );

                // Reuse the probe for the new name if one exists; otherwise
                // create it and schedule a timer for its first send.
                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                    Some(p) => p,
                    None => {
                        let new_probe = dns_registry
                            .probing
                            .entry(record.get_name().to_string())
                            .or_insert_with(|| {
                                debug!("conflict handler: new probe of {}", record.get_name());
                                Probe::new(create_time)
                            });
                        self.timers.push(Reverse(new_probe.next_send));
                        new_probe
                    }
                };

                debug!(
                    "insert record with new name '{}' {} into probe",
                    record.get_name(),
                    record.get_type()
                );
                new_probe.insert_record(record);

                // Services that were waiting on the old probe now wait on
                // the probe for the new name as well.
                new_probe.waiting_services.extend(waiting_services.clone());
            }
        }
    }
2946
2947    /// Resolve the updated (including new) instances.
2948    ///
2949    /// Note: it is possible that more than 1 PTR pointing to the same
2950    /// instance. For example, a regular service type PTR and a sub-type
2951    /// service type PTR can both point to the same service instance.
2952    /// This loop automatically handles the sub-type PTRs.
2953    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2954        if updated_instances.is_empty() {
2955            return;
2956        }
2957
2958        let mut resolved: HashSet<String> = HashSet::new();
2959        let mut unresolved: HashSet<String> = HashSet::new();
2960        let mut removed_instances = HashMap::new();
2961
2962        let now = current_time_millis();
2963
2964        for (ty_domain, records) in self.cache.all_ptr().iter() {
2965            if !self.service_queriers.contains_key(ty_domain) {
2966                // No need to resolve if not in our queries.
2967                continue;
2968            }
2969
2970            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2971                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2972                    continue;
2973                };
2974
2975                let instance = dns_ptr.alias();
2976                if !updated_instances.contains(instance) {
2977                    continue;
2978                }
2979
2980                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2981                else {
2982                    continue;
2983                };
2984
2985                debug!("resolve_updated_instances: from cache: {instance}");
2986                if resolved_service.is_valid() {
2987                    debug!("call queriers to resolve {instance}");
2988                    resolved.insert(instance.to_string());
2989                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2990                    call_service_listener(&self.service_queriers, ty_domain, event);
2991                } else {
2992                    debug!("Resolved service is not valid: {instance}");
2993                    if self.resolved.remove(dns_ptr.alias()) {
2994                        removed_instances
2995                            .entry(ty_domain.to_string())
2996                            .or_insert_with(HashSet::new)
2997                            .insert(instance.to_string());
2998                    }
2999                    unresolved.insert(instance.to_string());
3000                }
3001            }
3002        }
3003
3004        for instance in resolved.drain() {
3005            self.pending_resolves.remove(&instance);
3006            self.resolved.insert(instance);
3007        }
3008
3009        for instance in unresolved.drain() {
3010            self.add_pending_resolve(instance);
3011        }
3012
3013        if !removed_instances.is_empty() {
3014            debug!(
3015                "resolve_updated_instances: removed {}",
3016                &removed_instances.len()
3017            );
3018            self.notify_service_removal(removed_instances);
3019        }
3020    }
3021
3022    /// Handle incoming query packets, figure out whether and what to respond.
3023    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, querier_ip: IpAddr) {
3024        let is_ipv4 = querier_ip.is_ipv4();
3025        let sock_opt = if is_ipv4 {
3026            &self.ipv4_sock
3027        } else {
3028            &self.ipv6_sock
3029        };
3030        let Some(sock) = sock_opt.as_ref() else {
3031            debug!("handle_query: socket not available for intf {}", if_index);
3032            return;
3033        };
3034        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3035
3036        // Special meta-query "_services._dns-sd._udp.<Domain>".
3037        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
3038        const META_QUERY: &str = "_services._dns-sd._udp.local.";
3039
3040        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3041            debug!("missing dns registry for intf {}", if_index);
3042            return;
3043        };
3044
3045        let Some(intf) = self.my_intfs.get(&if_index) else {
3046            debug!("handle_query: no intf found for index {if_index}");
3047            return;
3048        };
3049
3050        for question in msg.questions().iter() {
3051            let qtype = question.entry_type();
3052            let q_name = question.entry_name();
3053
3054            if qtype == RRType::PTR {
3055                for service in self.my_services.values() {
3056                    if service.get_status(if_index) != ServiceStatus::Announced {
3057                        continue;
3058                    }
3059
3060                    if service.matches_type_or_subtype(q_name) {
3061                        out.add_answer_with_additionals(&msg, service, intf, dns_registry, is_ipv4);
3062                    } else if q_name == META_QUERY {
3063                        let ttl = service.get_other_ttl();
3064                        let alias = service.get_type().to_string();
3065                        let ptr = DnsPointer::new(q_name, RRType::PTR, CLASS_IN, ttl, alias);
3066                        if !out.add_answer(&msg, ptr) {
3067                            trace!("answer was not added for meta-query {:?}", &question);
3068                        }
3069                    }
3070                }
3071            } else {
3072                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
3073                if qtype == RRType::ANY && msg.num_authorities() > 0 {
3074                    if let Some(probe) = dns_registry.probing.get_mut(q_name) {
3075                        probe.tiebreaking(&msg, q_name);
3076                    }
3077                }
3078
3079                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3080                    for service in self.my_services.values() {
3081                        if service.get_status(if_index) != ServiceStatus::Announced {
3082                            continue;
3083                        }
3084
3085                        let service_hostname = dns_registry.resolve_name(service.get_hostname());
3086
3087                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3088                            let intf_addrs = if is_ipv4 {
3089                                service.get_addrs_on_my_intf_v4(intf)
3090                            } else {
3091                                service.get_addrs_on_my_intf_v6(intf)
3092                            };
3093                            if intf_addrs.is_empty()
3094                                && (qtype == RRType::A || qtype == RRType::AAAA)
3095                            {
3096                                let t = match qtype {
3097                                    RRType::A => "TYPE_A",
3098                                    RRType::AAAA => "TYPE_AAAA",
3099                                    _ => "invalid_type",
3100                                };
3101                                trace!(
3102                                    "Cannot find valid addrs for {} response on intf {:?}",
3103                                    t,
3104                                    &intf
3105                                );
3106                                return;
3107                            }
3108                            for address in intf_addrs {
3109                                out.add_answer(
3110                                    &msg,
3111                                    DnsAddress::new(
3112                                        service_hostname,
3113                                        ip_address_rr_type(&address),
3114                                        CLASS_IN | CLASS_CACHE_FLUSH,
3115                                        service.get_host_ttl(),
3116                                        address,
3117                                        intf.into(),
3118                                    ),
3119                                );
3120                            }
3121                        }
3122                    }
3123                }
3124
3125                let query_name = q_name.to_lowercase();
3126                let service_opt = self
3127                    .my_services
3128                    .iter()
3129                    .find(|(k, _v)| dns_registry.resolve_name(k.as_str()) == query_name)
3130                    .map(|(_, v)| v);
3131
3132                let Some(service) = service_opt else {
3133                    continue;
3134                };
3135
3136                if service.get_status(if_index) != ServiceStatus::Announced {
3137                    continue;
3138                }
3139
3140                let intf_addrs = if is_ipv4 {
3141                    service.get_addrs_on_my_intf_v4(intf)
3142                } else {
3143                    service.get_addrs_on_my_intf_v6(intf)
3144                };
3145                if intf_addrs.is_empty() {
3146                    debug!(
3147                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3148                        &intf
3149                    );
3150                    continue;
3151                }
3152
3153                add_answer_of_service(
3154                    &mut out,
3155                    &msg,
3156                    question.entry_name(),
3157                    service,
3158                    qtype,
3159                    intf_addrs,
3160                );
3161            }
3162        }
3163
3164        if out.answers_count() > 0 {
3165            debug!("sending response on intf {}", &intf.name);
3166            out.set_id(msg.id());
3167
3168            // Pick a source IfAddr on `intf` whose subnet contains the querier's IP.
3169            // It's OK if it's None, `send_dns_outgoing` will then pick one address.
3170            let matched_source = intf
3171                .addrs
3172                .iter()
3173                .find(|if_addr| valid_ip_on_intf(&querier_ip, if_addr));
3174
3175            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3176                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, matched_source)
3177            {
3178                let invalid_intf_addr = HashSet::from([intf_addr]);
3179                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3180            }
3181
3182            let if_name = intf.name.clone();
3183
3184            self.increase_counter(Counter::Respond, 1);
3185            self.notify_monitors(DaemonEvent::Respond(if_name));
3186        }
3187
3188        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3189    }
3190
3191    /// Increases the value of `counter` by `count`.
3192    fn increase_counter(&mut self, counter: Counter, count: i64) {
3193        let key = counter.to_string();
3194        match self.counters.get_mut(&key) {
3195            Some(v) => *v += count,
3196            None => {
3197                self.counters.insert(key, count);
3198            }
3199        }
3200    }
3201
3202    /// Sets the value of `counter` to `count`.
3203    fn set_counter(&mut self, counter: Counter, count: i64) {
3204        let key = counter.to_string();
3205        self.counters.insert(key, count);
3206    }
3207
3208    fn signal_sock_drain(&self) {
3209        let mut signal_buf = [0; 1024];
3210
3211        // This recv is non-blocking as the socket is non-blocking.
3212        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3213            trace!(
3214                "signal socket recvd: {}",
3215                String::from_utf8_lossy(&signal_buf[0..sz])
3216            );
3217        }
3218    }
3219
3220    fn add_retransmission(&mut self, next_time: u64, command: Command) {
3221        self.retransmissions.push(ReRun { next_time, command });
3222        self.add_timer(next_time);
3223    }
3224
3225    /// Sends service removal event to listeners for expired service records.
3226    /// `expired`: map of service type domain to set of instance names.
3227    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3228        for (ty_domain, sender) in self.service_queriers.iter() {
3229            if let Some(instances) = expired.get(ty_domain) {
3230                for instance_name in instances {
3231                    let event = ServiceEvent::ServiceRemoved(
3232                        ty_domain.to_string(),
3233                        instance_name.to_string(),
3234                    );
3235                    match sender.send(event) {
3236                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3237                        Err(e) => debug!("Failed to send event: {}", e),
3238                    }
3239                }
3240            }
3241        }
3242    }
3243
3244    /// The entry point that executes all commands received by the daemon.
3245    ///
3246    /// `repeating`: whether this is a retransmission.
3247    fn exec_command(&mut self, command: Command, repeating: bool) {
3248        trace!("exec_command: {:?} repeating: {}", &command, repeating);
3249        match command {
3250            Command::Browse(ty, next_delay, cache_only, listener) => {
3251                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3252            }
3253
3254            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3255                self.exec_command_resolve_hostname(
3256                    repeating, hostname, next_delay, listener, timeout,
3257                );
3258            }
3259
3260            Command::Register(service_info) => {
3261                self.register_service(*service_info);
3262                self.increase_counter(Counter::Register, 1);
3263            }
3264
3265            Command::RegisterResend(fullname, intf) => {
3266                trace!("register-resend service: {fullname} on {}", &intf);
3267                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3268                    self.exec_command_register_resend(fullname, intf)
3269                {
3270                    let invalid_intf_addr = HashSet::from([intf_addr]);
3271                    let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3272                }
3273            }
3274
3275            Command::Unregister(fullname, resp_s) => {
3276                trace!("unregister service {} repeat {}", &fullname, &repeating);
3277                self.exec_command_unregister(repeating, fullname, resp_s);
3278            }
3279
3280            Command::UnregisterResend(packet, if_index, is_ipv4) => {
3281                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3282            }
3283
3284            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3285
3286            Command::StopResolveHostname(hostname) => {
3287                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3288            }
3289
3290            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3291
3292            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3293
3294            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3295                Ok(()) => trace!("Sent status to the client"),
3296                Err(e) => debug!("Failed to send status: {}", e),
3297            },
3298
3299            Command::Monitor(resp_s) => {
3300                self.monitors.push(resp_s);
3301            }
3302
3303            Command::SetOption(daemon_opt) => {
3304                self.process_set_option(daemon_opt);
3305            }
3306
3307            Command::GetOption(resp_s) => {
3308                let val = DaemonOptionVal {
3309                    _service_name_len_max: self.service_name_len_max,
3310                    ip_check_interval: self.ip_check_interval,
3311                };
3312                if let Err(e) = resp_s.send(val) {
3313                    debug!("Failed to send options: {}", e);
3314                }
3315            }
3316
3317            Command::Verify(instance_fullname, timeout) => {
3318                self.exec_command_verify(instance_fullname, timeout, repeating);
3319            }
3320
3321            Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3322                for intf_addr in invalid_intf_addrs {
3323                    self.del_interface_addr(&intf_addr);
3324                }
3325
3326                self.check_ip_changes();
3327            }
3328
3329            _ => {
3330                debug!("unexpected command: {:?}", &command);
3331            }
3332        }
3333    }
3334
3335    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3336        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3337        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3338        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3339        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3340        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3341        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3342        self.set_counter(Counter::Timer, self.timers.len() as i64);
3343
3344        let dns_registry_probe_count: usize = self
3345            .dns_registry_map
3346            .values()
3347            .map(|r| r.probing.len())
3348            .sum();
3349        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3350
3351        let dns_registry_active_count: usize = self
3352            .dns_registry_map
3353            .values()
3354            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3355            .sum();
3356        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3357
3358        let dns_registry_timer_count: usize = self
3359            .dns_registry_map
3360            .values()
3361            .map(|r| r.new_timers.len())
3362            .sum();
3363        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3364
3365        let dns_registry_name_change_count: usize = self
3366            .dns_registry_map
3367            .values()
3368            .map(|r| r.name_changes.len())
3369            .sum();
3370        self.set_counter(
3371            Counter::DnsRegistryNameChange,
3372            dns_registry_name_change_count as i64,
3373        );
3374
3375        // Send the metrics to the client.
3376        if let Err(e) = resp_s.send(self.counters.clone()) {
3377            debug!("Failed to send metrics: {}", e);
3378        }
3379    }
3380
3381    fn exec_command_browse(
3382        &mut self,
3383        repeating: bool,
3384        ty: String,
3385        next_delay: u32,
3386        cache_only: bool,
3387        listener: Sender<ServiceEvent>,
3388    ) {
3389        let pretty_addrs: Vec<String> = self
3390            .my_intfs
3391            .iter()
3392            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3393            .collect();
3394
3395        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3396            "{ty} on {} interfaces [{}]",
3397            pretty_addrs.len(),
3398            pretty_addrs.join(", ")
3399        ))) {
3400            debug!(
3401                "Failed to send SearchStarted({})(repeating:{}): {}",
3402                &ty, repeating, e
3403            );
3404            return;
3405        }
3406
3407        let now = current_time_millis();
3408        if !repeating {
3409            // Binds a `listener` to querying mDNS domain type `ty`.
3410            //
3411            // If there is already a `listener`, it will be updated, i.e. overwritten.
3412            self.service_queriers.insert(ty.clone(), listener.clone());
3413
3414            // if we already have the records in our cache, just send them
3415            self.query_cache_for_service(&ty, &listener, now);
3416        }
3417
3418        if cache_only {
3419            // If cache_only is true, we do not send a query.
3420            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3421                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3422                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3423            }
3424            return;
3425        }
3426
3427        self.send_query(&ty, RRType::PTR);
3428        self.increase_counter(Counter::Browse, 1);
3429
3430        let next_time = now + (next_delay * 1000) as u64;
3431        let max_delay = 60 * 60;
3432        let delay = cmp::min(next_delay * 2, max_delay);
3433        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3434    }
3435
3436    fn exec_command_resolve_hostname(
3437        &mut self,
3438        repeating: bool,
3439        hostname: String,
3440        next_delay: u32,
3441        listener: Sender<HostnameResolutionEvent>,
3442        timeout: Option<u64>,
3443    ) {
3444        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3445        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3446            "{} on addrs {:?}",
3447            &hostname, &addr_list
3448        ))) {
3449            debug!(
3450                "Failed to send ResolveStarted({})(repeating:{}): {}",
3451                &hostname, repeating, e
3452            );
3453            return;
3454        }
3455        if !repeating {
3456            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3457            // if we already have the records in our cache, just send them
3458            self.query_cache_for_hostname(&hostname, listener.clone());
3459        }
3460
3461        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3462        self.increase_counter(Counter::ResolveHostname, 1);
3463
3464        let now = current_time_millis();
3465        let next_time = now + u64::from(next_delay) * 1000;
3466        let max_delay = 60 * 60;
3467        let delay = cmp::min(next_delay * 2, max_delay);
3468
3469        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3470        if self
3471            .hostname_resolvers
3472            .get(&hostname)
3473            .and_then(|(_sender, timeout)| *timeout)
3474            .map(|timeout| next_time < timeout)
3475            .unwrap_or(true)
3476        {
3477            self.add_retransmission(
3478                next_time,
3479                Command::ResolveHostname(hostname, delay, listener, None),
3480            );
3481        }
3482    }
3483
3484    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3485        let pending_query = self.query_unresolved(&instance);
3486        let max_try = 3;
3487        if pending_query && try_count < max_try {
3488            // Note that if the current try already succeeds, the next retransmission
3489            // will be no-op as the cache has been updated.
3490            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3491            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3492        }
3493    }
3494
3495    fn exec_command_unregister(
3496        &mut self,
3497        repeating: bool,
3498        fullname: String,
3499        resp_s: Sender<UnregisterStatus>,
3500    ) {
3501        let response = match self.my_services.remove_entry(&fullname) {
3502            None => {
3503                debug!("unregister: cannot find such service {}", &fullname);
3504                UnregisterStatus::NotFound
3505            }
3506            Some((_k, info)) => {
3507                let mut timers = Vec::new();
3508
3509                for (if_index, intf) in self.my_intfs.iter() {
3510                    if let Some(sock) = self.ipv4_sock.as_ref() {
3511                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3512                        // repeat for one time just in case some peers miss the message
3513                        if !repeating && !packet.is_empty() {
3514                            let next_time = current_time_millis() + 120;
3515                            self.retransmissions.push(ReRun {
3516                                next_time,
3517                                command: Command::UnregisterResend(packet, *if_index, true),
3518                            });
3519                            timers.push(next_time);
3520                        }
3521                    }
3522
3523                    // ipv6
3524                    if let Some(sock) = self.ipv6_sock.as_ref() {
3525                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3526                        if !repeating && !packet.is_empty() {
3527                            let next_time = current_time_millis() + 120;
3528                            self.retransmissions.push(ReRun {
3529                                next_time,
3530                                command: Command::UnregisterResend(packet, *if_index, false),
3531                            });
3532                            timers.push(next_time);
3533                        }
3534                    }
3535                }
3536
3537                for t in timers {
3538                    self.add_timer(t);
3539                }
3540
3541                self.increase_counter(Counter::Unregister, 1);
3542                UnregisterStatus::OK
3543            }
3544        };
3545        if let Err(e) = resp_s.send(response) {
3546            debug!("unregister: failed to send response: {}", e);
3547        }
3548    }
3549
3550    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3551        let Some(intf) = self.my_intfs.get(&if_index) else {
3552            return;
3553        };
3554        let sock_opt = if is_ipv4 {
3555            &self.ipv4_sock
3556        } else {
3557            &self.ipv6_sock
3558        };
3559        let Some(sock) = sock_opt else {
3560            return;
3561        };
3562
3563        let if_addr = if is_ipv4 {
3564            match intf.next_ifaddr_v4() {
3565                Some(addr) => addr,
3566                None => return,
3567            }
3568        } else {
3569            match intf.next_ifaddr_v6() {
3570                Some(addr) => addr,
3571                None => return,
3572            }
3573        };
3574
3575        debug!("UnregisterResend from {:?}", if_addr);
3576        multicast_on_intf(
3577            &packet[..],
3578            &intf.name,
3579            intf.index,
3580            if_addr,
3581            &sock.pktinfo,
3582            self.port,
3583        );
3584
3585        self.increase_counter(Counter::UnregisterResend, 1);
3586    }
3587
3588    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3589        match self.service_queriers.remove_entry(&ty_domain) {
3590            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3591            Some((ty, sender)) => {
3592                // Remove pending browse commands in the reruns.
3593                trace!("StopBrowse: removed queryer for {}", &ty);
3594                let mut i = 0;
3595                while i < self.retransmissions.len() {
3596                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3597                        if t == &ty {
3598                            self.retransmissions.remove(i);
3599                            trace!("StopBrowse: removed retransmission for {}", &ty);
3600                            continue;
3601                        }
3602                    }
3603                    i += 1;
3604                }
3605
3606                // Remove cache entries.
3607                self.cache.remove_service_type(&ty_domain);
3608
3609                // Notify the client.
3610                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3611                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3612                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3613                }
3614            }
3615        }
3616    }
3617
3618    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3619        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3620            // Remove pending resolve commands in the reruns.
3621            trace!("StopResolve: removed queryer for {}", &host);
3622            let mut i = 0;
3623            while i < self.retransmissions.len() {
3624                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3625                    if t == &host {
3626                        self.retransmissions.remove(i);
3627                        trace!("StopResolve: removed retransmission for {}", &host);
3628                        continue;
3629                    }
3630                }
3631                i += 1;
3632            }
3633
3634            // Notify the client.
3635            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3636                Ok(()) => trace!("Sent SearchStopped to the listener"),
3637                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3638            }
3639        }
3640    }
3641
3642    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3643        let Some(info) = self.my_services.get_mut(&fullname) else {
3644            trace!("announce: cannot find such service {}", &fullname);
3645            return Ok(());
3646        };
3647
3648        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3649            return Ok(());
3650        };
3651
3652        let Some(intf) = self.my_intfs.get(&if_index) else {
3653            return Ok(());
3654        };
3655
3656        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3657            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3658        } else {
3659            false
3660        };
3661        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3662            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3663        } else {
3664            false
3665        };
3666
3667        if announced_v4 || announced_v6 {
3668            let hostname = dns_registry.resolve_name(info.get_hostname());
3669            let service_name = dns_registry.resolve_name(&fullname).to_string();
3670
3671            debug!("resend: announce service {service_name} on {}", intf.name);
3672
3673            notify_monitors(
3674                &mut self.monitors,
3675                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3676            );
3677            info.set_status(if_index, ServiceStatus::Announced);
3678        } else {
3679            debug!("register-resend should not fail");
3680        }
3681
3682        self.increase_counter(Counter::RegisterResend, 1);
3683        Ok(())
3684    }
3685
3686    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3687        /*
3688        RFC 6762 section 10.4:
3689        ...
3690        When the cache receives this hint that it should reconfirm some
3691        record, it MUST issue two or more queries for the resource record in
3692        dispute.  If no response is received within ten seconds, then, even
3693        though its TTL may indicate that it is not yet due to expire, that
3694        record SHOULD be promptly flushed from the cache.
3695        */
3696        let now = current_time_millis();
3697        let expire_at = if repeating {
3698            None
3699        } else {
3700            Some(now + timeout.as_millis() as u64)
3701        };
3702
3703        // send query for the resource records.
3704        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3705
3706        if !record_vec.is_empty() {
3707            let query_vec: Vec<(&str, RRType)> = record_vec
3708                .iter()
3709                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3710                .collect();
3711            self.send_query_vec(&query_vec);
3712
3713            if let Some(new_expire) = expire_at {
3714                self.add_timer(new_expire); // ensure a check for the new expire time.
3715
3716                // schedule a resend 1 second later
3717                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3718            }
3719        }
3720    }
3721
3722    /// Refresh cached service records with active queriers
3723    fn refresh_active_services(&mut self) {
3724        let mut query_ptr_count = 0;
3725        let mut query_srv_count = 0;
3726        let mut new_timers = HashSet::new();
3727        let mut query_addr_count = 0;
3728
3729        for (ty_domain, _sender) in self.service_queriers.iter() {
3730            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3731            if !refreshed_timers.is_empty() {
3732                trace!("sending refresh query for PTR: {}", ty_domain);
3733                self.send_query(ty_domain, RRType::PTR);
3734                query_ptr_count += 1;
3735                new_timers.extend(refreshed_timers);
3736            }
3737
3738            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3739            for (instance, types) in instances {
3740                trace!("sending refresh query for: {}", &instance);
3741                let query_vec = types
3742                    .into_iter()
3743                    .map(|ty| (instance.as_str(), ty))
3744                    .collect::<Vec<_>>();
3745                self.send_query_vec(&query_vec);
3746                query_srv_count += 1;
3747            }
3748            new_timers.extend(timers);
3749            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3750            for hostname in hostnames.iter() {
3751                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3752                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3753                query_addr_count += 2;
3754            }
3755            new_timers.extend(timers);
3756        }
3757
3758        for timer in new_timers {
3759            self.add_timer(timer);
3760        }
3761
3762        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3763        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3764        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3765    }
3766}
3767
3768/// Adds one or more answers of a service for incoming msg and RR entry name.
3769fn add_answer_of_service(
3770    out: &mut DnsOutgoing,
3771    msg: &DnsIncoming,
3772    entry_name: &str,
3773    service: &ServiceInfo,
3774    qtype: RRType,
3775    intf_addrs: Vec<IpAddr>,
3776) {
3777    if qtype == RRType::SRV || qtype == RRType::ANY {
3778        out.add_answer(
3779            msg,
3780            DnsSrv::new(
3781                entry_name,
3782                CLASS_IN | CLASS_CACHE_FLUSH,
3783                service.get_host_ttl(),
3784                service.get_priority(),
3785                service.get_weight(),
3786                service.get_port(),
3787                service.get_hostname().to_string(),
3788            ),
3789        );
3790    }
3791
3792    if qtype == RRType::TXT || qtype == RRType::ANY {
3793        out.add_answer(
3794            msg,
3795            DnsTxt::new(
3796                entry_name,
3797                CLASS_IN | CLASS_CACHE_FLUSH,
3798                service.get_other_ttl(),
3799                service.generate_txt(),
3800            ),
3801        );
3802    }
3803
3804    if qtype == RRType::SRV {
3805        for address in intf_addrs {
3806            out.add_additional_answer(DnsAddress::new(
3807                service.get_hostname(),
3808                ip_address_rr_type(&address),
3809                CLASS_IN | CLASS_CACHE_FLUSH,
3810                service.get_host_ttl(),
3811                address,
3812                InterfaceId::default(),
3813            ));
3814        }
3815    }
3816}
3817
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// The enum is `#[non_exhaustive]`, so a `match` on it outside this crate
/// needs a wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// Started searching for a service type. The string is a human-readable
    /// description: the service type followed by the interfaces searched.
    SearchStarted(String),

    /// Found a specific (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance in a ResolvedService struct.
    ServiceResolved(Box<ResolvedService>),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type. The string is the service type.
    SearchStopped(String),
}
3838
/// All possible events sent to the client from the daemon
/// regarding hostname resolution.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate that matches
/// on it needs a wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP address of a hostname.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found.
    AddressesFound(String, HashSet<ScopedIp>),
    /// One or more addresses for a hostname have been removed.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// The search for the IP address of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP address of a hostname.
    SearchStopped(String),
}
3855
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate that matches
/// on it needs a wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon sent an unsolicited announcement of a service from an interface.
    // NOTE(review): the meaning of the two strings is not documented here —
    // confirm at the place where this event is created.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address on the host.
    IpAdd(IpAddr),

    /// Daemon detected that an IP address was removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// See [DnsNameChange] for more details.
    NameChange(DnsNameChange),

    /// Daemon sent out a multicast response via an interface.
    Respond(String),
}
3880
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
///
/// Carried by the `NameChange` variant of [`DaemonEvent`].
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// The new name, created by appending a suffix to the original name.
    ///
    /// - for a service instance name, the suffix is ` (N)`, where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The type of the resource record whose name was changed.
    pub rr_type: RRType,

    /// The name of the interface where the name conflict and its change happened.
    pub intf_name: String,
}
3905
/// Commands supported by the daemon.
///
/// Several variants carry a `Sender`, through which the daemon can report
/// back to the requester.
#[derive(Debug)]
enum Command {
    /// Browsing for a service type.
    ///
    /// Fields: (ty_domain, next_time_delay_in_seconds, `bool` flag, channel::sender).
    // NOTE(review): the meaning of the `bool` flag is not documented here —
    // confirm at the command handler.
    Browse(String, u32, bool, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses.
    ///
    /// Fields: (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds).
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service.
    Register(Box<ServiceInfo>),

    /// Unregister a service.
    ///
    /// Fields: (fullname, sender for the unregister status).
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce a service to the local network again.
    ///
    /// Fields: (fullname, `u32`).
    // NOTE(review): the `u32` is undocumented here; `UnregisterResend` uses a
    // `u32` for `if_index`, so it may be the same — confirm at the handler.
    RegisterResend(String, u32),

    /// Resend unregister packet.
    ///
    /// Fields: (packet content, if_index, is_ipv4).
    UnregisterResend(Vec<u8>, u32, bool),

    /// Stop browsing a service type.
    StopBrowse(String), // (ty_domain)

    /// Stop resolving a hostname.
    StopResolveHostname(String), // (hostname)

    /// Send query to resolve a service instance.
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    Resolve(String, u16), // (service_instance_fullname, try_count)

    /// Read the current values of the counters.
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Set a daemon option; see `DaemonOption` for the available options.
    SetOption(DaemonOption),

    /// Read daemon option values; they are delivered as a `DaemonOptionVal`.
    GetOption(Sender<DaemonOptionVal>),

    /// Proactively confirm a DNS resource record.
    ///
    /// The intention is to check if a service name or IP address is still
    /// valid before its TTL expires.
    // NOTE(review): the `String` is presumably the record name and the
    // `Duration` a timeout — confirm at the handler.
    Verify(String, Duration),

    /// Invalidate some interface addresses.
    InvalidIntfAddrs(HashSet<Interface>),

    /// Ask the daemon to exit.
    // NOTE(review): presumably the final status is reported through the
    // sender — confirm at the handler.
    Exit(Sender<DaemonStatus>),
}
3961
3962impl fmt::Display for Command {
3963    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3964        match self {
3965            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3966            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3967            Self::Exit(_) => write!(f, "Command Exit"),
3968            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3969            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3970            Self::Monitor(_) => write!(f, "Command Monitor"),
3971            Self::Register(_) => write!(f, "Command Register"),
3972            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3973            Self::SetOption(_) => write!(f, "Command SetOption"),
3974            Self::GetOption(_) => write!(f, "Command GetOption"),
3975            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3976            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3977            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3978            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3979            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3980            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3981            Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
3982        }
3983    }
3984}
3985
/// Option values reported by the daemon in reply to `Command::GetOption`.
struct DaemonOptionVal {
    // Maximum service name length. The leading underscore marks the field as
    // intentionally unused by readers of this struct.
    _service_name_len_max: u8,
    // IP check interval.
    // NOTE(review): the unit is not visible here — confirm where it is set.
    ip_check_interval: u64,
}
3990
/// Options that can be changed on the daemon through `Command::SetOption`.
#[derive(Debug)]
enum DaemonOption {
    /// Set the maximum length of a service name
    /// (compare the `limit` of `check_service_name_length`).
    ServiceNameLenMax(u8),
    /// Set the IP check interval.
    // NOTE(review): the unit is not visible here — confirm at the handler.
    IpCheckInterval(u64),
    /// Enable the interfaces described by the given kinds.
    EnableInterface(Vec<IfKind>),
    /// Disable the interfaces described by the given kinds.
    DisableInterface(Vec<IfKind>),
    /// Turn multicast loopback on or off for IPv4.
    MulticastLoopV4(bool),
    /// Turn multicast loopback on or off for IPv6.
    MulticastLoopV6(bool),
    /// Whether unsolicited messages are accepted.
    // NOTE(review): exact semantics are not visible here — confirm at the handler.
    AcceptUnsolicited(bool),
    /// Whether Apple peer-to-peer interfaces are included
    /// (compare `is_apple_p2p_by_name`).
    IncludeAppleP2P(bool),
    /// Test-only: simulate the named interface going down.
    #[cfg(test)]
    TestDownInterface(String),
    /// Test-only: simulate the named interface coming up.
    #[cfg(test)]
    TestUpInterface(String),
}
4006
/// Byte length of the service domain suffix supported in this lib.
///
/// It is computed from `._tcp.local.`; the other supported suffix,
/// `._udp.local.`, has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
4009
4010/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
4011fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4012    if ty_domain.len() <= DOMAIN_LEN + 1 {
4013        // service name cannot be empty or only '_'.
4014        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4015    }
4016
4017    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
4018    if service_name_len > limit as usize {
4019        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4020    }
4021    Ok(())
4022}
4023
4024/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
4025fn check_domain_suffix(name: &str) -> Result<()> {
4026    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4027        return Err(e_fmt!(
4028            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4029            name
4030        ));
4031    }
4032
4033    Ok(())
4034}
4035
4036/// Validate the service name in a fully qualified name.
4037///
4038/// A Full Name = <Instance>.<Service>.<Domain>
4039/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
4040///
4041/// Note: this function does not check for the length of the service name.
4042/// Instead, `register_service` method will check the length.
4043fn check_service_name(fullname: &str) -> Result<()> {
4044    check_domain_suffix(fullname)?;
4045
4046    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4047    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4048
4049    if &name[0..1] != "_" {
4050        return Err(e_fmt!("Service name must start with '_'"));
4051    }
4052
4053    let name = &name[1..];
4054
4055    if name.contains("--") {
4056        return Err(e_fmt!("Service name must not contain '--'"));
4057    }
4058
4059    if name.starts_with('-') || name.ends_with('-') {
4060        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4061    }
4062
4063    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4064    if ascii_count < 1 {
4065        return Err(e_fmt!(
4066            "Service name must contain at least one letter (eg: 'A-Za-z')"
4067        ));
4068    }
4069
4070    Ok(())
4071}
4072
4073/// Validate a hostname.
4074fn check_hostname(hostname: &str) -> Result<()> {
4075    if !hostname.ends_with(".local.") {
4076        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4077    }
4078
4079    if hostname == ".local." {
4080        return Err(e_fmt!(
4081            "The part of the hostname before '.local.' cannot be empty"
4082        ));
4083    }
4084
4085    if hostname.len() > 255 {
4086        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4087    }
4088
4089    Ok(())
4090}
4091
4092fn call_service_listener(
4093    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4094    ty_domain: &str,
4095    event: ServiceEvent,
4096) {
4097    if let Some(listener) = listeners_map.get(ty_domain) {
4098        match listener.send(event) {
4099            Ok(()) => trace!("Sent event to listener successfully"),
4100            Err(e) => debug!("Failed to send event: {}", e),
4101        }
4102    }
4103}
4104
4105fn call_hostname_resolution_listener(
4106    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4107    hostname: &str,
4108    event: HostnameResolutionEvent,
4109) {
4110    let hostname_lower = hostname.to_lowercase();
4111    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4112        match listener.send(event) {
4113            Ok(()) => trace!("Sent event to listener successfully"),
4114            Err(e) => debug!("Failed to send event: {}", e),
4115        }
4116    }
4117}
4118
4119/// Returns valid network interfaces in the host system.
4120/// Operational down interfaces are excluded.
4121/// Loopback interfaces are excluded if `with_loopback` is false.
4122fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4123    my_ip_interfaces_inner(with_loopback, false)
4124}
4125
4126fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4127    if_addrs::get_if_addrs()
4128        .unwrap_or_default()
4129        .into_iter()
4130        .filter(|i| {
4131            i.is_oper_up()
4132                && !i.is_p2p()
4133                && (!i.is_loopback() || with_loopback)
4134                && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4135        })
4136        .collect()
4137}
4138
/// Tells whether an interface name marks an Apple peer-to-peer interface,
/// i.e. whether it starts with `awdl` or `llw`. Such interfaces should be
/// ignored by default.
fn is_apple_p2p_by_name(name: &str) -> bool {
    name.starts_with("awdl") || name.starts_with("llw")
}
4145
4146/// Send an outgoing mDNS query or response, and returns the packet bytes.
4147/// Returns empty vec if no valid interface address is found.
4148fn send_dns_outgoing(
4149    out: &DnsOutgoing,
4150    my_intf: &MyIntf,
4151    sock: &PktInfoUdpSocket,
4152    port: u16,
4153    source: Option<&IfAddr>,
4154) -> MyResult<Vec<Vec<u8>>> {
4155    let if_name = &my_intf.name;
4156
4157    let if_addr = match source {
4158        Some(addr) => addr,
4159        None => {
4160            if sock.domain() == Domain::IPV4 {
4161                match my_intf.next_ifaddr_v4() {
4162                    Some(addr) => addr,
4163                    None => return Ok(vec![]),
4164                }
4165            } else {
4166                match my_intf.next_ifaddr_v6() {
4167                    Some(addr) => addr,
4168                    None => return Ok(vec![]),
4169                }
4170            }
4171        }
4172    };
4173
4174    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
4175}
4176
4177/// Send an outgoing mDNS query or response, and returns the packet bytes.
4178fn send_dns_outgoing_impl(
4179    out: &DnsOutgoing,
4180    if_name: &str,
4181    if_index: u32,
4182    if_addr: &IfAddr,
4183    sock: &PktInfoUdpSocket,
4184    port: u16,
4185) -> MyResult<Vec<Vec<u8>>> {
4186    let qtype = if out.is_query() {
4187        "query"
4188    } else {
4189        if out.answers_count() == 0 && out.additionals().is_empty() {
4190            return Ok(vec![]); // no need to send empty response
4191        }
4192        "response"
4193    };
4194    trace!(
4195        "send {}: {} questions {} answers {} authorities {} additional",
4196        qtype,
4197        out.questions().len(),
4198        out.answers_count(),
4199        out.authorities().len(),
4200        out.additionals().len()
4201    );
4202
4203    match if_addr.ip() {
4204        IpAddr::V4(ipv4) => {
4205            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4206                debug!(
4207                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4208                    ipv4, e
4209                );
4210                // cannot send without a valid interface
4211                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4212                    let intf_addr = Interface {
4213                        name: if_name.to_string(),
4214                        addr: if_addr.clone(),
4215                        index: Some(if_index),
4216                        oper_status: if_addrs::IfOperStatus::Down,
4217                        is_p2p: false,
4218                        #[cfg(windows)]
4219                        adapter_name: String::new(),
4220                    };
4221                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4222                }
4223                return Ok(vec![]); // non-fatal other failure
4224            }
4225        }
4226        IpAddr::V6(ipv6) => {
4227            if let Err(e) = sock.set_multicast_if_v6(if_index) {
4228                debug!(
4229                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4230                    ipv6, e
4231                );
4232                // cannot send without a valid interface
4233                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4234                    let intf_addr = Interface {
4235                        name: if_name.to_string(),
4236                        addr: if_addr.clone(),
4237                        index: Some(if_index),
4238                        oper_status: if_addrs::IfOperStatus::Down,
4239                        is_p2p: false,
4240                        #[cfg(windows)]
4241                        adapter_name: String::new(),
4242                    };
4243                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4244                }
4245                return Ok(vec![]); // non-fatal other failure
4246            }
4247        }
4248    }
4249
4250    let packet_list = out.to_data_on_wire();
4251    for packet in packet_list.iter() {
4252        multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
4253    }
4254    Ok(packet_list)
4255}
4256
4257/// Sends a multicast packet, and returns the packet bytes.
4258fn multicast_on_intf(
4259    packet: &[u8],
4260    if_name: &str,
4261    if_index: u32,
4262    if_addr: &IfAddr,
4263    socket: &PktInfoUdpSocket,
4264    port: u16,
4265) {
4266    if packet.len() > MAX_MSG_ABSOLUTE {
4267        debug!("Drop over-sized packet ({})", packet.len());
4268        return;
4269    }
4270
4271    let addr: SocketAddr = match if_addr {
4272        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4273        if_addrs::IfAddr::V6(_) => {
4274            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4275            sock.set_scope_id(if_index); // Choose iface for multicast
4276            sock.into()
4277        }
4278    };
4279
4280    // Sends out `packet` to `addr` on the socket.
4281    let sock_addr = addr.into();
4282    match socket.send_to(packet, &sock_addr) {
4283        Ok(sz) => trace!(
4284            "sent out {} bytes on interface {} (idx {}) addr {}",
4285            sz,
4286            if_name,
4287            if_index,
4288            if_addr.ip()
4289        ),
4290        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4291    }
4292}
4293
/// Tells whether `name` can be an instance name of the format:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of '.'-separated parts is checked: there must be at
/// least five (the trailing '.' yields an empty last part).
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when there are five or more parts.
    name.split('.').nth(4).is_some()
}
4300
4301fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4302    monitors.retain(|sender| {
4303        if let Err(e) = sender.try_send(event.clone()) {
4304            debug!("notify_monitors: try_send: {}", &e);
4305            if matches!(e, TrySendError::Disconnected(_)) {
4306                return false; // This monitor is dropped.
4307            }
4308        }
4309        true
4310    });
4311}
4312
/// Check if all unique records passed "probing", and if yes, create a packet
/// to announce the service.
///
/// The packet is a response holding the PTR record(s) for the service type
/// (and subtype, if any), the SRV and TXT records, and one address record
/// per address of the service on `intf` for the chosen IP version
/// (`is_ipv4`).
///
/// Returns `None` when the service has no address on `intf` for that IP
/// version, or when at least one of the SRV / TXT / address records is not
/// done probing yet.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = dns_registry.resolve_name(info.get_fullname());

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Number of records that are not done probing; any non-zero value means
    // no announcement is produced.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Timestamp handed to `is_probing_done`: now plus a random 0..250 ms offset.
    // NOTE(review): how the registry uses this value is not visible here.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR record for the service type. It is added without the cache-flush
    // bit and without going through the probing check.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // A second PTR record for the subtype, when the service has one.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV records.
    // The SRV target uses the hostname as resolved by the registry.
    let hostname = dns_registry.resolve_name(info.get_hostname()).to_string();

    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    // The record is created under the original fullname; a recorded name
    // change is attached to it as its new name.
    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // The registry is only consulted when the service requires probing.
    // NOTE(review): `is_probing_done` takes the registry mutably and may
    // update its probing state — keep the order of these calls.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT records.

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records. (A and AAAA)

    // Address records are created under the original hostname; a recorded
    // hostname change is attached to each record as its new name.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Announce only when every unique record is done probing.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
4450
4451/// Send an unsolicited response for owned service via `intf` and `sock`.
4452/// Returns true if sent out successfully for IPv4 or IPv6.
4453fn announce_service_on_intf(
4454    dns_registry: &mut DnsRegistry,
4455    info: &ServiceInfo,
4456    intf: &MyIntf,
4457    sock: &PktInfoUdpSocket,
4458    port: u16,
4459) -> MyResult<bool> {
4460    let is_ipv4 = sock.domain() == Domain::IPV4;
4461    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4462        let _ = send_dns_outgoing(&out, intf, sock, port, None)?;
4463        return Ok(true);
4464    }
4465
4466    Ok(false)
4467}
4468
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label of `original` (the text before the first '.') is
/// changed. If that label ends with a number in parentheses, ` (<num>)`,
/// the number is incremented. Otherwise ` (2)` is appended. ` (2)` is also
/// appended when the number cannot be incremented without overflowing `u32`.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
/// - `foo (4294967295)` becomes `foo (4294967295) (2)`
fn name_change(original: &str) -> String {
    let (first_label, remainder) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing ` (<num>)` suffix: the label must end with
    // ')', and the text between the last " (" and that ')' must be a number.
    let bumped = first_label
        .strip_suffix(')')
        .and_then(|inner| inner.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            // `checked_add` avoids an overflow panic (or a wrap to 0).
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        });

    let new_label = bumped.unwrap_or_else(|| format!("{first_label} (2)"));

    match remainder {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4504
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label of `original` (the text before the first '.') is
/// changed. If that label ends with a hyphenated number, `-<num>`, the
/// number is incremented. Otherwise `-2` is appended. `-2` is also appended
/// when the number cannot be incremented without overflowing `u32`.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
/// - `foo-4294967295` becomes `foo-4294967295-2`
fn hostname_change(original: &str) -> String {
    let (first_label, remainder) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing `-<num>` suffix: everything after the last
    // hyphen must be a number.
    let bumped = first_label
        .rsplit_once('-')
        .and_then(|(base_name, digits)| {
            // `checked_add` avoids an overflow panic (or a wrap to 0).
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name}-{next}"))
        });

    let new_label = bumped.unwrap_or_else(|| format!("{first_label}-2"));

    match remainder {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4532
4533/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4534/// that are finished.
4535fn check_probing(
4536    dns_registry: &mut DnsRegistry,
4537    timers: &mut BinaryHeap<Reverse<u64>>,
4538    now: u64,
4539) -> (DnsOutgoing, Vec<String>) {
4540    let mut expired_probes = Vec::new();
4541    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4542
4543    for (name, probe) in dns_registry.probing.iter_mut() {
4544        if now >= probe.next_send {
4545            if probe.expired(now) {
4546                // move the record to active
4547                expired_probes.push(name.clone());
4548            } else {
4549                out.add_question(name, RRType::ANY);
4550
4551                /*
4552                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4553                ...
4554                for tiebreaking to work correctly in all
4555                cases, the Authority Section must contain *all* the records and
4556                proposed rdata being probed for uniqueness.
4557                    */
4558                for record in probe.records.iter() {
4559                    out.add_authority(record.clone());
4560                }
4561
4562                probe.update_next_send(now);
4563
4564                // add timer
4565                timers.push(Reverse(probe.next_send));
4566            }
4567        }
4568    }
4569
4570    (out, expired_probes)
4571}
4572
4573/// Process expired probes on an interface and return a list of services
4574/// that are waiting for the probe to finish.
4575///
4576/// `DnsNameChange` events are sent to the monitors.
4577fn handle_expired_probes(
4578    expired_probes: Vec<String>,
4579    intf_name: &str,
4580    dns_registry: &mut DnsRegistry,
4581    monitors: &mut Vec<Sender<DaemonEvent>>,
4582) -> HashSet<String> {
4583    let mut waiting_services = HashSet::new();
4584
4585    for name in expired_probes {
4586        let Some(probe) = dns_registry.probing.remove(&name) else {
4587            continue;
4588        };
4589
4590        // send notifications about name changes
4591        for record in probe.records.iter() {
4592            if let Some(new_name) = record.get_record().get_new_name() {
4593                dns_registry
4594                    .name_changes
4595                    .insert(name.clone(), new_name.to_string());
4596
4597                let event = DnsNameChange {
4598                    original: record.get_record().get_original_name().to_string(),
4599                    new_name: new_name.to_string(),
4600                    rr_type: record.get_type(),
4601                    intf_name: intf_name.to_string(),
4602                };
4603                debug!("Name change event: {:?}", &event);
4604                notify_monitors(monitors, DaemonEvent::NameChange(event));
4605            }
4606        }
4607
4608        // move RR from probe to active.
4609        debug!(
4610            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4611            probe.records.len(),
4612            probe.waiting_services.len(),
4613        );
4614
4615        // Move records to active and plan to wake up services if records are not empty.
4616        if !probe.records.is_empty() {
4617            match dns_registry.active.get_mut(&name) {
4618                Some(records) => {
4619                    records.extend(probe.records);
4620                }
4621                None => {
4622                    dns_registry.active.insert(name, probe.records);
4623                }
4624            }
4625
4626            waiting_services.extend(probe.waiting_services);
4627        }
4628    }
4629
4630    waiting_services
4631}
4632
4633/// Resolves `IfKind::Addr(ip)` to `IndexV4(if_index)` or `IndexV6(if_index)`.
4634fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4635    if let IfKind::Addr(addr) = &if_kind {
4636        if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4637            let if_index = intf.index.unwrap_or(0);
4638            return if addr.is_ipv4() {
4639                IfKind::IndexV4(if_index)
4640            } else {
4641                IfKind::IndexV6(if_index)
4642            };
4643        }
4644    }
4645    if_kind
4646}
4647
4648#[cfg(test)]
4649mod tests {
4650    use super::{
4651        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4652        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4653        valid_ip_on_intf, HostnameResolutionEvent, MyIntf, ServiceDaemon, ServiceEvent,
4654        ServiceInfo, MDNS_PORT,
4655    };
4656    use crate::{
4657        dns_parser::{
4658            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4659            FLAGS_AA, FLAGS_QR_RESPONSE,
4660        },
4661        service_daemon::{add_answer_of_service, check_hostname},
4662    };
4663    use if_addrs::{IfAddr, Ifv4Addr};
4664    use std::{
4665        collections::HashSet,
4666        net::{IpAddr, Ipv4Addr},
4667        time::{Duration, SystemTime},
4668    };
4669    use test_log::test;
4670
/// An interface may carry several addresses on unrelated subnets. The lookup
/// exercised here selects the address for which `valid_ip_on_intf` accepts the
/// querier, and yields `None` when no address qualifies.
#[test]
fn test_response_source_ifaddr_match() {
    // Both test addresses share the same /24 netmask, so build them with one helper.
    let make_v4 = |ip: Ipv4Addr| {
        IfAddr::V4(Ifv4Addr {
            ip,
            netmask: Ipv4Addr::new(255, 255, 255, 0),
            broadcast: None,
            prefixlen: 24,
        })
    };
    let first_addr = make_v4(Ipv4Addr::new(192, 168, 1, 148));
    let second_addr = make_v4(Ipv4Addr::new(10, 238, 0, 51));

    let intf = MyIntf {
        name: String::from("dummy0"),
        index: 1,
        addrs: HashSet::from([first_addr.clone(), second_addr.clone()]),
    };

    // (querier, expected pick). The last querier is on neither subnet.
    let cases = [
        (Ipv4Addr::new(192, 168, 1, 2), Some(first_addr)),
        (Ipv4Addr::new(10, 238, 0, 99), Some(second_addr)),
        (Ipv4Addr::new(172, 16, 0, 1), None),
    ];

    for (querier_v4, expected) in cases {
        let querier = IpAddr::V4(querier_v4);
        let picked: Option<IfAddr> = intf
            .addrs
            .iter()
            .find(|candidate| valid_ip_on_intf(&querier, candidate))
            .cloned();
        assert_eq!(picked, expected);
    }
}
4713
/// Exercises `valid_instance_name` with two accepted full names and one
/// rejected name that has no instance label in front of the service type.
#[test]
fn test_instance_name() {
    let accepted = [
        "my-laser._printer._tcp.local.",
        "my-laser.._printer._tcp.local.",
    ];
    for fullname in accepted {
        assert!(valid_instance_name(fullname));
    }

    let rejected = "_printer._tcp.local.";
    assert!(!valid_instance_name(rejected));
}
4720
/// Expects `check_service_name_length("_tcp", 100)` to be rejected and prints
/// the reported error.
#[test]
fn test_check_service_name_length() {
    let outcome = check_service_name_length("_tcp", 100);
    assert!(outcome.is_err());

    // Show the error text in the test output for easier inspection.
    if let Some(e) = outcome.err() {
        println!("{}", e);
    }
}
4729
/// Feeds `check_hostname` names that must be accepted and names that must be
/// rejected, including names sitting on either side of the 255-byte boundary.
#[test]
fn test_check_hostname() {
    let suffix = ".local.";
    // Total length of exactly 255 bytes.
    let at_limit = "A".repeat(255 - suffix.len()) + suffix;
    // Total length of 256 bytes, one past the accepted maximum.
    let over_limit = "A".repeat(256 - suffix.len()) + suffix;

    // valid hostnames
    let accepted = ["my_host.local.", at_limit.as_str()];
    for hostname in accepted {
        assert!(check_hostname(hostname).is_ok());
    }

    // erroneous hostnames: no trailing dot, a bare ".local.", and the 256-byte name
    let rejected = ["my_host.local", ".local.", over_limit.as_str()];
    for hostname in rejected {
        let result = check_hostname(hostname);
        assert!(result.is_err());
        if let Err(e) = result {
            println!("{}", e);
        }
    }
}
4754
/// Runs `check_domain_suffix` over service types it must reject and service
/// types it must accept.
#[test]
fn test_check_domain_suffix() {
    let rejected = [
        "_missing_dot._tcp.local",
        "_missing_bar.tcp.local.",
        "_mis_spell._tpp.local.",
        "_mis_spell._upp.local.",
    ];
    for ty_domain in rejected {
        assert!(check_domain_suffix(ty_domain).is_err());
    }

    let accepted = ["_has_dot._tcp.local.", "_goodname._udp.local."];
    for ty_domain in accepted {
        assert!(check_domain_suffix(ty_domain).is_ok());
    }
}
4764
/// Checks that a service can be resolved again after its PTR record has been
/// invalidated between two browse sessions on the same daemon.
#[test]
fn test_service_with_temporarily_invalidated_ptr() {
    let daemon = ServiceDaemon::new().expect("Failed to create daemon");

    let service = "_test_inval_ptr._udp.local.";
    let host_name = "my_host_tmp_invalidated_ptr.local.";
    let port = 5201;
    let intfs: Vec<_> = my_ip_interfaces(false);
    let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();

    let my_service = ServiceInfo::new(
        service,
        "my_instance",
        host_name,
        intf_ips.as_slice(),
        port,
        None,
    )
    .expect("invalid service info")
    .enable_addr_auto();
    assert!(daemon.register(my_service.clone()).is_ok());

    // First browse: wait until the service is resolved, logging anything else.
    let browse_chan = daemon.browse(service).unwrap();
    let timeout = Duration::from_secs(2);
    let resolved = std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok()).any(|event| {
        match event {
            ServiceEvent::ServiceResolved(info) => {
                println!("Resolved a service of {}", &info.fullname);
                true
            }
            other => {
                println!("Received event {:?}", other);
                false
            }
        }
    });
    assert!(resolved);

    println!("Stopping browse of {}", service);
    // Browsing is paused (rather than the service unregistered, which would
    // invalidate every record) so that restarting it triggers a fresh query.
    daemon.stop_browse(service).unwrap();

    // Wait for the stop to be confirmed. This lowers the chance that a late
    // answer re-adds the PTR to the cache, which would let the second browse
    // be served from the cache and defeat the purpose of this test.
    // Extra `ServiceResolved` events from other interfaces may show up here;
    // they are only logged.
    let stopped = std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok()).any(|event| {
        match event {
            ServiceEvent::SearchStopped(_) => {
                println!("Stopped browsing service");
                true
            }
            other => {
                println!("Received event {:?}", other);
                false
            }
        }
    });
    assert!(stopped);

    // Build a PTR record from the service type to the instance full name
    // that invalidates the cached pointer (the `0` is presumably the TTL).
    let invalidate_ptr_packet = DnsPointer::new(
        my_service.get_type(),
        RRType::PTR,
        CLASS_IN,
        0,
        my_service.get_fullname().to_string(),
    );
    let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    packet_buffer.add_additional_answer(invalidate_ptr_packet);

    // Send the invalidation out on every local interface.
    for intf in &intfs {
        let sock = _new_socket_bind(intf, true).unwrap();
        let if_index = intf.index.unwrap_or(0);
        send_dns_outgoing_impl(
            &packet_buffer,
            &intf.name,
            if_index,
            &intf.addr,
            &sock.pktinfo,
            MDNS_PORT,
        )
        .unwrap();
    }

    println!(
        "Sent PTR record invalidation. Starting second browse for {}",
        service
    );

    // Second browse: the service must be resolved once more.
    let second_browse_chan = daemon.browse(service).unwrap();
    let resolved_again = std::iter::from_fn(|| second_browse_chan.recv_timeout(timeout).ok())
        .any(|event| match event {
            ServiceEvent::ServiceResolved(info) => {
                println!("Resolved a service of {}", &info.fullname);
                true
            }
            other => {
                println!("Received event {:?}", other);
                false
            }
        });
    assert!(resolved_again);

    daemon.shutdown().unwrap();
}
4880
/// Resolves a service whose host TTL is only a few seconds, shuts the server
/// down, and then watches the client for the resulting `ServiceRemoved` event.
///
/// The removal is only logged, not asserted. Both daemons are shut down
/// before the test returns so that neither keeps running afterwards.
#[test]
fn test_expired_srv() {
    // construct service info
    let service_type = "_expired-srv._udp.local.";
    let instance = "test_instance";
    let host_name = "expired_srv_host.local.";
    let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
        .unwrap()
        .enable_addr_auto();

    // set SRV to expire soon.
    let new_ttl = 3; // for testing only.
    my_service._set_host_ttl(new_ttl);

    // register my service
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    let result = mdns_server.register(my_service);
    assert!(result.is_ok());

    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_secs(2);
    let mut resolved = false;

    // The service has to be resolved before its expiry can be observed.
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.fullname);
            break;
        }
    }

    assert!(resolved);

    // Exit the server so that no more responses.
    mdns_server.shutdown().unwrap();

    // SRV record in the client cache will expire. Keep reading events until
    // the removal is reported or no event arrives within one TTL.
    let expire_timeout = Duration::from_secs(new_ttl as u64);
    while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
        if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
            println!("Service removed: {}: {}", &service_type, &full_name);
            break;
        }
    }

    // Stop the client daemon too, as the sibling tests do, so it does not
    // outlive this test.
    mdns_client.shutdown().unwrap();
}
4928
/// Resolves a hostname whose address records have a 2-second TTL, shuts the
/// server down, and expects the client to report `AddressesRemoved` for the
/// same address.
///
/// Both waits are bounded with `recv_timeout`, so a missing event fails the
/// test instead of blocking it forever.
#[test]
fn test_hostname_resolution_address_removed() {
    // Create a mDNS server
    let server = ServiceDaemon::new().expect("Failed to create server");
    let hostname = "addr_remove_host._tcp.local.";
    let service_ip_addr: ScopedIp = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.into())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        "_host_res_test._tcp.local.",
        "my_instance",
        hostname,
        service_ip_addr.to_ip_addr(),
        1234,
        None,
    )
    .expect("invalid service info");

    // Set a short TTL for addresses for testing.
    let addr_ttl = 2;
    my_service._set_host_ttl(addr_ttl); // Expire soon

    server.register(my_service).unwrap();

    // Create a mDNS client for resolving the hostname.
    let client = ServiceDaemon::new().expect("Failed to create client");
    let event_receiver = client.resolve_hostname(hostname, None).unwrap();

    // Upper bound on the gap between two events while waiting for the first
    // resolution. Without it, a hostname that never resolves would hang the test.
    let resolve_timeout = Duration::from_secs(5);
    let resolved = loop {
        match event_receiver.recv_timeout(resolve_timeout) {
            Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                assert_eq!(found_hostname, hostname);
                assert!(addresses.contains(&service_ip_addr));
                println!("address found: {:?}", &addresses);
                break true;
            }
            Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
            Ok(_event) => {}
            // Timed out or the channel was closed.
            Err(_) => break false,
        }
    };

    assert!(resolved);

    // Shutdown the server so no more responses / refreshes for addresses.
    server.shutdown().unwrap();

    // Wait till hostname address record expires, with 1 second grace period.
    let timeout = Duration::from_secs(addr_ttl as u64 + 1);
    let removed = loop {
        match event_receiver.recv_timeout(timeout) {
            Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                assert_eq!(removed_host, hostname);
                assert!(addresses.contains(&service_ip_addr));

                println!(
                    "address removed: hostname: {} addresses: {:?}",
                    &hostname, &addresses
                );
                break true;
            }
            Ok(_event) => {}
            Err(_) => {
                break false;
            }
        }
    };

    assert!(removed);

    client.shutdown().unwrap();
}
5003
/// Registers a service with a 3-second "other" TTL, lets a client resolve it,
/// waits for 90% of that TTL, and then checks that the client metrics
/// `cache-refresh-ptr` and `cache-refresh-srv-txt` both read 1.
#[test]
fn test_refresh_ptr() {
    let service_type = "_refresh-ptr._udp.local.";
    let instance = "test_instance";
    let host_name = "refresh_ptr_host.local.";

    // Advertise on the first IPv4 address found among the local interfaces.
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .map(|iface| iface.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3; // for testing only.
    my_service._set_other_ttl(new_ttl);

    // Server side: register the service.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // Client side: browse and wait for the first resolution.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    // Give at least 1 second for the service probing.
    let resolve_wait = Duration::from_millis(1500);
    let resolved = std::iter::from_fn(|| browse_chan.recv_timeout(resolve_wait).ok()).any(
        |event| match event {
            ServiceEvent::ServiceResolved(info) => {
                println!("Resolved a service of {}", &info.fullname);
                true
            }
            _ => false,
        },
    );
    assert!(resolved);

    // wait over 80% of TTL, and refresh PTR should be sent out.
    // Events arriving in the meantime are only logged.
    let refresh_wait = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
    loop {
        match browse_chan.recv_timeout(refresh_wait) {
            Ok(event) => println!("event: {:?}", &event),
            Err(_) => break,
        }
    }

    // verify refresh counter.
    let metrics = mdns_client
        .get_metrics()
        .unwrap()
        .recv_timeout(refresh_wait)
        .unwrap();
    assert_eq!(metrics["cache-refresh-ptr"], 1);
    assert_eq!(metrics["cache-refresh-srv-txt"], 1);

    // Stop both daemons.
    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
5068
/// Table-driven check of `name_change`: each row is `(input, expected output)`.
#[test]
fn test_name_change() {
    let cases = [
        ("foo.local.", "foo (2).local."),
        ("foo (2).local.", "foo (3).local."),
        ("foo (9).local.", "foo (10).local."),
        ("foo", "foo (2)"),
        ("foo (2)", "foo (3)"),
        ("", " (2)"),
        // Additional edge cases
        ("foo (abc)", "foo (abc) (2)"),         // Invalid number
        ("foo (2", "foo (2 (2)"),               // Missing closing parenthesis
        ("foo (2) extra", "foo (2) extra (2)"), // Extra text after number
    ];

    for (original, expected) in cases {
        assert_eq!(name_change(original), expected);
    }
}
5083
/// Table-driven check of `hostname_change`: each row is `(input, expected output)`.
#[test]
fn test_hostname_change() {
    let cases = [
        ("foo.local.", "foo-2.local."),
        ("foo", "foo-2"),
        ("foo-2.local.", "foo-3.local."),
        ("foo-9", "foo-10"),
        ("test-42.domain.", "test-43.domain."),
    ];

    for (original, expected) in cases {
        assert_eq!(hostname_change(original), expected);
    }
}
5092
/// Calls `add_answer_of_service` for a TXT record and checks that the first
/// answer in the outgoing message is a TXT record whose TTL equals the
/// service's `get_other_ttl()`.
#[test]
fn test_add_answer_txt_ttl() {
    let service_type = "_test_add_answer._udp.local.";
    let instance = "test_instance";
    let host_name = "add_answer_host.local.";

    // Use the first local interface that has an IPv4 address.
    let service_intf = my_ip_interfaces(false)
        .into_iter()
        .find(|iface| iface.ip().is_ipv4())
        .unwrap();
    let service_ip_addr = service_intf.ip();

    // A simple service info bound to that address.
    let my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    // The outgoing message that will receive the answer.
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

    // Build a dummy incoming message from the (still empty) outgoing one.
    let wire_packet = out.to_data_on_wire().pop().unwrap();
    let incoming = DnsIncoming::new(wire_packet, InterfaceId::from(&service_intf)).unwrap();

    // Add an answer of TXT type for the service.
    add_answer_of_service(
        &mut out,
        &incoming,
        instance,
        &my_service,
        RRType::TXT,
        vec![service_intf.ip()],
    );

    // At least one answer must have been added.
    assert!(
        out.answers_count() > 0,
        "No answers added to the outgoing message"
    );

    // The first answer must be the TXT record, carrying the service's TTL.
    let first_answer = out._answers().first().unwrap();
    assert_eq!(first_answer.0.get_type(), RRType::TXT);
    assert_eq!(
        first_answer.0.get_record().get_ttl(),
        my_service.get_other_ttl()
    );
}
5146
/// Resolves a service, takes the client's interface down via
/// `test_down_interface` and expects `ServiceRemoved`, then brings it back up
/// via `test_up_interface` and expects `ServiceResolved` again.
#[test]
fn test_interface_flip() {
    let ty_domain = "_intf-flip._udp.local.";
    let host_name = "intf_flip.local.";
    let port = 5200;

    // The current time in microseconds serves as a unique instance name.
    let instance_name = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_micros()
        .to_string();

    // Pick one IPv4 address together with the name of its interface.
    let (ip_addr1, intf_name) = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| (iface.ip(), iface.name.clone()))
        .unwrap();

    println!("Using interface {} with IP {}", intf_name, ip_addr1);

    // Start the server and register the service.
    let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
        .expect("valid service info");
    let server1 = ServiceDaemon::new().expect("failed to start server");
    server1
        .register(service1)
        .expect("Failed to register service1");

    // Give the server time to announce the service.
    std::thread::sleep(Duration::from_secs(2));

    // Start the client and browse for the service.
    let client = ServiceDaemon::new().expect("failed to start client");
    let receiver = client.browse(ty_domain).unwrap();
    let timeout = Duration::from_secs(3);

    let mut got_data = false;
    loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(_)) => {
                println!("Received ServiceResolved event");
                got_data = true;
                break;
            }
            Ok(_) => {}
            Err(_) => break,
        }
    }
    assert!(got_data, "Should receive ServiceResolved event");

    // Set a short IP check interval to detect interface changes quickly.
    client.set_ip_check_interval(1).unwrap();

    // Take the interface down; the client should lose the service.
    println!("Shutting down interface {}", &intf_name);
    client.test_down_interface(&intf_name).unwrap();

    let mut got_removed = false;
    loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceRemoved(ty_domain, instance)) => {
                got_removed = true;
                println!("removed: {ty_domain} : {instance}");
                break;
            }
            Ok(_) => {}
            Err(_) => break,
        }
    }
    assert!(got_removed, "Should receive ServiceRemoved event");

    // Bring the interface back; the service should be resolved again.
    println!("Bringing up interface {}", &intf_name);
    client.test_up_interface(&intf_name).unwrap();

    let mut got_data_again = false;
    loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(resolved)) => {
                got_data_again = true;
                println!("Received ServiceResolved: {:?}", resolved);
                break;
            }
            Ok(_) => {}
            Err(_) => break,
        }
    }
    assert!(
        got_data_again,
        "Should receive ServiceResolved event after interface is back up"
    );

    server1.shutdown().unwrap();
    client.shutdown().unwrap();
}
5232
/// Starts a `browse_cache` on the client before the service is registered and
/// expects the client to report `ServiceResolved` once the server registers it.
#[test]
fn test_cache_only() {
    let service_type = "_cache_only._udp.local.";
    let instance = "test_instance";
    let host_name = "cache_only_host.local.";

    // Advertise on the first IPv4 address found among the local interfaces.
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .map(|iface| iface.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3; // for testing only.
    my_service._set_other_ttl(new_ttl);

    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");

    // A single browse request records the client's interest in the service
    // type, which ensures that subsequent announcements are cached.
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    std::thread::sleep(Duration::from_secs(2));

    // Only now bring up the server and register the service.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // Give at least 1 second for the service probing.
    let timeout = Duration::from_millis(1500);

    // Wait for the service to be resolved.
    let resolved =
        std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok()).any(|event| match event {
            ServiceEvent::ServiceResolved(info) => {
                println!("Resolved a service of {}", &info.get_fullname());
                true
            }
            _ => false,
        });
    assert!(resolved);

    // Stop both daemons.
    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
5288
/// Enables `accept_unsolicited` on the client, lets the server announce the
/// service before any browse is issued, and then expects a later
/// `browse_cache` to report `ServiceResolved`.
#[test]
fn test_cache_only_unsolicited() {
    let service_type = "_c_unsolicit._udp.local.";
    let instance = "test_instance";
    let host_name = "c_unsolicit_host.local.";

    // Advertise on the first IPv4 address found among the local interfaces.
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .map(|iface| iface.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap();

    let my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    // Server side: register the service.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // Client side: opt in to unsolicited announcements.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    mdns_client.accept_unsolicited(true).unwrap();

    // Let the announcements go out before browse_cache is called, which
    // ensures that they are treated as unsolicited.
    std::thread::sleep(Duration::from_secs(2));
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    // Give at least 1 second for the service probing.
    let timeout = Duration::from_millis(1500);

    // Wait for the service to be resolved.
    let mut resolved = false;
    loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                resolved = true;
                println!("Resolved a service of {}", &info.get_fullname());
                break;
            }
            Ok(_) => {}
            Err(_) => break,
        }
    }
    assert!(resolved);

    // Stop both daemons.
    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
5340
/// Verifies two properties of daemons created with `new_with_port`:
/// 1. daemons sharing a custom port can discover each other's services;
/// 2. daemons on different ports do not see each other's services.
#[test]
fn test_custom_port_isolation() {
    let service_type = "_custom_port._udp.local.";
    let instance_custom = "custom_port_instance";
    let instance_default = "default_port_instance";
    let host_name = "custom_port_host.local.";

    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .map(|iface| iface.ip())
        .find(|ip| ip.is_ipv4())
        .expect("Test requires an IPv4 interface");

    // Service registered through the custom-port (5454) daemon; its own
    // service port is 8080.
    let service_custom = ServiceInfo::new(
        service_type,
        instance_custom,
        host_name,
        service_ip_addr,
        8080,
        None,
    )
    .unwrap();

    // Service registered through the default-port (5353) daemon; its own
    // service port is 8081.
    let service_default = ServiceInfo::new(
        service_type,
        instance_default,
        host_name,
        service_ip_addr,
        8081,
        None,
    )
    .unwrap();

    // Two daemons share the custom port 5454, a third one uses the default port.
    let custom_port = 5454u16;
    let server_custom =
        ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
    let client_custom =
        ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
    let server_default = ServiceDaemon::new().expect("Failed to create default port server");

    // Each server registers its own service.
    server_custom
        .register(service_custom)
        .expect("Failed to register custom port service");
    server_default
        .register(service_default)
        .expect("Failed to register default port service");

    let timeout = Duration::from_secs(3);

    // Browse from the custom-port client. It should find the custom-port
    // service and nothing from the default port. Events are consumed until
    // none arrives within `timeout`.
    let browse_custom = client_custom
        .browse(service_type)
        .expect("Failed to browse on custom port");

    let mut found_custom = false;
    let mut found_default_on_custom = false;
    for event in std::iter::from_fn(|| browse_custom.recv_timeout(timeout).ok()) {
        let info = match event {
            ServiceEvent::ServiceResolved(info) => info,
            _ => continue,
        };
        println!(
            "Custom port client resolved: {} on port {}",
            info.get_fullname(),
            info.get_port()
        );
        if info.get_fullname().starts_with(instance_custom) {
            found_custom = true;
            assert_eq!(info.get_port(), 8080);
        }
        if info.get_fullname().starts_with(instance_default) {
            found_default_on_custom = true;
        }
    }

    assert!(
        found_custom,
        "Custom port client should find service on custom port"
    );
    assert!(
        !found_default_on_custom,
        "Custom port client should NOT find service on default port"
    );

    // Now the mirror image: a default-port client should find the
    // default-port service but not the custom-port one.
    let client_default = ServiceDaemon::new().expect("Failed to create default port client");
    let browse_default = client_default
        .browse(service_type)
        .expect("Failed to browse on default port");

    let mut found_default = false;
    let mut found_custom_on_default = false;
    for event in std::iter::from_fn(|| browse_default.recv_timeout(timeout).ok()) {
        let info = match event {
            ServiceEvent::ServiceResolved(info) => info,
            _ => continue,
        };
        println!(
            "Default port client resolved: {} on port {}",
            info.get_fullname(),
            info.get_port()
        );
        if info.get_fullname().starts_with(instance_default) {
            found_default = true;
            assert_eq!(info.get_port(), 8081);
        }
        if info.get_fullname().starts_with(instance_custom) {
            found_custom_on_default = true;
        }
    }

    assert!(
        found_default,
        "Default port client should find service on default port"
    );
    assert!(
        !found_custom_on_default,
        "Default port client should NOT find service on custom port"
    );

    // Cleanup
    server_custom.shutdown().unwrap();
    client_custom.shutdown().unwrap();
    server_default.shutdown().unwrap();
    client_default.shutdown().unwrap();
}
5478}