mdns_sd/
service_daemon.rs

//! Service daemon for mDNS Service Discovery.

// How DNS-based Service Discovery works in a nutshell:
//
// (excerpt from RFC 6763)
// .... that a particular service instance can be
//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
//    and gives the target host and port where the service instance can be
//    reached.  The DNS TXT record of the same name gives additional
//    information about this instance, in a structured form using key/value
//    pairs, described in Section 6.  A client discovers the list of
//    available instances of a given service type using a query for a DNS
//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
//    which returns a set of zero or more names, which are the names of the
//    aforementioned DNS SRV/TXT record pairs.
//
// Some naming conventions in this source code:
//
// `ty_domain` refers to a service type together with a domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service itself and "_udp" or "_tcp".
// See RFC 6763 section 7 Service Names.
//     for example: `_my-service._udp.local.`
//
// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
//     for example: `my_home._my-service._udp.local.`
//
// In mDNS and DNS, the basic data structure is "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38        FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{
42        split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43    },
44    Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{BinaryHeap, HashMap, HashSet},
53    fmt,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
/// Default upper bound for the length of a service name, counted without the
/// domain and without the leading underscore (`_`). The value 15 comes from
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default number of seconds between two automatic checks for IP changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default time out for [ServiceDaemon::verify]: 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

// UDP port and multicast group addresses used for mDNS traffic.
const MDNS_PORT: u16 = 5353;
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);

// IPv4 loopback address (127.0.0.1), used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
/// Outcome reported back to the caller of the service `unregister` call.
#[derive(Debug)]
pub enum UnregisterStatus {
    /// The service was unregistered successfully.
    OK,
    /// No registration was found for the given service.
    NotFound,
}
87
/// The state of the service daemon as reported to clients.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is up and operating normally.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
98
/// Different counters included in the metrics.
/// Currently all counters are for outgoing packets.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSRV,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
}

impl fmt::Display for Counter {
    /// Writes the kebab-case name of the counter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Resolve the name first, then emit it with a single write.
        let name = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSRV => "cache-refresh-srv",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
        };
        f.write_str(name)
    }
}
153
/// Metrics are reported as a `HashMap` from a counter name to its `i64` value.
/// Their main purpose is to help monitor the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;

// Poll token reserved for the signal socket. It sits at the top of the
// `usize` range to avoid overlapping with the keys in zc.poll_ids.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;
159
160/// A daemon thread for mDNS
161///
162/// This struct provides a handle and an API to the daemon. It is cloneable.
163#[derive(Clone)]
164pub struct ServiceDaemon {
165    /// Sender handle of the channel to the daemon.
166    sender: Sender<Command>,
167
168    /// Send to this addr to signal that a `Command` is coming.
169    ///
170    /// The daemon listens on this addr together with other mDNS sockets,
171    /// to avoid busy polling the flume channel. If there is a way to poll
172    /// the channel and mDNS sockets together, then this can be removed.
173    signal_addr: SocketAddr,
174}
175
176impl ServiceDaemon {
177    /// Creates a new daemon and spawns a thread to run the daemon.
178    ///
179    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
180    /// ask callers to set the port.
181    pub fn new() -> Result<Self> {
182        // Use port 0 to allow the system assign a random available port,
183        // no need for a pre-defined port number.
184        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
185
186        let signal_sock = UdpSocket::bind(signal_addr)
187            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
188
189        // Get the socket with the OS chosen port
190        let signal_addr = signal_sock
191            .local_addr()
192            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
193
194        // Must be nonblocking so we can listen to it together with mDNS sockets.
195        signal_sock
196            .set_nonblocking(true)
197            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
198
199        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
200
201        let (sender, receiver) = bounded(100);
202
203        // Spawn the daemon thread
204        let mio_sock = MioUdpSocket::from_std(signal_sock);
205        thread::Builder::new()
206            .name("mDNS_daemon".to_string())
207            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
208            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
209
210        Ok(Self {
211            sender,
212            signal_addr,
213        })
214    }
215
216    /// Sends `cmd` to the daemon via its channel, and sends a signal
217    /// to its sock addr to notify.
218    fn send_cmd(&self, cmd: Command) -> Result<()> {
219        let cmd_name = cmd.to_string();
220
221        // First, send to the flume channel.
222        self.sender.try_send(cmd).map_err(|e| match e {
223            TrySendError::Full(_) => Error::Again,
224            e => e_fmt!("flume::channel::send failed: {}", e),
225        })?;
226
227        // Second, send a signal to notify the daemon.
228        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
229        let socket = UdpSocket::bind(addr)
230            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
231        socket
232            .send_to(cmd_name.as_bytes(), self.signal_addr)
233            .map_err(|e| {
234                e_fmt!(
235                    "signal socket send_to {} ({}) failed: {}",
236                    self.signal_addr,
237                    cmd_name,
238                    e
239                )
240            })?;
241
242        Ok(())
243    }
244
245    /// Starts browsing for a specific service type.
246    ///
247    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
248    ///
249    /// Returns a channel `Receiver` to receive events about the service. The caller
250    /// can call `.recv_async().await` on this receiver to handle events in an
251    /// async environment or call `.recv()` in a sync environment.
252    ///
253    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
254    /// finding more details, i.e. SRV records and TXT records.
255    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
256        check_domain_suffix(service_type)?;
257
258        let (resp_s, resp_r) = bounded(10);
259        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
260        Ok(resp_r)
261    }
262
263    /// Stops searching for a specific service type.
264    ///
265    /// When an error is returned, the caller should retry only when
266    /// the error is `Error::Again`, otherwise should log and move on.
267    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
268        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
269    }
270
271    /// Starts querying for the ip addresses of a hostname.
272    ///
273    /// Returns a channel `Receiver` to receive events about the hostname.
274    /// The caller can call `.recv_async().await` on this receiver to handle events in an
275    /// async environment or call `.recv()` in a sync environment.
276    ///
277    /// The `timeout` is specified in milliseconds.
278    pub fn resolve_hostname(
279        &self,
280        hostname: &str,
281        timeout: Option<u64>,
282    ) -> Result<Receiver<HostnameResolutionEvent>> {
283        check_hostname(hostname)?;
284        let (resp_s, resp_r) = bounded(10);
285        self.send_cmd(Command::ResolveHostname(
286            hostname.to_string(),
287            1,
288            resp_s,
289            timeout,
290        ))?;
291        Ok(resp_r)
292    }
293
294    /// Stops querying for the ip addresses of a hostname.
295    ///
296    /// When an error is returned, the caller should retry only when
297    /// the error is `Error::Again`, otherwise should log and move on.
298    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
299        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
300    }
301
302    /// Registers a service provided by this host.
303    ///
304    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
305    /// this method will automatically fill in addresses from the host.
306    ///
307    /// To re-announce a service with an updated `service_info`, just call
308    /// this `register` function again. No need to call `unregister` first.
309    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
310        check_service_name(service_info.get_fullname())?;
311        check_hostname(service_info.get_hostname())?;
312
313        self.send_cmd(Command::Register(service_info))
314    }
315
316    /// Unregisters a service. This is a graceful shutdown of a service.
317    ///
318    /// Returns a channel receiver that is used to receive the status code
319    /// of the unregister.
320    ///
321    /// When an error is returned, the caller should retry only when
322    /// the error is `Error::Again`, otherwise should log and move on.
323    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
324        let (resp_s, resp_r) = bounded(1);
325        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
326        Ok(resp_r)
327    }
328
329    /// Starts to monitor events from the daemon.
330    ///
331    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
332    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
333        let (resp_s, resp_r) = bounded(100);
334        self.send_cmd(Command::Monitor(resp_s))?;
335        Ok(resp_r)
336    }
337
338    /// Shuts down the daemon thread and returns a channel to receive the status.
339    ///
340    /// When an error is returned, the caller should retry only when
341    /// the error is `Error::Again`, otherwise should log and move on.
342    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
343        let (resp_s, resp_r) = bounded(1);
344        self.send_cmd(Command::Exit(resp_s))?;
345        Ok(resp_r)
346    }
347
348    /// Returns the status of the daemon.
349    ///
350    /// When an error is returned, the caller should retry only when
351    /// the error is `Error::Again`, otherwise should consider the daemon
352    /// stopped working and move on.
353    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
354        let (resp_s, resp_r) = bounded(1);
355
356        if self.sender.is_disconnected() {
357            resp_s
358                .send(DaemonStatus::Shutdown)
359                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
360        } else {
361            self.send_cmd(Command::GetStatus(resp_s))?;
362        }
363
364        Ok(resp_r)
365    }
366
367    /// Returns a channel receiver for the metrics, e.g. input/output counters.
368    ///
369    /// The metrics returned is a snapshot. Hence the caller should call
370    /// this method repeatedly if they want to monitor the metrics continuously.
371    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
372        let (resp_s, resp_r) = bounded(1);
373        self.send_cmd(Command::GetMetrics(resp_s))?;
374        Ok(resp_r)
375    }
376
377    /// Change the max length allowed for a service name.
378    ///
379    /// As RFC 6763 defines a length max for a service name, a user should not call
380    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
381    ///
382    /// `len_max` is capped at an internal limit, which is currently 30.
383    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
384        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
385
386        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
387            return Err(Error::Msg(format!(
388                "service name length max {} is too large",
389                len_max
390            )));
391        }
392
393        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
394    }
395
396    /// Change the interval for checking IP changes automatically.
397    ///
398    /// Setting the interval to 0 disables the IP check.
399    ///
400    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
401    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
402        let interval_in_millis = interval_in_secs as u64 * 1000;
403        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
404            interval_in_millis,
405        )))
406    }
407
408    /// Get the current interval in seconds for checking IP changes automatically.
409    pub fn get_ip_check_interval(&self) -> Result<u32> {
410        let (resp_s, resp_r) = bounded(1);
411        self.send_cmd(Command::GetOption(resp_s))?;
412
413        let option = resp_r
414            .recv_timeout(Duration::from_secs(10))
415            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
416        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
417        Ok(ip_check_interval_in_secs as u32)
418    }
419
420    /// Include interfaces that match `if_kind` for this service daemon.
421    ///
422    /// For example:
423    /// ```ignore
424    ///     daemon.enable_interface("en0")?;
425    /// ```
426    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
427        let if_kind_vec = if_kind.into_vec();
428        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
429            if_kind_vec.kinds,
430        )))
431    }
432
433    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
434    ///
435    /// For example:
436    /// ```ignore
437    ///     daemon.disable_interface(IfKind::IPv6)?;
438    /// ```
439    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
440        let if_kind_vec = if_kind.into_vec();
441        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
442            if_kind_vec.kinds,
443        )))
444    }
445
446    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
447    ///
448    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
449    /// receive announcements from a responder on the same host.
450    ///
451    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
452    ///
453    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
454    /// the UNIX version of the IP_MULTICAST_LOOP option:
455    ///
456    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
457    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
458    ///
459    /// Which means, in order NOT to receive localhost announcements, you want to call
460    /// this API on the querier side on Windows, but on the responder side on Unix.
461    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
462        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
463    }
464
465    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
466    ///
467    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
468    /// receive announcements from a responder on the same host.
469    ///
470    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
471    ///
472    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
473    /// the UNIX version of the IP_MULTICAST_LOOP option:
474    ///
475    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
476    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
477    ///
478    /// Which means, in order NOT to receive localhost announcements, you want to call
479    /// this API on the querier side on Windows, but on the responder side on Unix.
480    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
481        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
482    }
483
484    /// Proactively confirms whether a service instance still valid.
485    ///
486    /// This call will issue queries for a service instance's SRV record and Address records.
487    ///
488    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
489    /// unless there is a reason not to follow RFC.
490    ///
491    /// If no response is received within `timeout`, the current resource
492    /// records will be flushed, and if needed, `ServiceRemoved` event will be
493    /// sent to active queriers.
494    ///
495    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
496    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
497        self.send_cmd(Command::Verify(instance_fullname, timeout))
498    }
499
500    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
501        let zc = Zeroconf::new(signal_sock, poller);
502
503        if let Some(cmd) = Self::run(zc, receiver) {
504            match cmd {
505                Command::Exit(resp_s) => {
506                    // It is guaranteed that the receiver already dropped,
507                    // i.e. the daemon command channel closed.
508                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
509                        debug!("exit: failed to send response of shutdown: {}", e);
510                    }
511                }
512                _ => {
513                    debug!("Unexpected command: {:?}", cmd);
514                }
515            }
516        }
517    }
518
519    /// The main event loop of the daemon thread
520    ///
521    /// In each round, it will:
522    /// 1. select the listening sockets with a timeout.
523    /// 2. process the incoming packets if any.
524    /// 3. try_recv on its channel and execute commands.
525    /// 4. announce its registered services.
526    /// 5. process retransmissions if any.
527    fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
528        // Add the daemon's signal socket to the poller.
529        if let Err(e) = zc.poller.registry().register(
530            &mut zc.signal_sock,
531            mio::Token(SIGNAL_SOCK_EVENT_KEY),
532            mio::Interest::READABLE,
533        ) {
534            debug!("failed to add signal socket to the poller: {}", e);
535            return None;
536        }
537
538        // Add mDNS sockets to the poller.
539        for (intf, sock) in zc.intf_socks.iter_mut() {
540            let key =
541                Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
542
543            if let Err(e) =
544                zc.poller
545                    .registry()
546                    .register(sock, mio::Token(key), mio::Interest::READABLE)
547            {
548                debug!("add socket of {:?} to poller: {e}", intf);
549                return None;
550            }
551        }
552
553        // Setup timer for IP checks.
554        let mut next_ip_check = if zc.ip_check_interval > 0 {
555            current_time_millis() + zc.ip_check_interval
556        } else {
557            0
558        };
559
560        if next_ip_check > 0 {
561            zc.add_timer(next_ip_check);
562        }
563
564        // Start the run loop.
565
566        let mut events = mio::Events::with_capacity(1024);
567        loop {
568            let now = current_time_millis();
569
570            let earliest_timer = zc.peek_earliest_timer();
571            let timeout = earliest_timer.map(|timer| {
572                // If `timer` already passed, set `timeout` to be 1ms.
573                let millis = if timer > now { timer - now } else { 1 };
574                Duration::from_millis(millis)
575            });
576
577            // Process incoming packets, command events and optional timeout.
578            events.clear();
579            match zc.poller.poll(&mut events, timeout) {
580                Ok(_) => zc.handle_poller_events(&events),
581                Err(e) => debug!("failed to select from sockets: {}", e),
582            }
583
584            let now = current_time_millis();
585
586            // Remove the timers if already passed.
587            zc.pop_timers_till(now);
588
589            // Remove hostname resolvers with expired timeouts.
590            for hostname in zc
591                .hostname_resolvers
592                .clone()
593                .into_iter()
594                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
595                .map(|(hostname, _)| hostname)
596            {
597                trace!("hostname resolver timeout for {}", &hostname);
598                call_hostname_resolution_listener(
599                    &zc.hostname_resolvers,
600                    &hostname,
601                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
602                );
603                call_hostname_resolution_listener(
604                    &zc.hostname_resolvers,
605                    &hostname,
606                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
607                );
608                zc.hostname_resolvers.remove(&hostname);
609            }
610
611            // process commands from the command channel
612            while let Ok(command) = receiver.try_recv() {
613                if matches!(command, Command::Exit(_)) {
614                    zc.status = DaemonStatus::Shutdown;
615                    return Some(command);
616                }
617                zc.exec_command(command, false);
618            }
619
620            // check for repeated commands and run them if their time is up.
621            let mut i = 0;
622            while i < zc.retransmissions.len() {
623                if now >= zc.retransmissions[i].next_time {
624                    let rerun = zc.retransmissions.remove(i);
625                    zc.exec_command(rerun.command, true);
626                } else {
627                    i += 1;
628                }
629            }
630
631            // Refresh cached service records with active queriers
632            zc.refresh_active_services();
633
634            // Refresh cached A/AAAA records with active queriers
635            let mut query_count = 0;
636            for (hostname, _sender) in zc.hostname_resolvers.iter() {
637                for (hostname, ip_addr) in
638                    zc.cache.refresh_due_hostname_resolutions(hostname).iter()
639                {
640                    zc.send_query(hostname, ip_address_rr_type(ip_addr));
641                    query_count += 1;
642                }
643            }
644
645            zc.increase_counter(Counter::CacheRefreshAddr, query_count);
646
647            // check and evict expired records in our cache
648            let now = current_time_millis();
649
650            // Notify service listeners about the expired records.
651            let expired_services = zc.cache.evict_expired_services(now);
652            zc.notify_service_removal(expired_services);
653
654            // Notify hostname listeners about the expired records.
655            let expired_addrs = zc.cache.evict_expired_addr(now);
656            for (hostname, addrs) in expired_addrs {
657                call_hostname_resolution_listener(
658                    &zc.hostname_resolvers,
659                    &hostname,
660                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
661                );
662                let instances = zc.cache.get_instances_on_host(&hostname);
663                let instance_set: HashSet<String> = instances.into_iter().collect();
664                zc.resolve_updated_instances(&instance_set);
665            }
666
667            // Send out probing queries.
668            zc.probing_handler();
669
670            // check IP changes if next_ip_check is reached.
671            if now >= next_ip_check && next_ip_check > 0 {
672                next_ip_check = now + zc.ip_check_interval;
673                zc.add_timer(next_ip_check);
674
675                zc.check_ip_changes();
676            }
677        }
678    }
679}
680
681/// Creates a new UDP socket that uses `intf` to send and recv multicast.
682fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
683    // Use the same socket for receiving and sending multicast packets.
684    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
685    let intf_ip = &intf.ip();
686    match intf_ip {
687        IpAddr::V4(ip) => {
688            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
689            let sock = new_socket(addr.into(), true)?;
690
691            // Join mDNS group to receive packets.
692            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
693                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
694
695            // Set IP_MULTICAST_IF to send packets.
696            sock.set_multicast_if_v4(ip)
697                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
698
699            if !should_loop {
700                sock.set_multicast_loop_v4(false)
701                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
702            }
703
704            // Test if we can send packets successfully.
705            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
706            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
707            for packet in test_packets {
708                sock.send_to(&packet, &multicast_addr)
709                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
710            }
711            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
712        }
713        IpAddr::V6(ip) => {
714            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
715            let sock = new_socket(addr.into(), true)?;
716
717            // Join mDNS group to receive packets.
718            sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
719                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
720
721            // Set IPV6_MULTICAST_IF to send packets.
722            sock.set_multicast_if_v6(intf.index.unwrap_or(0))
723                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
724
725            // We are not sending multicast packets to test this socket as there might
726            // be many IPv6 interfaces on a host and could cause such send error:
727            // "No buffer space available (os error 55)".
728
729            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
730        }
731    }
732}
733
734/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
735/// `non_block` indicates whether to set O_NONBLOCK for the socket.
736fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
737    let domain = match addr {
738        SocketAddr::V4(_) => socket2::Domain::IPV4,
739        SocketAddr::V6(_) => socket2::Domain::IPV6,
740    };
741
742    let fd = Socket::new(domain, socket2::Type::DGRAM, None)
743        .map_err(|e| e_fmt!("create socket failed: {}", e))?;
744
745    fd.set_reuse_address(true)
746        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
747    #[cfg(unix)] // this is currently restricted to Unix's in socket2
748    fd.set_reuse_port(true)
749        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
750
751    if non_block {
752        fd.set_nonblocking(true)
753            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
754    }
755
756    fd.bind(&addr.into())
757        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
758
759    trace!("new socket bind to {}", &addr);
760    Ok(fd)
761}
762
/// A `command` scheduled to run again at `next_time`.
///
/// Instances are stored in `Zeroconf::retransmissions`.
struct ReRun {
    /// UNIX timestamp in millis at which `command` should run next.
    next_time: u64,
    /// The command to run.
    command: Command,
}
769
/// The IP version (IPv4 or IPv6) of an interface address.
///
/// Used as part of [`MulticastSendTracker`].
#[derive(Debug, Eq, Hash, PartialEq)]
enum IpVersion {
    /// IPv4.
    V4,
    /// IPv6.
    V6,
}
776
/// A struct to track multicast send status for a network interface.
///
/// Two trackers are equal when both the interface index and the IP version
/// are equal, which lets a `HashSet` of trackers record the
/// (interface, IP version) pairs that were already used for sending.
#[derive(Debug, Eq, Hash, PartialEq)]
struct MulticastSendTracker {
    /// Index of the network interface.
    intf_index: u32,
    /// IP version of the interface address.
    ip_version: IpVersion,
}
783
784/// Returns the multicast send tracker if the interface index is valid
785fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
786    match intf.index {
787        Some(index) => {
788            let ip_ver = match intf.addr {
789                IfAddr::V4(_) => IpVersion::V4,
790                IfAddr::V6(_) => IpVersion::V6,
791            };
792            Some(MulticastSendTracker {
793                intf_index: index,
794                ip_version: ip_ver,
795            })
796        }
797        None => None,
798    }
799}
800
/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>`, `From<&String>` and `From<IpAddr>`
/// are implemented: strings become [`IfKind::Name`] and IP addresses become
/// [`IfKind::Addr`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
    ///
    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
    LoopbackV4,

    /// ::1/128, disabled by default.
    LoopbackV6,
}
831
832impl IfKind {
833    /// Checks if `intf` matches with this interface kind.
834    fn matches(&self, intf: &Interface) -> bool {
835        match self {
836            Self::All => true,
837            Self::IPv4 => intf.ip().is_ipv4(),
838            Self::IPv6 => intf.ip().is_ipv6(),
839            Self::Name(ifname) => ifname == &intf.name,
840            Self::Addr(addr) => addr == &intf.ip(),
841            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
842            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
843        }
844    }
845}
846
847/// The first use case of specifying an interface was to
848/// use an interface name. Hence adding this for ergonomic reasons.
849impl From<&str> for IfKind {
850    fn from(val: &str) -> Self {
851        Self::Name(val.to_string())
852    }
853}
854
855impl From<&String> for IfKind {
856    fn from(val: &String) -> Self {
857        Self::Name(val.to_string())
858    }
859}
860
861/// Still for ergonomic reasons.
862impl From<IpAddr> for IfKind {
863    fn from(val: IpAddr) -> Self {
864        Self::Addr(val)
865    }
866}
867
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values are produced through the [`IntoIfKindVec`] trait.
pub struct IfKindVec {
    /// The interface kinds held by this list.
    kinds: Vec<IfKind>,
}
872
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value convertible into `IfKind`
/// and for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the corresponding [`IfKindVec`].
    fn into_vec(self) -> IfKindVec;
}
877
878impl<T: Into<IfKind>> IntoIfKindVec for T {
879    fn into_vec(self) -> IfKindVec {
880        let if_kind: IfKind = self.into();
881        IfKindVec {
882            kinds: vec![if_kind],
883        }
884    }
885}
886
887impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
888    fn into_vec(self) -> IfKindVec {
889        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
890        IfKindVec { kinds }
891    }
892}
893
/// Selection of interfaces.
///
/// Selections are kept in `Zeroconf::if_selections` and applied in order,
/// so for a given interface the last matching selection decides.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
902
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
struct Zeroconf {
    /// Local interfaces with sockets to recv/send on these interfaces.
    intf_socks: HashMap<Interface, MioUdpSocket>,

    /// Map poll id to Interface.
    poll_ids: HashMap<usize, Interface>,

    /// Next poll id value
    poll_id_count: usize,

    /// Local registered services, keyed by service full names.
    /// (Keys are inserted in lowercase by `register_service`.)
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records.
    dns_registry_map: HashMap<Interface, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Daemon metrics. NOTE(review): see the `Metrics` type for the exact shape.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Options
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon; starts as `DaemonStatus::Running`.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// Multicast loop flag passed to `new_socket_bind` when a socket is
    /// created for a newly added IPv4 interface. Starts as `true`.
    multicast_loop_v4: bool,

    /// Multicast loop flag passed to `new_socket_bind` when a socket is
    /// created for a newly added IPv6 interface. Starts as `true`.
    multicast_loop_v6: bool,
}
974
975impl Zeroconf {
976    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
977        // Get interfaces.
978        let my_ifaddrs = my_ip_interfaces(false);
979
980        // Create a socket for every IP addr.
981        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
982        // or the same interface name with different IP addrs.
983        let mut intf_socks = HashMap::new();
984        let mut dns_registry_map = HashMap::new();
985
986        for intf in my_ifaddrs {
987            let sock = match new_socket_bind(&intf, true) {
988                Ok(s) => s,
989                Err(e) => {
990                    trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
991                    continue;
992                }
993            };
994
995            dns_registry_map.insert(intf.clone(), DnsRegistry::new());
996
997            intf_socks.insert(intf, sock);
998        }
999
1000        let monitors = Vec::new();
1001        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1002        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1003
1004        let timers = BinaryHeap::new();
1005
1006        // Disable loopback by default.
1007        let if_selections = vec![
1008            IfSelection {
1009                if_kind: IfKind::LoopbackV4,
1010                selected: false,
1011            },
1012            IfSelection {
1013                if_kind: IfKind::LoopbackV6,
1014                selected: false,
1015            },
1016        ];
1017
1018        let status = DaemonStatus::Running;
1019
1020        Self {
1021            intf_socks,
1022            poll_ids: HashMap::new(),
1023            poll_id_count: 0,
1024            my_services: HashMap::new(),
1025            cache: DnsCache::new(),
1026            dns_registry_map,
1027            hostname_resolvers: HashMap::new(),
1028            service_queriers: HashMap::new(),
1029            retransmissions: Vec::new(),
1030            counters: HashMap::new(),
1031            poller,
1032            monitors,
1033            service_name_len_max,
1034            ip_check_interval,
1035            if_selections,
1036            signal_sock,
1037            timers,
1038            status,
1039            pending_resolves: HashSet::new(),
1040            resolved: HashSet::new(),
1041            multicast_loop_v4: true,
1042            multicast_loop_v6: true,
1043        }
1044    }
1045
1046    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1047        match daemon_opt {
1048            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1049            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1050            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1051            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1052            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1053            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1054        }
1055    }
1056
1057    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1058        for if_kind in kinds {
1059            self.if_selections.push(IfSelection {
1060                if_kind,
1061                selected: true,
1062            });
1063        }
1064
1065        self.apply_intf_selections(my_ip_interfaces(true));
1066    }
1067
1068    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1069        for if_kind in kinds {
1070            self.if_selections.push(IfSelection {
1071                if_kind,
1072                selected: false,
1073            });
1074        }
1075
1076        self.apply_intf_selections(my_ip_interfaces(true));
1077    }
1078
1079    fn set_multicast_loop_v4(&mut self, on: bool) {
1080        for (_, sock) in self.intf_socks.iter_mut() {
1081            if let Err(e) = sock.set_multicast_loop_v4(on) {
1082                debug!("failed to set multicast loop v4: {e}");
1083            }
1084        }
1085    }
1086
1087    fn set_multicast_loop_v6(&mut self, on: bool) {
1088        for (_, sock) in self.intf_socks.iter_mut() {
1089            if let Err(e) = sock.set_multicast_loop_v6(on) {
1090                debug!("failed to set multicast loop v6: {e}");
1091            }
1092        }
1093    }
1094
1095    fn notify_monitors(&mut self, event: DaemonEvent) {
1096        // Only retain the monitors that are still connected.
1097        self.monitors.retain(|sender| {
1098            if let Err(e) = sender.try_send(event.clone()) {
1099                debug!("notify_monitors: try_send: {}", &e);
1100                if matches!(e, TrySendError::Disconnected(_)) {
1101                    return false; // This monitor is dropped.
1102                }
1103            }
1104            true
1105        });
1106    }
1107
1108    /// Remove `addr` in my services that enabled `addr_auto`.
1109    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1110        for (_, service_info) in self.my_services.iter_mut() {
1111            if service_info.is_addr_auto() {
1112                service_info.remove_ipaddr(addr);
1113            }
1114        }
1115    }
1116
1117    /// Insert a new interface into the poll map and return key
1118    fn add_poll(&mut self, intf: Interface) -> usize {
1119        Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1120    }
1121
1122    /// Insert a new interface into the poll map and return its key.
1123    ///
1124    /// This exists to satisfy the borrow checker
1125    fn add_poll_impl(
1126        poll_ids: &mut HashMap<usize, Interface>,
1127        poll_id_count: &mut usize,
1128        intf: Interface,
1129    ) -> usize {
1130        let key = *poll_id_count;
1131        *poll_id_count += 1;
1132        let _ = (*poll_ids).insert(key, intf);
1133        key
1134    }
1135
1136    fn add_timer(&mut self, next_time: u64) {
1137        self.timers.push(Reverse(next_time));
1138    }
1139
1140    fn peek_earliest_timer(&self) -> Option<u64> {
1141        self.timers.peek().map(|Reverse(v)| *v)
1142    }
1143
1144    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1145        self.timers.pop().map(|Reverse(v)| v)
1146    }
1147
1148    /// Pop all timers that are already passed till `now`.
1149    fn pop_timers_till(&mut self, now: u64) {
1150        while let Some(Reverse(v)) = self.timers.peek() {
1151            if *v > now {
1152                break;
1153            }
1154            self.timers.pop();
1155        }
1156    }
1157
1158    /// Apply all selections to `interfaces` and return the selected addresses.
1159    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1160        let intf_count = interfaces.len();
1161        let mut intf_selections = vec![true; intf_count];
1162
1163        // apply if_selections
1164        for selection in self.if_selections.iter() {
1165            // Mark the interfaces for this selection.
1166            for i in 0..intf_count {
1167                if selection.if_kind.matches(&interfaces[i]) {
1168                    intf_selections[i] = selection.selected;
1169                }
1170            }
1171        }
1172
1173        let mut selected_addrs = HashSet::new();
1174        for i in 0..intf_count {
1175            if intf_selections[i] {
1176                selected_addrs.insert(interfaces[i].addr.ip());
1177            }
1178        }
1179
1180        selected_addrs
1181    }
1182
1183    /// Apply all selections to `interfaces`.
1184    ///
1185    /// For any interface, add it if selected but not bound yet,
1186    /// delete it if not selected but still bound.
1187    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1188        // By default, we enable all interfaces.
1189        let intf_count = interfaces.len();
1190        let mut intf_selections = vec![true; intf_count];
1191
1192        // apply if_selections
1193        for selection in self.if_selections.iter() {
1194            // Mark the interfaces for this selection.
1195            for i in 0..intf_count {
1196                if selection.if_kind.matches(&interfaces[i]) {
1197                    intf_selections[i] = selection.selected;
1198                }
1199            }
1200        }
1201
1202        // Update `intf_socks` based on the selections.
1203        for (idx, intf) in interfaces.into_iter().enumerate() {
1204            if intf_selections[idx] {
1205                // Add the interface
1206                if !self.intf_socks.contains_key(&intf) {
1207                    debug!("apply_intf_selections: add {:?}", &intf.ip());
1208                    self.add_new_interface(intf);
1209                }
1210            } else {
1211                // Remove the interface
1212                if let Some(mut sock) = self.intf_socks.remove(&intf) {
1213                    match self.poller.registry().deregister(&mut sock) {
1214                        Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1215                        Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1216                    }
1217
1218                    // Remove from poll_ids
1219                    self.poll_ids.retain(|_, v| v != &intf);
1220
1221                    // Remove cache records for this interface.
1222                    self.cache.remove_addrs_on_disabled_intf(&intf);
1223                }
1224            }
1225        }
1226    }
1227
1228    /// Check for IP changes and update intf_socks as needed.
1229    fn check_ip_changes(&mut self) {
1230        // Get the current interfaces.
1231        let my_ifaddrs = my_ip_interfaces(true);
1232
1233        let poll_ids = &mut self.poll_ids;
1234        let poller = &mut self.poller;
1235        // Remove unused sockets in the poller.
1236        let deleted_addrs = self
1237            .intf_socks
1238            .iter_mut()
1239            .filter_map(|(intf, sock)| {
1240                if !my_ifaddrs.contains(intf) {
1241                    if let Err(e) = poller.registry().deregister(sock) {
1242                        debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1243                    }
1244                    // Remove from poll_ids
1245                    poll_ids.retain(|_, v| v != intf);
1246                    Some(intf.ip())
1247                } else {
1248                    None
1249                }
1250            })
1251            .collect::<Vec<IpAddr>>();
1252
1253        // Remove deleted addrs from my services that enabled `addr_auto`.
1254        for ip in deleted_addrs.iter() {
1255            self.del_addr_in_my_services(ip);
1256            self.notify_monitors(DaemonEvent::IpDel(*ip));
1257        }
1258
1259        // Keep the interfaces only if they still exist.
1260        self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1261
1262        // Add newly found interfaces only if in our selections.
1263        self.apply_intf_selections(my_ifaddrs);
1264    }
1265
1266    fn add_new_interface(&mut self, intf: Interface) {
1267        // Bind the new interface.
1268        let new_ip = intf.ip();
1269        let should_loop = if new_ip.is_ipv4() {
1270            self.multicast_loop_v4
1271        } else {
1272            self.multicast_loop_v6
1273        };
1274        let mut sock = match new_socket_bind(&intf, should_loop) {
1275            Ok(s) => s,
1276            Err(e) => {
1277                debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1278                return;
1279            }
1280        };
1281
1282        // Add the new interface into the poller.
1283        let key = self.add_poll(intf.clone());
1284        if let Err(e) =
1285            self.poller
1286                .registry()
1287                .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1288        {
1289            debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1290            return;
1291        }
1292
1293        debug!("add new interface {}: {new_ip}", intf.name);
1294        let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1295            Some(registry) => registry,
1296            None => self
1297                .dns_registry_map
1298                .entry(intf.clone())
1299                .or_insert_with(DnsRegistry::new),
1300        };
1301
1302        for (_, service_info) in self.my_services.iter_mut() {
1303            if service_info.is_addr_auto() {
1304                service_info.insert_ipaddr(new_ip);
1305
1306                if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1307                    debug!(
1308                        "Announce service {} on {}",
1309                        service_info.get_fullname(),
1310                        intf.ip()
1311                    );
1312                    service_info.set_status(&intf, ServiceStatus::Announced);
1313                } else {
1314                    for timer in dns_registry.new_timers.drain(..) {
1315                        self.timers.push(Reverse(timer));
1316                    }
1317                    service_info.set_status(&intf, ServiceStatus::Probing);
1318                }
1319            }
1320        }
1321
1322        self.intf_socks.insert(intf, sock);
1323
1324        // Notify the monitors.
1325        self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1326    }
1327
1328    /// Registers a service.
1329    ///
1330    /// RFC 6762 section 8.3.
1331    /// ...the Multicast DNS responder MUST send
1332    ///    an unsolicited Multicast DNS response containing, in the Answer
1333    ///    Section, all of its newly registered resource records
1334    ///
1335    /// Zeroconf will then respond to requests for information about this service.
1336    fn register_service(&mut self, mut info: ServiceInfo) {
1337        // Check the service name length.
1338        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1339            debug!("check_service_name_length: {}", &e);
1340            self.notify_monitors(DaemonEvent::Error(e));
1341            return;
1342        }
1343
1344        if info.is_addr_auto() {
1345            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1346            for addr in selected_addrs {
1347                info.insert_ipaddr(addr);
1348            }
1349        }
1350
1351        debug!("register service {:?}", &info);
1352
1353        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1354        if !outgoing_addrs.is_empty() {
1355            self.notify_monitors(DaemonEvent::Announce(
1356                info.get_fullname().to_string(),
1357                format!("{:?}", &outgoing_addrs),
1358            ));
1359        }
1360
1361        // The key has to be lower case letter as DNS record name is case insensitive.
1362        // The info will have the original name.
1363        let service_fullname = info.get_fullname().to_lowercase();
1364        self.my_services.insert(service_fullname, info);
1365    }
1366
1367    /// Sends out announcement of `info` on every valid interface.
1368    /// Returns the list of interface IPs that sent out the announcement.
1369    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1370        let mut outgoing_addrs = Vec::new();
1371        // Send the announcement on one interface per ip version.
1372        let mut multicast_sent_trackers = HashSet::new();
1373
1374        let mut outgoing_intfs = Vec::new();
1375
1376        for (intf, sock) in self.intf_socks.iter() {
1377            if let Some(tracker) = multicast_send_tracker(intf) {
1378                if multicast_sent_trackers.contains(&tracker) {
1379                    continue; // No need to send again on the same interface with same ip version.
1380                }
1381            }
1382
1383            let dns_registry = match self.dns_registry_map.get_mut(intf) {
1384                Some(registry) => registry,
1385                None => self
1386                    .dns_registry_map
1387                    .entry(intf.clone())
1388                    .or_insert_with(DnsRegistry::new),
1389            };
1390
1391            if announce_service_on_intf(dns_registry, info, intf, sock) {
1392                if let Some(tracker) = multicast_send_tracker(intf) {
1393                    multicast_sent_trackers.insert(tracker);
1394                }
1395                outgoing_addrs.push(intf.ip());
1396                outgoing_intfs.push(intf.clone());
1397
1398                debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1399
1400                info.set_status(intf, ServiceStatus::Announced);
1401            } else {
1402                for timer in dns_registry.new_timers.drain(..) {
1403                    self.timers.push(Reverse(timer));
1404                }
1405                info.set_status(intf, ServiceStatus::Probing);
1406            }
1407        }
1408
1409        // RFC 6762 section 8.3.
1410        // ..The Multicast DNS responder MUST send at least two unsolicited
1411        //    responses, one second apart.
1412        let next_time = current_time_millis() + 1000;
1413        for intf in outgoing_intfs {
1414            self.add_retransmission(
1415                next_time,
1416                Command::RegisterResend(info.get_fullname().to_string(), intf),
1417            );
1418        }
1419
1420        outgoing_addrs
1421    }
1422
    /// Send probings or finish them if expired. Notify waiting services.
    ///
    /// For every bound interface that has a DNS registry:
    /// 1. each probe that is due is either re-sent (and rescheduled) or,
    ///    if it has expired, collected for completion;
    /// 2. records of completed probes are moved to the active set, and
    ///    monitors are told about any name changes;
    /// 3. services that were waiting on completed probes are announced,
    ///    with a resend scheduled one second later.
    fn probing_handler(&mut self) {
        let now = current_time_millis();

        for (intf, sock) in self.intf_socks.iter() {
            // Interfaces without a registry have nothing to probe.
            let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
                continue;
            };

            let mut expired_probe_names = Vec::new();
            let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);

            for (name, probe) in dns_registry.probing.iter_mut() {
                // Only handle probes whose next send time has been reached.
                if now >= probe.next_send {
                    if probe.expired(now) {
                        // move the record to active
                        expired_probe_names.push(name.clone());
                    } else {
                        out.add_question(name, RRType::ANY);

                        /*
                        RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
                        ...
                        for tiebreaking to work correctly in all
                        cases, the Authority Section must contain *all* the records and
                        proposed rdata being probed for uniqueness.
                         */
                        for record in probe.records.iter() {
                            out.add_authority(record.clone());
                        }

                        probe.update_next_send(now);

                        // add timer
                        self.timers.push(Reverse(probe.next_send));
                    }
                }
            }

            // send probing.
            if !out.questions().is_empty() {
                debug!("sending out probing of {} questions", out.questions().len());
                send_dns_outgoing(&out, intf, sock);
            }

            // Names of the services to announce once their probes are done.
            let mut waiting_services = HashSet::new();

            for name in expired_probe_names {
                let Some(probe) = dns_registry.probing.remove(&name) else {
                    continue;
                };

                // send notifications about name changes
                for record in probe.records.iter() {
                    if let Some(new_name) = record.get_record().get_new_name() {
                        // Remember the mapping from the probed name to the new name.
                        dns_registry
                            .name_changes
                            .insert(name.clone(), new_name.to_string());

                        let event = DnsNameChange {
                            original: record.get_record().get_original_name().to_string(),
                            new_name: new_name.to_string(),
                            rr_type: record.get_type(),
                            intf_name: intf.name.to_string(),
                        };
                        // Free function taking the monitor list directly, rather than
                        // the `&mut self` method.
                        notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
                    }
                }

                // move RR from probe to active.
                debug!(
                    "probe of '{name}' finished: move {} records to active. ({} waiting services)",
                    probe.records.len(),
                    probe.waiting_services.len(),
                );

                // Move records to active and plan to wake up services if records are not empty.
                if !probe.records.is_empty() {
                    match dns_registry.active.get_mut(&name) {
                        Some(records) => {
                            records.extend(probe.records);
                        }
                        None => {
                            dns_registry.active.insert(name, probe.records);
                        }
                    }

                    waiting_services.extend(probe.waiting_services);
                }
            }

            // wake up services waiting.
            for service_name in waiting_services {
                debug!(
                    "try to announce service {service_name} on intf {}",
                    intf.ip()
                );
                // service names are lowercase
                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                    if info.get_status(intf) == ServiceStatus::Announced {
                        debug!("service {} already announced", info.get_fullname());
                        continue;
                    }

                    if announce_service_on_intf(dns_registry, info, intf, sock) {
                        // Schedule the second announcement one second from `now`.
                        let next_time = now + 1000;
                        let command =
                            Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
                        self.retransmissions.push(ReRun { next_time, command });
                        self.timers.push(Reverse(next_time));

                        // Report the changed names, if any, instead of the original ones.
                        let fullname = match dns_registry.name_changes.get(&service_name) {
                            Some(new_name) => new_name.to_string(),
                            None => service_name.to_string(),
                        };

                        let mut hostname = info.get_hostname();
                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
                            hostname = new_name;
                        }

                        debug!("wake up: announce service {} on {}", fullname, intf.ip());
                        notify_monitors(
                            &mut self.monitors,
                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
                        );

                        info.set_status(intf, ServiceStatus::Announced);
                    }
                }
            }
        }
    }
1556
1557    fn unregister_service(
1558        &self,
1559        info: &ServiceInfo,
1560        intf: &Interface,
1561        sock: &MioUdpSocket,
1562    ) -> Vec<u8> {
1563        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1564        out.add_answer_at_time(
1565            DnsPointer::new(
1566                info.get_type(),
1567                RRType::PTR,
1568                CLASS_IN,
1569                0,
1570                info.get_fullname().to_string(),
1571            ),
1572            0,
1573        );
1574
1575        if let Some(sub) = info.get_subtype() {
1576            trace!("Adding subdomain {}", sub);
1577            out.add_answer_at_time(
1578                DnsPointer::new(
1579                    sub,
1580                    RRType::PTR,
1581                    CLASS_IN,
1582                    0,
1583                    info.get_fullname().to_string(),
1584                ),
1585                0,
1586            );
1587        }
1588
1589        out.add_answer_at_time(
1590            DnsSrv::new(
1591                info.get_fullname(),
1592                CLASS_IN | CLASS_CACHE_FLUSH,
1593                0,
1594                info.get_priority(),
1595                info.get_weight(),
1596                info.get_port(),
1597                info.get_hostname().to_string(),
1598            ),
1599            0,
1600        );
1601        out.add_answer_at_time(
1602            DnsTxt::new(
1603                info.get_fullname(),
1604                CLASS_IN | CLASS_CACHE_FLUSH,
1605                0,
1606                info.generate_txt(),
1607            ),
1608            0,
1609        );
1610
1611        for address in info.get_addrs_on_intf(intf) {
1612            out.add_answer_at_time(
1613                DnsAddress::new(
1614                    info.get_hostname(),
1615                    ip_address_rr_type(&address),
1616                    CLASS_IN | CLASS_CACHE_FLUSH,
1617                    0,
1618                    address,
1619                ),
1620                0,
1621            );
1622        }
1623
1624        // `out` data is non-empty, hence we can do this.
1625        send_dns_outgoing(&out, intf, sock).remove(0)
1626    }
1627
1628    /// Binds a channel `listener` to querying mDNS hostnames.
1629    ///
1630    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1631    fn add_hostname_resolver(
1632        &mut self,
1633        hostname: String,
1634        listener: Sender<HostnameResolutionEvent>,
1635        timeout: Option<u64>,
1636    ) {
1637        let real_timeout = timeout.map(|t| current_time_millis() + t);
1638        self.hostname_resolvers
1639            .insert(hostname.to_lowercase(), (listener, real_timeout));
1640        if let Some(t) = real_timeout {
1641            self.add_timer(t);
1642        }
1643    }
1644
1645    /// Sends a multicast query for `name` with `qtype`.
1646    fn send_query(&self, name: &str, qtype: RRType) {
1647        self.send_query_vec(&[(name, qtype)]);
1648    }
1649
1650    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1651    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1652        trace!("Sending query questions: {:?}", questions);
1653        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1654        let now = current_time_millis();
1655
1656        for (name, qtype) in questions {
1657            out.add_question(name, *qtype);
1658
1659            for record in self.cache.get_known_answers(name, *qtype, now) {
1660                /*
1661                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1662                ...
1663                    When a Multicast DNS querier sends a query to which it already knows
1664                    some answers, it populates the Answer Section of the DNS query
1665                    message with those answers.
1666                 */
1667                trace!("add known answer: {:?}", record);
1668                let mut new_record = record.clone();
1669                new_record.get_record_mut().update_ttl(now);
1670                out.add_answer_box(new_record);
1671            }
1672        }
1673
1674        // Send the query on one interface per ip version.
1675        let mut multicast_sent_trackers = HashSet::new();
1676        for (intf, sock) in self.intf_socks.iter() {
1677            if let Some(tracker) = multicast_send_tracker(intf) {
1678                if multicast_sent_trackers.contains(&tracker) {
1679                    continue; // no need to send query the same interface with same ip version.
1680                }
1681                multicast_sent_trackers.insert(tracker);
1682            }
1683            send_dns_outgoing(&out, intf, sock);
1684        }
1685    }
1686
1687    /// Reads one UDP datagram from the socket of `intf`.
1688    ///
1689    /// Returns false if failed to receive a packet,
1690    /// otherwise returns true.
1691    fn handle_read(&mut self, intf: &Interface) -> bool {
1692        let sock = match self.intf_socks.get_mut(intf) {
1693            Some(if_sock) => if_sock,
1694            None => return false,
1695        };
1696        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1697
1698        // Read the next mDNS UDP datagram.
1699        //
1700        // If the datagram is larger than `buf`, excess bytes may or may not
1701        // be truncated by the socket layer depending on the platform's libc.
1702        // In any case, such large datagram will not be decoded properly and
1703        // this function should return false but should not crash.
1704        let sz = match sock.recv(&mut buf) {
1705            Ok(sz) => sz,
1706            Err(e) => {
1707                if e.kind() != std::io::ErrorKind::WouldBlock {
1708                    debug!("listening socket read failed: {}", e);
1709                }
1710                return false;
1711            }
1712        };
1713
1714        trace!("received {} bytes at IP: {}", sz, intf.ip());
1715
1716        // If sz is 0, it means sock reached End-of-File.
1717        if sz == 0 {
1718            debug!("socket {:?} was likely shutdown", &sock);
1719            if let Err(e) = self.poller.registry().deregister(sock) {
1720                debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1721            }
1722
1723            // Replace the closed socket with a new one.
1724            let should_loop = if intf.ip().is_ipv4() {
1725                self.multicast_loop_v4
1726            } else {
1727                self.multicast_loop_v6
1728            };
1729            match new_socket_bind(intf, should_loop) {
1730                Ok(new_sock) => {
1731                    trace!("reset socket for IP {}", intf.ip());
1732                    self.intf_socks.insert(intf.clone(), new_sock);
1733                }
1734                Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1735            }
1736
1737            return false;
1738        }
1739
1740        buf.truncate(sz); // reduce potential processing errors
1741
1742        match DnsIncoming::new(buf) {
1743            Ok(msg) => {
1744                if msg.is_query() {
1745                    self.handle_query(msg, intf);
1746                } else if msg.is_response() {
1747                    self.handle_response(msg, intf);
1748                } else {
1749                    debug!("Invalid message: not query and not response");
1750                }
1751            }
1752            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1753        }
1754
1755        true
1756    }
1757
1758    /// Returns true, if sent query. Returns false if SRV already exists.
1759    fn query_unresolved(&mut self, instance: &str) -> bool {
1760        if !valid_instance_name(instance) {
1761            trace!("instance name {} not valid", instance);
1762            return false;
1763        }
1764
1765        if let Some(records) = self.cache.get_srv(instance) {
1766            for record in records {
1767                if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1768                    if self.cache.get_addr(srv.host()).is_none() {
1769                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1770                        return true;
1771                    }
1772                }
1773            }
1774        } else {
1775            self.send_query(instance, RRType::ANY);
1776            return true;
1777        }
1778
1779        false
1780    }
1781
1782    /// Checks if `ty_domain` has records in the cache. If yes, sends the
1783    /// cached records via `sender`.
1784    fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1785        let mut resolved: HashSet<String> = HashSet::new();
1786        let mut unresolved: HashSet<String> = HashSet::new();
1787
1788        if let Some(records) = self.cache.get_ptr(ty_domain) {
1789            for record in records.iter() {
1790                if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1791                    let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1792                        Ok(ok) => ok,
1793                        Err(err) => {
1794                            debug!("Error while creating service info from cache: {}", err);
1795                            continue;
1796                        }
1797                    };
1798
1799                    match sender.send(ServiceEvent::ServiceFound(
1800                        ty_domain.to_string(),
1801                        ptr.alias().to_string(),
1802                    )) {
1803                        Ok(()) => debug!("send service found {}", ptr.alias()),
1804                        Err(e) => {
1805                            debug!("failed to send service found: {}", e);
1806                            continue;
1807                        }
1808                    }
1809
1810                    if info.is_ready() {
1811                        resolved.insert(ptr.alias().to_string());
1812                        match sender.send(ServiceEvent::ServiceResolved(info)) {
1813                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1814                            Err(e) => debug!("failed to send service resolved: {}", e),
1815                        }
1816                    } else {
1817                        unresolved.insert(ptr.alias().to_string());
1818                    }
1819                }
1820            }
1821        }
1822
1823        for instance in resolved.drain() {
1824            self.pending_resolves.remove(&instance);
1825            self.resolved.insert(instance);
1826        }
1827
1828        for instance in unresolved.drain() {
1829            self.add_pending_resolve(instance);
1830        }
1831    }
1832
1833    /// Checks if `hostname` has records in the cache. If yes, sends the
1834    /// cached records via `sender`.
1835    fn query_cache_for_hostname(
1836        &mut self,
1837        hostname: &str,
1838        sender: Sender<HostnameResolutionEvent>,
1839    ) {
1840        let addresses_map = self.cache.get_addresses_for_host(hostname);
1841        for (name, addresses) in addresses_map {
1842            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1843                Ok(()) => trace!("sent hostname addresses found"),
1844                Err(e) => debug!("failed to send hostname addresses found: {}", e),
1845            }
1846        }
1847    }
1848
1849    fn add_pending_resolve(&mut self, instance: String) {
1850        if !self.pending_resolves.contains(&instance) {
1851            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1852            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1853            self.pending_resolves.insert(instance);
1854        }
1855    }
1856
1857    fn create_service_info_from_cache(
1858        &self,
1859        ty_domain: &str,
1860        fullname: &str,
1861    ) -> Result<ServiceInfo> {
1862        let my_name = {
1863            let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1864            name.strip_suffix('.').unwrap_or(name).to_string()
1865        };
1866
1867        let now = current_time_millis();
1868        let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1869
1870        // Be sure setting `subtype` if available even when querying for the parent domain.
1871        if let Some(subtype) = self.cache.get_subtype(fullname) {
1872            trace!(
1873                "ty_domain: {} found subtype {} for instance: {}",
1874                ty_domain,
1875                subtype,
1876                fullname
1877            );
1878            if info.get_subtype().is_none() {
1879                info.set_subtype(subtype.clone());
1880            }
1881        }
1882
1883        // resolve SRV record
1884        if let Some(records) = self.cache.get_srv(fullname) {
1885            if let Some(answer) = records.first() {
1886                if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1887                    info.set_hostname(dns_srv.host().to_string());
1888                    info.set_port(dns_srv.port());
1889                }
1890            }
1891        }
1892
1893        // resolve TXT record
1894        if let Some(records) = self.cache.get_txt(fullname) {
1895            if let Some(record) = records.first() {
1896                if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1897                    info.set_properties_from_txt(dns_txt.text());
1898                }
1899            }
1900        }
1901
1902        // resolve A and AAAA records
1903        if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1904            for answer in records.iter() {
1905                if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1906                    if dns_a.get_record().is_expired(now) {
1907                        trace!("Addr expired: {}", dns_a.address());
1908                    } else {
1909                        info.insert_ipaddr(dns_a.address());
1910                    }
1911                }
1912            }
1913        }
1914
1915        Ok(info)
1916    }
1917
1918    fn handle_poller_events(&mut self, events: &mio::Events) {
1919        for ev in events.iter() {
1920            trace!("event received with key {:?}", ev.token());
1921            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1922                // Drain signals as we will drain commands as well.
1923                self.signal_sock_drain();
1924
1925                if let Err(e) = self.poller.registry().reregister(
1926                    &mut self.signal_sock,
1927                    ev.token(),
1928                    mio::Interest::READABLE,
1929                ) {
1930                    debug!("failed to modify poller for signal socket: {}", e);
1931                }
1932                continue; // Next event.
1933            }
1934
1935            // Read until no more packets available.
1936            let intf = match self.poll_ids.get(&ev.token().0) {
1937                Some(interface) => interface.clone(),
1938                None => {
1939                    debug!("Ip for event key {} not found", ev.token().0);
1940                    break;
1941                }
1942            };
1943            while self.handle_read(&intf) {}
1944
1945            // we continue to monitor this socket.
1946            if let Some(sock) = self.intf_socks.get_mut(&intf) {
1947                if let Err(e) =
1948                    self.poller
1949                        .registry()
1950                        .reregister(sock, ev.token(), mio::Interest::READABLE)
1951                {
1952                    debug!("modify poller for interface {:?}: {}", &intf, e);
1953                    break;
1954                }
1955            }
1956        }
1957    }
1958
1959    /// Deal with incoming response packets.  All answers
1960    /// are held in the cache, and listeners are notified.
1961    fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1962        trace!(
1963            "handle_response: {} answers {} authorities {} additionals",
1964            msg.answers().len(),
1965            &msg.authorities().len(),
1966            &msg.num_additionals()
1967        );
1968        let now = current_time_millis();
1969
1970        // remove records that are expired.
1971        let mut record_predicate = |record: &DnsRecordBox| {
1972            if !record.get_record().is_expired(now) {
1973                return true;
1974            }
1975
1976            debug!("record is expired, removing it from cache.");
1977            if self.cache.remove(record) {
1978                // for PTR records, send event to listeners
1979                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1980                    call_service_listener(
1981                        &self.service_queriers,
1982                        dns_ptr.get_name(),
1983                        ServiceEvent::ServiceRemoved(
1984                            dns_ptr.get_name().to_string(),
1985                            dns_ptr.alias().to_string(),
1986                        ),
1987                    );
1988                }
1989            }
1990            false
1991        };
1992        msg.answers_mut().retain(&mut record_predicate);
1993        msg.authorities_mut().retain(&mut record_predicate);
1994        msg.additionals_mut().retain(&mut record_predicate);
1995
1996        // check possible conflicts and handle them.
1997        self.conflict_handler(&msg, intf);
1998
1999        // check if the message is for us.
2000        let mut is_for_us = true; // assume it is for us.
2001
2002        // If there are any PTR records in the answers, there should be
2003        // at least one PTR for us. Otherwise, the message is not for us.
2004        // If there are no PTR records at all, assume this message is for us.
2005        for answer in msg.answers() {
2006            if answer.get_type() == RRType::PTR {
2007                if self.service_queriers.contains_key(answer.get_name()) {
2008                    is_for_us = true;
2009                    break; // OK to break: at least one PTR for us.
2010                } else {
2011                    is_for_us = false;
2012                }
2013            }
2014        }
2015
2016        /// Represents a DNS record change that involves one service instance.
2017        struct InstanceChange {
2018            ty: RRType,   // The type of DNS record for the instance.
2019            name: String, // The name of the record.
2020        }
2021
2022        // Go through all answers to get the new and updated records.
2023        // For new PTR records, send out ServiceFound immediately. For others,
2024        // collect them into `changes`.
2025        //
2026        // Note: we don't try to identify the update instances based on
2027        // each record immediately as the answers are likely related to each
2028        // other.
2029        let mut changes = Vec::new();
2030        let mut timers = Vec::new();
2031        for record in msg.all_records() {
2032            match self
2033                .cache
2034                .add_or_update(intf, record, &mut timers, is_for_us)
2035            {
2036                Some((dns_record, true)) => {
2037                    timers.push(dns_record.get_record().get_expire_time());
2038                    timers.push(dns_record.get_record().get_refresh_time());
2039
2040                    let ty = dns_record.get_type();
2041                    let name = dns_record.get_name();
2042                    if ty == RRType::PTR {
2043                        if self.service_queriers.contains_key(name) {
2044                            timers.push(dns_record.get_record().get_refresh_time());
2045                        }
2046
2047                        // send ServiceFound
2048                        if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2049                            call_service_listener(
2050                                &self.service_queriers,
2051                                name,
2052                                ServiceEvent::ServiceFound(
2053                                    name.to_string(),
2054                                    dns_ptr.alias().to_string(),
2055                                ),
2056                            );
2057                            changes.push(InstanceChange {
2058                                ty,
2059                                name: dns_ptr.alias().to_string(),
2060                            });
2061                        }
2062                    } else {
2063                        changes.push(InstanceChange {
2064                            ty,
2065                            name: name.to_string(),
2066                        });
2067                    }
2068                }
2069                Some((dns_record, false)) => {
2070                    timers.push(dns_record.get_record().get_expire_time());
2071                    timers.push(dns_record.get_record().get_refresh_time());
2072                }
2073                _ => {}
2074            }
2075        }
2076
2077        // Add timers for the new records.
2078        for t in timers {
2079            self.add_timer(t);
2080        }
2081
2082        // Go through remaining changes to see if any hostname resolutions were found or updated.
2083        for change in changes
2084            .iter()
2085            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2086        {
2087            let addr_map = self.cache.get_addresses_for_host(&change.name);
2088            for (name, addresses) in addr_map {
2089                call_hostname_resolution_listener(
2090                    &self.hostname_resolvers,
2091                    &change.name,
2092                    HostnameResolutionEvent::AddressesFound(name, addresses),
2093                )
2094            }
2095        }
2096
2097        // Identify the instances that need to be "resolved".
2098        let mut updated_instances = HashSet::new();
2099        for update in changes {
2100            match update.ty {
2101                RRType::PTR | RRType::SRV | RRType::TXT => {
2102                    updated_instances.insert(update.name);
2103                }
2104                RRType::A | RRType::AAAA => {
2105                    let instances = self.cache.get_instances_on_host(&update.name);
2106                    updated_instances.extend(instances);
2107                }
2108                _ => {}
2109            }
2110        }
2111
2112        self.resolve_updated_instances(&updated_instances);
2113    }
2114
2115    fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2116        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2117            return;
2118        };
2119
2120        for answer in msg.answers().iter() {
2121            let mut new_records = Vec::new();
2122
2123            let name = answer.get_name();
2124            let Some(probe) = dns_registry.probing.get_mut(name) else {
2125                continue;
2126            };
2127
2128            // check against possible multicast forwarding
2129            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2130                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2131                    if !valid_ip_on_intf(&answer_addr.address(), intf) {
2132                        debug!(
2133                            "conflict handler: answer addr {:?} not in the subnet of {:?}",
2134                            answer_addr, intf
2135                        );
2136                        continue;
2137                    }
2138                }
2139
2140                // double check if any other address record matches rrdata,
2141                // as there could be multiple addresses for the same name.
2142                let any_match = probe.records.iter().any(|r| {
2143                    r.get_type() == answer.get_type()
2144                        && r.get_class() == answer.get_class()
2145                        && r.rrdata_match(answer.as_ref())
2146                });
2147                if any_match {
2148                    continue; // no conflict for this answer.
2149                }
2150            }
2151
2152            probe.records.retain(|record| {
2153                if record.get_type() == answer.get_type()
2154                    && record.get_class() == answer.get_class()
2155                    && !record.rrdata_match(answer.as_ref())
2156                {
2157                    debug!(
2158                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2159                        record.get_type(),
2160                        record.rdata_print(),
2161                        answer.rdata_print()
2162                    );
2163
2164                    // create a new name for this record
2165                    // then remove the old record in probing.
2166                    let mut new_record = record.clone();
2167                    let new_name = match record.get_type() {
2168                        RRType::A => hostname_change(name),
2169                        RRType::AAAA => hostname_change(name),
2170                        _ => name_change(name),
2171                    };
2172                    new_record.get_record_mut().set_new_name(new_name);
2173                    new_records.push(new_record);
2174                    return false; // old record is dropped from the probe.
2175                }
2176
2177                true
2178            });
2179
2180            // ?????
2181            // if probe.records.is_empty() {
2182            //     dns_registry.probing.remove(name);
2183            // }
2184
2185            // Probing again with the new names.
2186            let create_time = current_time_millis() + fastrand::u64(0..250);
2187
2188            let waiting_services = probe.waiting_services.clone();
2189
2190            for record in new_records {
2191                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2192                    self.timers.push(Reverse(create_time));
2193                }
2194
2195                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2196                dns_registry.name_changes.insert(
2197                    record.get_record().get_original_name().to_string(),
2198                    record.get_name().to_string(),
2199                );
2200
2201                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2202                    Some(p) => p,
2203                    None => {
2204                        let new_probe = dns_registry
2205                            .probing
2206                            .entry(record.get_name().to_string())
2207                            .or_insert_with(|| {
2208                                debug!("conflict handler: new probe of {}", record.get_name());
2209                                Probe::new(create_time)
2210                            });
2211                        self.timers.push(Reverse(new_probe.next_send));
2212                        new_probe
2213                    }
2214                };
2215
2216                debug!(
2217                    "insert record with new name '{}' {} into probe",
2218                    record.get_name(),
2219                    record.get_type()
2220                );
2221                new_probe.insert_record(record);
2222
2223                new_probe.waiting_services.extend(waiting_services.clone());
2224            }
2225        }
2226    }
2227
2228    /// Resolve the updated (including new) instances.
2229    ///
2230    /// Note: it is possible that more than 1 PTR pointing to the same
2231    /// instance. For example, a regular service type PTR and a sub-type
2232    /// service type PTR can both point to the same service instance.
2233    /// This loop automatically handles the sub-type PTRs.
2234    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2235        let mut resolved: HashSet<String> = HashSet::new();
2236        let mut unresolved: HashSet<String> = HashSet::new();
2237        let mut removed_instances = HashMap::new();
2238
2239        for (ty_domain, records) in self.cache.all_ptr().iter() {
2240            if !self.service_queriers.contains_key(ty_domain) {
2241                // No need to resolve if not in our queries.
2242                continue;
2243            }
2244
2245            for record in records.iter() {
2246                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2247                    if updated_instances.contains(dns_ptr.alias()) {
2248                        if let Ok(info) =
2249                            self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2250                        {
2251                            if info.is_ready() {
2252                                debug!("call queriers to resolve {}", dns_ptr.alias());
2253                                resolved.insert(dns_ptr.alias().to_string());
2254                                call_service_listener(
2255                                    &self.service_queriers,
2256                                    ty_domain,
2257                                    ServiceEvent::ServiceResolved(info),
2258                                );
2259                            } else {
2260                                if self.resolved.remove(dns_ptr.alias()) {
2261                                    removed_instances
2262                                        .entry(ty_domain.to_string())
2263                                        .or_insert_with(HashSet::new)
2264                                        .insert(dns_ptr.alias().to_string());
2265                                }
2266                                unresolved.insert(dns_ptr.alias().to_string());
2267                            }
2268                        }
2269                    }
2270                }
2271            }
2272        }
2273
2274        for instance in resolved.drain() {
2275            self.pending_resolves.remove(&instance);
2276            self.resolved.insert(instance);
2277        }
2278
2279        for instance in unresolved.drain() {
2280            self.add_pending_resolve(instance);
2281        }
2282
2283        self.notify_service_removal(removed_instances);
2284    }
2285
2286    /// Handle incoming query packets, figure out whether and what to respond.
2287    fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2288        let sock = match self.intf_socks.get(intf) {
2289            Some(sock) => sock,
2290            None => return,
2291        };
2292        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2293
2294        // Special meta-query "_services._dns-sd._udp.<Domain>".
2295        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2296        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2297
2298        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2299            debug!("missing dns registry for intf {}", intf.ip());
2300            return;
2301        };
2302
2303        for question in msg.questions().iter() {
2304            trace!("query question: {:?}", &question);
2305
2306            let qtype = question.entry_type();
2307
2308            if qtype == RRType::PTR {
2309                for service in self.my_services.values() {
2310                    if service.get_status(intf) != ServiceStatus::Announced {
2311                        continue;
2312                    }
2313
2314                    if question.entry_name() == service.get_type()
2315                        || service
2316                            .get_subtype()
2317                            .as_ref()
2318                            .is_some_and(|v| v == question.entry_name())
2319                    {
2320                        add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2321                    } else if question.entry_name() == META_QUERY {
2322                        let ptr_added = out.add_answer(
2323                            &msg,
2324                            DnsPointer::new(
2325                                question.entry_name(),
2326                                RRType::PTR,
2327                                CLASS_IN,
2328                                service.get_other_ttl(),
2329                                service.get_type().to_string(),
2330                            ),
2331                        );
2332                        if !ptr_added {
2333                            trace!("answer was not added for meta-query {:?}", &question);
2334                        }
2335                    }
2336                }
2337            } else {
2338                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2339                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2340                    let probe_name = question.entry_name();
2341
2342                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2343                        let now = current_time_millis();
2344
2345                        // Only do tiebreaking if probe already started.
2346                        // This check also helps avoid redo tiebreaking if start time
2347                        // was postponed.
2348                        if probe.start_time < now {
2349                            let incoming_records: Vec<_> = msg
2350                                .authorities()
2351                                .iter()
2352                                .filter(|r| r.get_name() == probe_name)
2353                                .collect();
2354
2355                            probe.tiebreaking(&incoming_records, now, probe_name);
2356                        }
2357                    }
2358                }
2359
2360                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2361                    for service in self.my_services.values() {
2362                        if service.get_status(intf) != ServiceStatus::Announced {
2363                            continue;
2364                        }
2365
2366                        let service_hostname =
2367                            match dns_registry.name_changes.get(service.get_hostname()) {
2368                                Some(new_name) => new_name,
2369                                None => service.get_hostname(),
2370                            };
2371
2372                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2373                            let intf_addrs = service.get_addrs_on_intf(intf);
2374                            if intf_addrs.is_empty()
2375                                && (qtype == RRType::A || qtype == RRType::AAAA)
2376                            {
2377                                let t = match qtype {
2378                                    RRType::A => "TYPE_A",
2379                                    RRType::AAAA => "TYPE_AAAA",
2380                                    _ => "invalid_type",
2381                                };
2382                                trace!(
2383                                    "Cannot find valid addrs for {} response on intf {:?}",
2384                                    t,
2385                                    &intf
2386                                );
2387                                return;
2388                            }
2389                            for address in intf_addrs {
2390                                out.add_answer(
2391                                    &msg,
2392                                    DnsAddress::new(
2393                                        service_hostname,
2394                                        ip_address_rr_type(&address),
2395                                        CLASS_IN | CLASS_CACHE_FLUSH,
2396                                        service.get_host_ttl(),
2397                                        address,
2398                                    ),
2399                                );
2400                            }
2401                        }
2402                    }
2403                }
2404
2405                let query_name = question.entry_name().to_lowercase();
2406                let service_opt = self
2407                    .my_services
2408                    .iter()
2409                    .find(|(k, _v)| {
2410                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2411                            Some(new_name) => new_name,
2412                            None => k,
2413                        };
2414                        service_name == &query_name
2415                    })
2416                    .map(|(_, v)| v);
2417
2418                let Some(service) = service_opt else {
2419                    continue;
2420                };
2421
2422                if service.get_status(intf) != ServiceStatus::Announced {
2423                    continue;
2424                }
2425
2426                if qtype == RRType::SRV || qtype == RRType::ANY {
2427                    out.add_answer(
2428                        &msg,
2429                        DnsSrv::new(
2430                            question.entry_name(),
2431                            CLASS_IN | CLASS_CACHE_FLUSH,
2432                            service.get_host_ttl(),
2433                            service.get_priority(),
2434                            service.get_weight(),
2435                            service.get_port(),
2436                            service.get_hostname().to_string(),
2437                        ),
2438                    );
2439                }
2440
2441                if qtype == RRType::TXT || qtype == RRType::ANY {
2442                    out.add_answer(
2443                        &msg,
2444                        DnsTxt::new(
2445                            question.entry_name(),
2446                            CLASS_IN | CLASS_CACHE_FLUSH,
2447                            service.get_host_ttl(),
2448                            service.generate_txt(),
2449                        ),
2450                    );
2451                }
2452
2453                if qtype == RRType::SRV {
2454                    let intf_addrs = service.get_addrs_on_intf(intf);
2455                    if intf_addrs.is_empty() {
2456                        debug!(
2457                            "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2458                            &intf
2459                        );
2460                        return;
2461                    }
2462                    for address in intf_addrs {
2463                        out.add_additional_answer(DnsAddress::new(
2464                            service.get_hostname(),
2465                            ip_address_rr_type(&address),
2466                            CLASS_IN | CLASS_CACHE_FLUSH,
2467                            service.get_host_ttl(),
2468                            address,
2469                        ));
2470                    }
2471                }
2472            }
2473        }
2474
2475        if !out.answers_count() > 0 {
2476            out.set_id(msg.id());
2477            send_dns_outgoing(&out, intf, sock);
2478
2479            self.increase_counter(Counter::Respond, 1);
2480            self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2481        }
2482
2483        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2484    }
2485
2486    /// Increases the value of `counter` by `count`.
2487    fn increase_counter(&mut self, counter: Counter, count: i64) {
2488        let key = counter.to_string();
2489        match self.counters.get_mut(&key) {
2490            Some(v) => *v += count,
2491            None => {
2492                self.counters.insert(key, count);
2493            }
2494        }
2495    }
2496
2497    /// Sets the value of `counter` to `count`.
2498    fn set_counter(&mut self, counter: Counter, count: i64) {
2499        let key = counter.to_string();
2500        self.counters.insert(key, count);
2501    }
2502
2503    fn signal_sock_drain(&self) {
2504        let mut signal_buf = [0; 1024];
2505
2506        // This recv is non-blocking as the socket is non-blocking.
2507        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2508            trace!(
2509                "signal socket recvd: {}",
2510                String::from_utf8_lossy(&signal_buf[0..sz])
2511            );
2512        }
2513    }
2514
2515    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2516        self.retransmissions.push(ReRun { next_time, command });
2517        self.add_timer(next_time);
2518    }
2519
2520    /// Sends service removal event to listeners for expired service records.
2521    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2522        for (ty_domain, sender) in self.service_queriers.iter() {
2523            if let Some(instances) = expired.get(ty_domain) {
2524                for instance_name in instances {
2525                    let event = ServiceEvent::ServiceRemoved(
2526                        ty_domain.to_string(),
2527                        instance_name.to_string(),
2528                    );
2529                    match sender.send(event) {
2530                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2531                        Err(e) => debug!("Failed to send event: {}", e),
2532                    }
2533                }
2534            }
2535        }
2536    }
2537
2538    /// The entry point that executes all commands received by the daemon.
2539    ///
2540    /// `repeating`: whether this is a retransmission.
2541    fn exec_command(&mut self, command: Command, repeating: bool) {
2542        match command {
2543            Command::Browse(ty, next_delay, listener) => {
2544                self.exec_command_browse(repeating, ty, next_delay, listener);
2545            }
2546
2547            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2548                self.exec_command_resolve_hostname(
2549                    repeating, hostname, next_delay, listener, timeout,
2550                );
2551            }
2552
2553            Command::Register(service_info) => {
2554                self.register_service(service_info);
2555                self.increase_counter(Counter::Register, 1);
2556            }
2557
2558            Command::RegisterResend(fullname, intf) => {
2559                trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2560                self.exec_command_register_resend(fullname, intf);
2561            }
2562
2563            Command::Unregister(fullname, resp_s) => {
2564                trace!("unregister service {} repeat {}", &fullname, &repeating);
2565                self.exec_command_unregister(repeating, fullname, resp_s);
2566            }
2567
2568            Command::UnregisterResend(packet, ip) => {
2569                self.exec_command_unregister_resend(packet, ip);
2570            }
2571
2572            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2573
2574            Command::StopResolveHostname(hostname) => {
2575                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2576            }
2577
2578            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2579
2580            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2581
2582            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2583                Ok(()) => trace!("Sent status to the client"),
2584                Err(e) => debug!("Failed to send status: {}", e),
2585            },
2586
2587            Command::Monitor(resp_s) => {
2588                self.monitors.push(resp_s);
2589            }
2590
2591            Command::SetOption(daemon_opt) => {
2592                self.process_set_option(daemon_opt);
2593            }
2594
2595            Command::GetOption(resp_s) => {
2596                let val = DaemonOptionVal {
2597                    _service_name_len_max: self.service_name_len_max,
2598                    ip_check_interval: self.ip_check_interval,
2599                };
2600                if let Err(e) = resp_s.send(val) {
2601                    debug!("Failed to send options: {}", e);
2602                }
2603            }
2604
2605            Command::Verify(instance_fullname, timeout) => {
2606                self.exec_command_verify(instance_fullname, timeout, repeating);
2607            }
2608
2609            _ => {
2610                debug!("unexpected command: {:?}", &command);
2611            }
2612        }
2613    }
2614
2615    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2616        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2617        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2618        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2619        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2620        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2621        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2622
2623        let dns_registry_probe_count: usize = self
2624            .dns_registry_map
2625            .values()
2626            .map(|r| r.probing.len())
2627            .sum();
2628        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2629
2630        let dns_registry_active_count: usize = self
2631            .dns_registry_map
2632            .values()
2633            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2634            .sum();
2635        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2636
2637        let dns_registry_timer_count: usize = self
2638            .dns_registry_map
2639            .values()
2640            .map(|r| r.new_timers.len())
2641            .sum();
2642        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2643
2644        let dns_registry_name_change_count: usize = self
2645            .dns_registry_map
2646            .values()
2647            .map(|r| r.name_changes.len())
2648            .sum();
2649        self.set_counter(
2650            Counter::DnsRegistryNameChange,
2651            dns_registry_name_change_count as i64,
2652        );
2653
2654        // Send the metrics to the client.
2655        if let Err(e) = resp_s.send(self.counters.clone()) {
2656            debug!("Failed to send metrics: {}", e);
2657        }
2658    }
2659
2660    fn exec_command_browse(
2661        &mut self,
2662        repeating: bool,
2663        ty: String,
2664        next_delay: u32,
2665        listener: Sender<ServiceEvent>,
2666    ) {
2667        let pretty_addrs: Vec<String> = self
2668            .intf_socks
2669            .keys()
2670            .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2671            .collect();
2672
2673        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2674            "{ty} on {} interfaces [{}]",
2675            pretty_addrs.len(),
2676            pretty_addrs.join(", ")
2677        ))) {
2678            debug!(
2679                "Failed to send SearchStarted({})(repeating:{}): {}",
2680                &ty, repeating, e
2681            );
2682            return;
2683        }
2684        if !repeating {
2685            // Binds a `listener` to querying mDNS domain type `ty`.
2686            //
2687            // If there is already a `listener`, it will be updated, i.e. overwritten.
2688            self.service_queriers.insert(ty.clone(), listener.clone());
2689
2690            // if we already have the records in our cache, just send them
2691            self.query_cache_for_service(&ty, &listener);
2692        }
2693
2694        self.send_query(&ty, RRType::PTR);
2695        self.increase_counter(Counter::Browse, 1);
2696
2697        let next_time = current_time_millis() + (next_delay * 1000) as u64;
2698        let max_delay = 60 * 60;
2699        let delay = cmp::min(next_delay * 2, max_delay);
2700        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2701    }
2702
2703    fn exec_command_resolve_hostname(
2704        &mut self,
2705        repeating: bool,
2706        hostname: String,
2707        next_delay: u32,
2708        listener: Sender<HostnameResolutionEvent>,
2709        timeout: Option<u64>,
2710    ) {
2711        let addr_list: Vec<_> = self.intf_socks.keys().collect();
2712        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2713            "{} on addrs {:?}",
2714            &hostname, &addr_list
2715        ))) {
2716            debug!(
2717                "Failed to send ResolveStarted({})(repeating:{}): {}",
2718                &hostname, repeating, e
2719            );
2720            return;
2721        }
2722        if !repeating {
2723            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2724            // if we already have the records in our cache, just send them
2725            self.query_cache_for_hostname(&hostname, listener.clone());
2726        }
2727
2728        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2729        self.increase_counter(Counter::ResolveHostname, 1);
2730
2731        let now = current_time_millis();
2732        let next_time = now + u64::from(next_delay) * 1000;
2733        let max_delay = 60 * 60;
2734        let delay = cmp::min(next_delay * 2, max_delay);
2735
2736        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
2737        if self
2738            .hostname_resolvers
2739            .get(&hostname)
2740            .and_then(|(_sender, timeout)| *timeout)
2741            .map(|timeout| next_time < timeout)
2742            .unwrap_or(true)
2743        {
2744            self.add_retransmission(
2745                next_time,
2746                Command::ResolveHostname(hostname, delay, listener, None),
2747            );
2748        }
2749    }
2750
2751    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2752        let pending_query = self.query_unresolved(&instance);
2753        let max_try = 3;
2754        if pending_query && try_count < max_try {
2755            // Note that if the current try already succeeds, the next retransmission
2756            // will be no-op as the cache has been updated.
2757            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2758            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2759        }
2760    }
2761
2762    fn exec_command_unregister(
2763        &mut self,
2764        repeating: bool,
2765        fullname: String,
2766        resp_s: Sender<UnregisterStatus>,
2767    ) {
2768        let response = match self.my_services.remove_entry(&fullname) {
2769            None => {
2770                debug!("unregister: cannot find such service {}", &fullname);
2771                UnregisterStatus::NotFound
2772            }
2773            Some((_k, info)) => {
2774                let mut timers = Vec::new();
2775                // Send one unregister per interface and ip version
2776                let mut multicast_sent_trackers = HashSet::new();
2777
2778                for (intf, sock) in self.intf_socks.iter() {
2779                    if let Some(tracker) = multicast_send_tracker(intf) {
2780                        if multicast_sent_trackers.contains(&tracker) {
2781                            continue; // no need to send unregister the same interface with same ip version.
2782                        }
2783                        multicast_sent_trackers.insert(tracker);
2784                    }
2785                    let packet = self.unregister_service(&info, intf, sock);
2786                    // repeat for one time just in case some peers miss the message
2787                    if !repeating && !packet.is_empty() {
2788                        let next_time = current_time_millis() + 120;
2789                        self.retransmissions.push(ReRun {
2790                            next_time,
2791                            command: Command::UnregisterResend(packet, intf.clone()),
2792                        });
2793                        timers.push(next_time);
2794                    }
2795                }
2796
2797                for t in timers {
2798                    self.add_timer(t);
2799                }
2800
2801                self.increase_counter(Counter::Unregister, 1);
2802                UnregisterStatus::OK
2803            }
2804        };
2805        if let Err(e) = resp_s.send(response) {
2806            debug!("unregister: failed to send response: {}", e);
2807        }
2808    }
2809
2810    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2811        if let Some(sock) = self.intf_socks.get(&intf) {
2812            debug!("UnregisterResend from {}", &intf.ip());
2813            multicast_on_intf(&packet[..], &intf, sock);
2814            self.increase_counter(Counter::UnregisterResend, 1);
2815        }
2816    }
2817
2818    fn exec_command_stop_browse(&mut self, ty_domain: String) {
2819        match self.service_queriers.remove_entry(&ty_domain) {
2820            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2821            Some((ty, sender)) => {
2822                // Remove pending browse commands in the reruns.
2823                trace!("StopBrowse: removed queryer for {}", &ty);
2824                let mut i = 0;
2825                while i < self.retransmissions.len() {
2826                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2827                        if t == &ty {
2828                            self.retransmissions.remove(i);
2829                            trace!("StopBrowse: removed retransmission for {}", &ty);
2830                            continue;
2831                        }
2832                    }
2833                    i += 1;
2834                }
2835
2836                // Notify the client.
2837                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2838                    Ok(()) => trace!("Sent SearchStopped to the listener"),
2839                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
2840                }
2841            }
2842        }
2843    }
2844
2845    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2846        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2847            // Remove pending resolve commands in the reruns.
2848            trace!("StopResolve: removed queryer for {}", &host);
2849            let mut i = 0;
2850            while i < self.retransmissions.len() {
2851                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2852                    if t == &host {
2853                        self.retransmissions.remove(i);
2854                        trace!("StopResolve: removed retransmission for {}", &host);
2855                        continue;
2856                    }
2857                }
2858                i += 1;
2859            }
2860
2861            // Notify the client.
2862            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2863                Ok(()) => trace!("Sent SearchStopped to the listener"),
2864                Err(e) => debug!("Failed to send SearchStopped: {}", e),
2865            }
2866        }
2867    }
2868
2869    fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2870        let Some(info) = self.my_services.get_mut(&fullname) else {
2871            trace!("announce: cannot find such service {}", &fullname);
2872            return;
2873        };
2874
2875        let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2876            return;
2877        };
2878
2879        let Some(sock) = self.intf_socks.get(&intf) else {
2880            return;
2881        };
2882
2883        if announce_service_on_intf(dns_registry, info, &intf, sock) {
2884            let mut hostname = info.get_hostname();
2885            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2886                hostname = new_name;
2887            }
2888            let service_name = match dns_registry.name_changes.get(&fullname) {
2889                Some(new_name) => new_name.to_string(),
2890                None => fullname,
2891            };
2892
2893            debug!("resend: announce service {} on {}", service_name, intf.ip());
2894
2895            notify_monitors(
2896                &mut self.monitors,
2897                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2898            );
2899            info.set_status(&intf, ServiceStatus::Announced);
2900        } else {
2901            debug!("register-resend should not fail");
2902        }
2903
2904        self.increase_counter(Counter::RegisterResend, 1);
2905    }
2906
2907    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2908        /*
2909        RFC 6762 section 10.4:
2910        ...
2911        When the cache receives this hint that it should reconfirm some
2912        record, it MUST issue two or more queries for the resource record in
2913        dispute.  If no response is received within ten seconds, then, even
2914        though its TTL may indicate that it is not yet due to expire, that
2915        record SHOULD be promptly flushed from the cache.
2916        */
2917        let now = current_time_millis();
2918        let expire_at = if repeating {
2919            None
2920        } else {
2921            Some(now + timeout.as_millis() as u64)
2922        };
2923
2924        // send query for the resource records.
2925        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2926
2927        if !record_vec.is_empty() {
2928            let query_vec: Vec<(&str, RRType)> = record_vec
2929                .iter()
2930                .map(|(record, rr_type)| (record.as_str(), *rr_type))
2931                .collect();
2932            self.send_query_vec(&query_vec);
2933
2934            if let Some(new_expire) = expire_at {
2935                self.add_timer(new_expire); // ensure a check for the new expire time.
2936
2937                // schedule a resend 1 second later
2938                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2939            }
2940        }
2941    }
2942
2943    /// Refresh cached service records with active queriers
2944    fn refresh_active_services(&mut self) {
2945        let mut query_ptr_count = 0;
2946        let mut query_srv_count = 0;
2947        let mut new_timers = HashSet::new();
2948        let mut query_addr_count = 0;
2949
2950        for (ty_domain, _sender) in self.service_queriers.iter() {
2951            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2952            if !refreshed_timers.is_empty() {
2953                trace!("sending refresh query for PTR: {}", ty_domain);
2954                self.send_query(ty_domain, RRType::PTR);
2955                query_ptr_count += 1;
2956                new_timers.extend(refreshed_timers);
2957            }
2958
2959            let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2960            for instance in instances.iter() {
2961                trace!("sending refresh query for SRV: {}", instance);
2962                self.send_query(instance, RRType::SRV);
2963                query_srv_count += 1;
2964            }
2965            new_timers.extend(timers);
2966            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2967            for hostname in hostnames.iter() {
2968                trace!("sending refresh queries for A and AAAA:  {}", hostname);
2969                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2970                query_addr_count += 2;
2971            }
2972            new_timers.extend(timers);
2973        }
2974
2975        for timer in new_timers {
2976            self.add_timer(timer);
2977        }
2978
2979        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2980        self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2981        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2982    }
2983}
2984
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// These events are delivered through the `Sender<ServiceEvent>` that the
/// client supplied when it started browsing.
#[derive(Debug)]
pub enum ServiceEvent {
    /// Started searching for a service type.
    ///
    /// The string is a human-readable description: the service type followed
    /// by the number and list of interfaces being searched.
    SearchStarted(String),

    /// Found a specific (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance with detailed info.
    ServiceResolved(ServiceInfo),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type.
    ///
    /// The string is the service type (`ty_domain`) whose browsing stopped.
    SearchStopped(String),
}
3004
/// All possible events sent to the client from the daemon
/// regarding host resolution.
///
/// These events are delivered through the `Sender<HostnameResolutionEvent>`
/// that the client supplied when it started resolving a hostname.
#[derive(Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    ///
    /// The string is a human-readable description: the hostname followed by
    /// the interfaces being searched.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found.
    AddressesFound(String, HashSet<IpAddr>),
    /// One or more addresses for a hostname have been removed.
    AddressesRemoved(String, HashSet<IpAddr>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname.
    ///
    /// The string is the hostname as passed to the stop handler, which the
    /// daemon lowercases beforehand.
    SearchStopped(String),
}
3021
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon sent an unsolicited announcement of a service from an interface.
    ///
    /// The first string is the service name in use (after any name change);
    /// the second is formatted as `<hostname>:<interface ip>`.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address from the host.
    IpAdd(IpAddr),

    /// Daemon detected an IP address removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// see [DnsNameChange] for more details.
    NameChange(DnsNameChange),

    /// Sent out a multicast response via an IP address.
    ///
    /// The address is the IP of the interface the response was sent on.
    Respond(IpAddr),
}
3046
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
///
/// Carried by [`DaemonEvent::NameChange`].
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// The new name, created by appending a suffix to the original name.
    ///
    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The type of the resource record whose name was changed.
    pub rr_type: RRType,

    /// The name of the interface where the name conflict and its change happened.
    pub intf_name: String,
}
3071
/// Commands supported by the daemon.
///
/// Most variants are dispatched by `exec_command`; a variant without an arm
/// there (such as `Exit`) is only logged by it as unexpected.
#[derive(Debug)]
enum Command {
    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
    Browse(String, u32, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses.
    /// (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service
    Register(ServiceInfo),

    /// Unregister a service (fullname, sender for the unregister status)
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce a service to the local network again (fullname, interface)
    RegisterResend(String, Interface),

    /// Resend unregister packet (packet content, interface)
    UnregisterResend(Vec<u8>, Interface),

    /// Stop browsing a service type (ty_domain)
    StopBrowse(String),

    /// Stop resolving a hostname (hostname)
    StopResolveHostname(String),

    /// Send query to resolve a service instance.
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    /// (service_instance_fullname, try_count)
    Resolve(String, u16),

    /// Read the current values of the counters
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Apply a daemon option; handed to `process_set_option`.
    SetOption(DaemonOption),

    /// Read the current option values; the reply is a `DaemonOptionVal`.
    GetOption(Sender<DaemonOptionVal>),

    /// Proactively confirm a DNS resource record.
    ///
    /// The intention is to check if a service name or IP address is still
    /// valid before its TTL expires.
    /// (service_instance_fullname, timeout)
    Verify(String, Duration),

    /// NOTE(review): not dispatched by `exec_command`; presumably handled by
    /// the daemon's main loop to shut it down — confirm against the run loop.
    Exit(Sender<DaemonStatus>),
}
3124
3125impl fmt::Display for Command {
3126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3127        match self {
3128            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3129            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3130            Self::Exit(_) => write!(f, "Command Exit"),
3131            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3132            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3133            Self::Monitor(_) => write!(f, "Command Monitor"),
3134            Self::Register(_) => write!(f, "Command Register"),
3135            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3136            Self::SetOption(_) => write!(f, "Command SetOption"),
3137            Self::GetOption(_) => write!(f, "Command GetOption"),
3138            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3139            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3140            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3141            Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3142            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3143            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3144        }
3145    }
3146}
3147
/// Option values delivered through the sender carried by `Command::GetOption`.
struct DaemonOptionVal {
    // The leading underscore marks the field as intentionally unused.
    // Presumably mirrors `DaemonOption::ServiceNameLenMax` -- TODO confirm.
    _service_name_len_max: u8,
    // Presumably mirrors `DaemonOption::IpCheckInterval`; the time unit is not
    // visible here -- TODO confirm.
    ip_check_interval: u64,
}
3152
/// A daemon setting, carried by `Command::SetOption`.
///
/// NOTE(review): the per-variant notes below are inferred from the variant
/// names and payload types only; confirm against the code that applies them.
#[derive(Debug)]
enum DaemonOption {
    // Presumably the `limit` later passed to `check_service_name_length`.
    ServiceNameLenMax(u8),
    // Presumably how often host IP addresses are re-checked; unit not visible here.
    IpCheckInterval(u64),
    // Presumably the interface kinds to start using.
    EnableInterface(Vec<IfKind>),
    // Presumably the interface kinds to stop using.
    DisableInterface(Vec<IfKind>),
    // Presumably toggles multicast loopback on IPv4 sockets.
    MulticastLoopV4(bool),
    // Presumably toggles multicast loopback on IPv6 sockets.
    MulticastLoopV6(bool),
}
3162
/// The byte length of the Service Domain suffix supported in this lib.
///
/// It is computed from `"._tcp.local."`; `"._udp.local."` has the same length,
/// so this one constant serves both suffixes.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3165
3166/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3167fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3168    if ty_domain.len() <= DOMAIN_LEN + 1 {
3169        // service name cannot be empty or only '_'.
3170        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3171    }
3172
3173    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3174    if service_name_len > limit as usize {
3175        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3176    }
3177    Ok(())
3178}
3179
3180/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3181fn check_domain_suffix(name: &str) -> Result<()> {
3182    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3183        return Err(e_fmt!(
3184            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3185            name
3186        ));
3187    }
3188
3189    Ok(())
3190}
3191
3192/// Validate the service name in a fully qualified name.
3193///
3194/// A Full Name = <Instance>.<Service>.<Domain>
3195/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3196///
3197/// Note: this function does not check for the length of the service name.
3198/// Instead, `register_service` method will check the length.
3199fn check_service_name(fullname: &str) -> Result<()> {
3200    check_domain_suffix(fullname)?;
3201
3202    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3203    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3204
3205    if &name[0..1] != "_" {
3206        return Err(e_fmt!("Service name must start with '_'"));
3207    }
3208
3209    let name = &name[1..];
3210
3211    if name.contains("--") {
3212        return Err(e_fmt!("Service name must not contain '--'"));
3213    }
3214
3215    if name.starts_with('-') || name.ends_with('-') {
3216        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3217    }
3218
3219    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3220    if ascii_count < 1 {
3221        return Err(e_fmt!(
3222            "Service name must contain at least one letter (eg: 'A-Za-z')"
3223        ));
3224    }
3225
3226    Ok(())
3227}
3228
3229/// Validate a hostname.
3230fn check_hostname(hostname: &str) -> Result<()> {
3231    if !hostname.ends_with(".local.") {
3232        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3233    }
3234
3235    if hostname == ".local." {
3236        return Err(e_fmt!(
3237            "The part of the hostname before '.local.' cannot be empty"
3238        ));
3239    }
3240
3241    if hostname.len() > 255 {
3242        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3243    }
3244
3245    Ok(())
3246}
3247
3248fn call_service_listener(
3249    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3250    ty_domain: &str,
3251    event: ServiceEvent,
3252) {
3253    if let Some(listener) = listeners_map.get(ty_domain) {
3254        match listener.send(event) {
3255            Ok(()) => trace!("Sent event to listener successfully"),
3256            Err(e) => debug!("Failed to send event: {}", e),
3257        }
3258    }
3259}
3260
3261fn call_hostname_resolution_listener(
3262    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3263    hostname: &str,
3264    event: HostnameResolutionEvent,
3265) {
3266    let hostname_lower = hostname.to_lowercase();
3267    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3268        match listener.send(event) {
3269            Ok(()) => trace!("Sent event to listener successfully"),
3270            Err(e) => debug!("Failed to send event: {}", e),
3271        }
3272    }
3273}
3274
3275/// Returns valid network interfaces in the host system.
3276/// Loopback interfaces are excluded.
3277fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3278    if_addrs::get_if_addrs()
3279        .unwrap_or_default()
3280        .into_iter()
3281        .filter(|i| !i.is_loopback() || with_loopback)
3282        .collect()
3283}
3284
3285/// Send an outgoing mDNS query or response, and returns the packet bytes.
3286fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3287    let qtype = if out.is_query() { "query" } else { "response" };
3288    trace!(
3289        "send outgoing {}: {} questions {} answers {} authorities {} additional",
3290        qtype,
3291        out.questions().len(),
3292        out.answers_count(),
3293        out.authorities().len(),
3294        out.additionals().len()
3295    );
3296    let packet_list = out.to_data_on_wire();
3297    for packet in packet_list.iter() {
3298        multicast_on_intf(packet, intf, sock);
3299    }
3300    packet_list
3301}
3302
3303/// Sends a multicast packet, and returns the packet bytes.
3304fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3305    if packet.len() > MAX_MSG_ABSOLUTE {
3306        debug!("Drop over-sized packet ({})", packet.len());
3307        return;
3308    }
3309
3310    let addr: SocketAddr = match intf.addr {
3311        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3312        if_addrs::IfAddr::V6(_) => {
3313            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3314            sock.set_scope_id(intf.index.unwrap_or(0)); // Choose iface for multicast
3315            sock.into()
3316        }
3317    };
3318
3319    send_packet(packet, addr, intf, socket);
3320}
3321
3322/// Sends out `packet` to `addr` on the socket in `intf_sock`.
3323fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3324    match sock.send_to(packet, addr) {
3325        Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3326        Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3327    }
3328}
3329
/// Returns true if `name` has the shape of an instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of dot-separated pieces is checked (at least five, the
/// last one being the empty piece after the trailing dot).
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // A fifth piece (index 4) exists exactly when there are five or more pieces.
    name.split('.').nth(4).is_some()
}
3336
3337fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3338    monitors.retain(|sender| {
3339        if let Err(e) = sender.try_send(event.clone()) {
3340            debug!("notify_monitors: try_send: {}", &e);
3341            if matches!(e, TrySendError::Disconnected(_)) {
3342                return false; // This monitor is dropped.
3343            }
3344        }
3345        true
3346    });
3347}
3348
/// Check if all unique records passed "probing", and if yes, create a packet
/// to announce the service.
///
/// The packet is an authoritative response holding the PTR record(s), the SRV
/// record, the TXT record and one address record per address of `info` on
/// `intf`. Names recorded in `dns_registry.name_changes` replace the original
/// fullname / hostname.
///
/// Returns `None` when `info` has no address on `intf`, or when at least one
/// of the SRV / TXT / address records is not yet reported as done by
/// `dns_registry.is_probing_done`.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &Interface,
    dns_registry: &mut DnsRegistry,
) -> Option<DnsOutgoing> {
    let intf_addrs = info.get_addrs_on_intf(intf);
    if intf_addrs.is_empty() {
        trace!("No valid addrs to add on intf {:?}", &intf);
        return None;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {}: {}",
        &intf.name,
        &intf.ip()
    );

    // Number of records that cannot be announced yet.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Current time plus a random offset in 0..250 (presumably milliseconds);
    // it is only passed on to `is_probing_done` below.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // The PTR records are added without consulting the probing state.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV records.
    // The SRV target is the (possibly renamed) hostname.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    // The record is created under the original fullname; a changed name, if
    // any, is attached afterwards via `set_new_name`.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // Announce the record only if the service needs no probing or the registry
    // reports probing as done; otherwise count it as pending.
    // NOTE(review): `is_probing_done` is called on the `&mut` registry and may
    // update probing state -- confirm before reordering these calls.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT records.

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records. (A and AAAA)

    // As with SRV / TXT, each record starts with the original hostname and a
    // changed name is attached via `set_new_name`.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // A single pending record withholds the whole announcement.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
3483
3484/// Send an unsolicited response for owned service via `intf` and `sock`.
3485/// Returns true if sent out successfully.
3486fn announce_service_on_intf(
3487    dns_registry: &mut DnsRegistry,
3488    info: &ServiceInfo,
3489    intf: &Interface,
3490    sock: &MioUdpSocket,
3491) -> bool {
3492    if let Some(out) = prepare_announce(info, intf, dns_registry) {
3493        send_dns_outgoing(&out, intf, sock);
3494        return true;
3495    }
3496    false
3497}
3498
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the part of `original` before the first `.` is changed; the rest is
/// kept as is. If that part already ends with a number in parentheses,
/// the number is incremented; otherwise ` (2)` is appended.
///
/// If the existing number is `u32::MAX` it cannot be incremented, so ` (2)`
/// is appended instead of overflowing.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
fn name_change(original: &str) -> String {
    // Separate the first label from whatever follows the first '.'.
    let (first_part, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Recognize a trailing ` (<num>)` and bump the number. Any mismatch,
    // including an overflow of the number, yields `None`.
    let incremented = first_part
        .strip_suffix(')')
        .and_then(|without_paren| without_paren.rsplit_once(" ("))
        .and_then(|(base_name, number)| {
            let next = number.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        });

    let new_name = incremented.unwrap_or_else(|| format!("{first_part} (2)"));

    match rest {
        Some(rest) => format!("{new_name}.{rest}"),
        None => new_name,
    }
}
3534
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the part of `original` before the first `.` is changed; the rest is
/// kept as is. If that part already ends with a hyphenated number, the number
/// is incremented; otherwise `-2` is appended.
///
/// If the existing number is `u32::MAX` it cannot be incremented, so `-2`
/// is appended instead of overflowing.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
fn hostname_change(original: &str) -> String {
    // Separate the first label from whatever follows the first '.'.
    let (first_part, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Recognize a trailing `-<num>` and bump the number. Any mismatch,
    // including an overflow of the number, yields `None`.
    let incremented = first_part
        .rsplit_once('-')
        .and_then(|(base_name, number)| {
            let next = number.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name}-{next}"))
        });

    let new_name = incremented.unwrap_or_else(|| format!("{first_part}-2"));

    match rest {
        Some(rest) => format!("{new_name}.{rest}"),
        None => new_name,
    }
}
3562
3563fn add_answer_with_additionals(
3564    out: &mut DnsOutgoing,
3565    msg: &DnsIncoming,
3566    service: &ServiceInfo,
3567    intf: &Interface,
3568    dns_registry: &DnsRegistry,
3569) {
3570    let intf_addrs = service.get_addrs_on_intf(intf);
3571    if intf_addrs.is_empty() {
3572        trace!("No addrs on LAN of intf {:?}", intf);
3573        return;
3574    }
3575
3576    // check if we changed our name due to conflicts.
3577    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3578        Some(new_name) => new_name,
3579        None => service.get_fullname(),
3580    };
3581
3582    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3583        Some(new_name) => new_name,
3584        None => service.get_hostname(),
3585    };
3586
3587    let ptr_added = out.add_answer(
3588        msg,
3589        DnsPointer::new(
3590            service.get_type(),
3591            RRType::PTR,
3592            CLASS_IN,
3593            service.get_other_ttl(),
3594            service_fullname.to_string(),
3595        ),
3596    );
3597
3598    if !ptr_added {
3599        trace!("answer was not added for msg {:?}", msg);
3600        return;
3601    }
3602
3603    if let Some(sub) = service.get_subtype() {
3604        trace!("Adding subdomain {}", sub);
3605        out.add_additional_answer(DnsPointer::new(
3606            sub,
3607            RRType::PTR,
3608            CLASS_IN,
3609            service.get_other_ttl(),
3610            service_fullname.to_string(),
3611        ));
3612    }
3613
3614    // Add recommended additional answers according to
3615    // https://tools.ietf.org/html/rfc6763#section-12.1.
3616    out.add_additional_answer(DnsSrv::new(
3617        service_fullname,
3618        CLASS_IN | CLASS_CACHE_FLUSH,
3619        service.get_host_ttl(),
3620        service.get_priority(),
3621        service.get_weight(),
3622        service.get_port(),
3623        hostname.to_string(),
3624    ));
3625
3626    out.add_additional_answer(DnsTxt::new(
3627        service_fullname,
3628        CLASS_IN | CLASS_CACHE_FLUSH,
3629        service.get_host_ttl(),
3630        service.generate_txt(),
3631    ));
3632
3633    for address in intf_addrs {
3634        out.add_additional_answer(DnsAddress::new(
3635            hostname,
3636            ip_address_rr_type(&address),
3637            CLASS_IN | CLASS_CACHE_FLUSH,
3638            service.get_host_ttl(),
3639            address,
3640        ));
3641    }
3642}
3643
3644#[cfg(test)]
3645mod tests {
3646    use super::{
3647        check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3648        name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3649        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3650        MDNS_PORT,
3651    };
3652    use crate::{
3653        dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
3654        service_daemon::check_hostname,
3655    };
3656    use std::{
3657        net::{SocketAddr, SocketAddrV4},
3658        time::Duration,
3659    };
3660    use test_log::test;
3661
3662    #[test]
3663    fn test_socketaddr_print() {
3664        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3665        let print = format!("{}", addr);
3666        assert_eq!(print, "224.0.0.251:5353");
3667    }
3668
3669    #[test]
3670    fn test_instance_name() {
3671        assert!(valid_instance_name("my-laser._printer._tcp.local."));
3672        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3673        assert!(!valid_instance_name("_printer._tcp.local."));
3674    }
3675
3676    #[test]
3677    fn test_check_service_name_length() {
3678        let result = check_service_name_length("_tcp", 100);
3679        assert!(result.is_err());
3680        if let Err(e) = result {
3681            println!("{}", e);
3682        }
3683    }
3684
3685    #[test]
3686    fn test_check_hostname() {
3687        // valid hostnames
3688        for hostname in &[
3689            "my_host.local.",
3690            &("A".repeat(255 - ".local.".len()) + ".local."),
3691        ] {
3692            let result = check_hostname(hostname);
3693            assert!(result.is_ok());
3694        }
3695
3696        // erroneous hostnames
3697        for hostname in &[
3698            "my_host.local",
3699            ".local.",
3700            &("A".repeat(256 - ".local.".len()) + ".local."),
3701        ] {
3702            let result = check_hostname(hostname);
3703            assert!(result.is_err());
3704            if let Err(e) = result {
3705                println!("{}", e);
3706            }
3707        }
3708    }
3709
3710    #[test]
3711    fn test_check_domain_suffix() {
3712        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3713        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3714        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3715        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3716        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3717        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3718    }
3719
    /// A service must resolve again after its PTR record was invalidated.
    ///
    /// Steps: register and resolve a service, stop browsing, multicast a PTR
    /// record with TTL 0 for the service, then browse again and expect a new
    /// `ServiceResolved` event.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // Browse for a service
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        // Wait for the first resolution; the loop ends when no event arrives
        // within `timeout`.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);

        println!("Stopping browse of {}", service);
        // Pause browsing so restarting will cause a new immediate query.
        // Unregistering will not work here, it will invalidate all the records.
        d.stop_browse(service).unwrap();

        // Ensure the search is stopped.
        // Reduces the chance of receiving an answer adding the ptr back to the
        // cache causing the later browse to return directly from the cache.
        // (which invalidates what this test is trying to test for.)
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                // Other `ServiceResolved` messages may be received
                // here as they come from different interfaces.
                // That's fine for this test.
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped);

        // Invalidate the ptr from the service to the host.
        // The record is built with a TTL of 0.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        // Multicast the invalidation on every interface, each through a
        // freshly bound socket.
        for intf in intfs {
            let sock = new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing(&packet_buffer, &intf, &sock);
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Restart the browse to force the sender to re-send the announcements.
        let browse_chan = d.browse(service).unwrap();

        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);
        d.shutdown().unwrap();
    }
3827
3828    #[test]
3829    fn test_expired_srv() {
3830        // construct service info
3831        let service_type = "_expired-srv._udp.local.";
3832        let instance = "test_instance";
3833        let host_name = "expired_srv_host.local.";
3834        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3835            .unwrap()
3836            .enable_addr_auto();
3837        // let fullname = my_service.get_fullname().to_string();
3838
3839        // set SRV to expire soon.
3840        let new_ttl = 3; // for testing only.
3841        my_service._set_host_ttl(new_ttl);
3842
3843        // register my service
3844        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3845        let result = mdns_server.register(my_service);
3846        assert!(result.is_ok());
3847
3848        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3849        let browse_chan = mdns_client.browse(service_type).unwrap();
3850        let timeout = Duration::from_secs(2);
3851        let mut resolved = false;
3852
3853        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3854            match event {
3855                ServiceEvent::ServiceResolved(info) => {
3856                    resolved = true;
3857                    println!("Resolved a service of {}", &info.get_fullname());
3858                    break;
3859                }
3860                _ => {}
3861            }
3862        }
3863
3864        assert!(resolved);
3865
3866        // Exit the server so that no more responses.
3867        mdns_server.shutdown().unwrap();
3868
3869        // SRV record in the client cache will expire.
3870        let expire_timeout = Duration::from_secs(new_ttl as u64);
3871        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3872            match event {
3873                ServiceEvent::ServiceRemoved(service_type, full_name) => {
3874                    println!("Service removed: {}: {}", &service_type, &full_name);
3875                    break;
3876                }
3877                _ => {}
3878            }
3879        }
3880    }
3881
    /// A resolved hostname address must be reported as removed once its
    /// record expires.
    ///
    /// Steps: register a service with a short host TTL, resolve its hostname
    /// from a second daemon, shut the server down, then expect an
    /// `AddressesRemoved` event within the TTL plus one second.
    #[test]
    fn test_hostname_resolution_address_removed() {
        // Create a mDNS server
        let server = ServiceDaemon::new().expect("Failed to create server");
        // NOTE(review): this hostname ends with `._tcp.local.`, which is unusual
        // for a host name; it still ends with `.local.`.
        let hostname = "addr_remove_host._tcp.local.";
        // The first non-loopback IPv4 address of the host; the test panics
        // here if there is none.
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            "_host_res_test._tcp.local.",
            "my_instance",
            hostname,
            &service_ip_addr,
            1234,
            None,
        )
        .expect("invalid service info");

        // Set a short TTL for addresses for testing.
        let addr_ttl = 2;
        my_service._set_host_ttl(addr_ttl); // Expire soon

        server.register(my_service).unwrap();

        // Create a mDNS client for resolving the hostname.
        let client = ServiceDaemon::new().expect("Failed to create client");
        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
        // Blocks until the address is found, the search stops, or the channel
        // fails; other events are skipped.
        let resolved = loop {
            match event_receiver.recv() {
                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                    assert!(found_hostname == hostname);
                    assert!(addresses.contains(&service_ip_addr));
                    println!("address found: {:?}", &addresses);
                    break true;
                }
                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
                Ok(_event) => {}
                Err(_) => break false,
            }
        };

        assert!(resolved);

        // Shutdown the server so no more responses / refreshes for addresses.
        server.shutdown().unwrap();

        // Wait till hostname address record expires, with 1 second grace period.
        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
        // The timeout applies to each receive call, not to the loop as a whole.
        let removed = loop {
            match event_receiver.recv_timeout(timeout) {
                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                    assert!(removed_host == hostname);
                    assert!(addresses.contains(&service_ip_addr));

                    println!(
                        "address removed: hostname: {} addresses: {:?}",
                        &hostname, &addresses
                    );
                    break true;
                }
                Ok(_event) => {}
                Err(_) => {
                    break false;
                }
            }
        };

        assert!(removed);

        client.shutdown().unwrap();
    }
3956
3957    #[test]
3958    fn test_refresh_ptr() {
3959        // construct service info
3960        let service_type = "_refresh-ptr._udp.local.";
3961        let instance = "test_instance";
3962        let host_name = "refresh_ptr_host.local.";
3963        let service_ip_addr = my_ip_interfaces(false)
3964            .iter()
3965            .find(|iface| iface.ip().is_ipv4())
3966            .map(|iface| iface.ip())
3967            .unwrap();
3968
3969        let mut my_service = ServiceInfo::new(
3970            service_type,
3971            instance,
3972            host_name,
3973            &service_ip_addr,
3974            5023,
3975            None,
3976        )
3977        .unwrap();
3978
3979        let new_ttl = 3; // for testing only.
3980        my_service._set_other_ttl(new_ttl);
3981
3982        // register my service
3983        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3984        let result = mdns_server.register(my_service);
3985        assert!(result.is_ok());
3986
3987        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3988        let browse_chan = mdns_client.browse(service_type).unwrap();
3989        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
3990        let mut resolved = false;
3991
3992        // resolve the service first.
3993        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3994            match event {
3995                ServiceEvent::ServiceResolved(info) => {
3996                    resolved = true;
3997                    println!("Resolved a service of {}", &info.get_fullname());
3998                    break;
3999                }
4000                _ => {}
4001            }
4002        }
4003
4004        assert!(resolved);
4005
4006        // wait over 80% of TTL, and refresh PTR should be sent out.
4007        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4008        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4009            println!("event: {:?}", &event);
4010        }
4011
4012        // verify refresh counter.
4013        let metrics_chan = mdns_client.get_metrics().unwrap();
4014        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4015        let refresh_counter = metrics["cache-refresh-ptr"];
4016        assert_eq!(refresh_counter, 1);
4017
4018        // Exit the server so that no more responses.
4019        mdns_server.shutdown().unwrap();
4020        mdns_client.shutdown().unwrap();
4021    }
4022
4023    #[test]
4024    fn test_name_change() {
4025        assert_eq!(name_change("foo.local."), "foo (2).local.");
4026        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4027        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4028        assert_eq!(name_change("foo"), "foo (2)");
4029        assert_eq!(name_change("foo (2)"), "foo (3)");
4030        assert_eq!(name_change(""), " (2)");
4031
4032        // Additional edge cases
4033        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4034        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4035        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4036    }
4037
4038    #[test]
4039    fn test_hostname_change() {
4040        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4041        assert_eq!(hostname_change("foo"), "foo-2");
4042        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4043        assert_eq!(hostname_change("foo-9"), "foo-10");
4044        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4045    }
4046}