mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38        FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{
42        split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43    },
44    Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{BinaryHeap, HashMap, HashSet},
53    fmt,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
60/// The default max length of the service name without domain, not including the
61/// leading underscore (`_`). It is set to 15 per
62/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
63pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65/// The default interval for checking IP changes automatically.
66pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
69/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
70pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72const MDNS_PORT: u16 = 5353;
73const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
74const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
75const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
76
77const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
79/// Response status code for the service `unregister` call.
80#[derive(Debug)]
81pub enum UnregisterStatus {
82    /// Unregister was successful.
83    OK,
84    /// The service was not found in the registration.
85    NotFound,
86}
87
88/// Status code for the service daemon.
89#[derive(Debug, PartialEq, Clone, Eq)]
90#[non_exhaustive]
91pub enum DaemonStatus {
92    /// The daemon is running as normal.
93    Running,
94
95    /// The daemon has been shutdown.
96    Shutdown,
97}
98
99/// Different counters included in the metrics.
100/// Currently all counters are for outgoing packets.
101#[derive(Hash, Eq, PartialEq)]
102enum Counter {
103    Register,
104    RegisterResend,
105    Unregister,
106    UnregisterResend,
107    Browse,
108    ResolveHostname,
109    Respond,
110    CacheRefreshPTR,
111    CacheRefreshSRV,
112    CacheRefreshAddr,
113    KnownAnswerSuppression,
114    CachedPTR,
115    CachedSRV,
116    CachedAddr,
117    CachedTxt,
118    CachedNSec,
119    CachedSubtype,
120    DnsRegistryProbe,
121    DnsRegistryActive,
122    DnsRegistryTimer,
123    DnsRegistryNameChange,
124    Timer,
125}
126
127impl fmt::Display for Counter {
128    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129        match self {
130            Self::Register => write!(f, "register"),
131            Self::RegisterResend => write!(f, "register-resend"),
132            Self::Unregister => write!(f, "unregister"),
133            Self::UnregisterResend => write!(f, "unregister-resend"),
134            Self::Browse => write!(f, "browse"),
135            Self::ResolveHostname => write!(f, "resolve-hostname"),
136            Self::Respond => write!(f, "respond"),
137            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
138            Self::CacheRefreshSRV => write!(f, "cache-refresh-srv"),
139            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
140            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
141            Self::CachedPTR => write!(f, "cached-ptr"),
142            Self::CachedSRV => write!(f, "cached-srv"),
143            Self::CachedAddr => write!(f, "cached-addr"),
144            Self::CachedTxt => write!(f, "cached-txt"),
145            Self::CachedNSec => write!(f, "cached-nsec"),
146            Self::CachedSubtype => write!(f, "cached-subtype"),
147            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
148            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
149            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
150            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
151            Self::Timer => write!(f, "timer"),
152        }
153    }
154}
155
156/// The metrics is a HashMap of (name_key, i64_value).
157/// The main purpose is to help monitoring the mDNS packet traffic.
158pub type Metrics = HashMap<String, i64>;
159
160const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
161
162/// A daemon thread for mDNS
163///
164/// This struct provides a handle and an API to the daemon. It is cloneable.
165#[derive(Clone)]
166pub struct ServiceDaemon {
167    /// Sender handle of the channel to the daemon.
168    sender: Sender<Command>,
169
170    /// Send to this addr to signal that a `Command` is coming.
171    ///
172    /// The daemon listens on this addr together with other mDNS sockets,
173    /// to avoid busy polling the flume channel. If there is a way to poll
174    /// the channel and mDNS sockets together, then this can be removed.
175    signal_addr: SocketAddr,
176}
177
178impl ServiceDaemon {
179    /// Creates a new daemon and spawns a thread to run the daemon.
180    ///
181    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
182    /// ask callers to set the port.
183    pub fn new() -> Result<Self> {
184        // Use port 0 to allow the system assign a random available port,
185        // no need for a pre-defined port number.
186        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
187
188        let signal_sock = UdpSocket::bind(signal_addr)
189            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
190
191        // Get the socket with the OS chosen port
192        let signal_addr = signal_sock
193            .local_addr()
194            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
195
196        // Must be nonblocking so we can listen to it together with mDNS sockets.
197        signal_sock
198            .set_nonblocking(true)
199            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
200
201        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
202
203        let (sender, receiver) = bounded(100);
204
205        // Spawn the daemon thread
206        let mio_sock = MioUdpSocket::from_std(signal_sock);
207        thread::Builder::new()
208            .name("mDNS_daemon".to_string())
209            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
210            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
211
212        Ok(Self {
213            sender,
214            signal_addr,
215        })
216    }
217
218    /// Sends `cmd` to the daemon via its channel, and sends a signal
219    /// to its sock addr to notify.
220    fn send_cmd(&self, cmd: Command) -> Result<()> {
221        let cmd_name = cmd.to_string();
222
223        // First, send to the flume channel.
224        self.sender.try_send(cmd).map_err(|e| match e {
225            TrySendError::Full(_) => Error::Again,
226            e => e_fmt!("flume::channel::send failed: {}", e),
227        })?;
228
229        // Second, send a signal to notify the daemon.
230        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
231        let socket = UdpSocket::bind(addr)
232            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
233        socket
234            .send_to(cmd_name.as_bytes(), self.signal_addr)
235            .map_err(|e| {
236                e_fmt!(
237                    "signal socket send_to {} ({}) failed: {}",
238                    self.signal_addr,
239                    cmd_name,
240                    e
241                )
242            })?;
243
244        Ok(())
245    }
246
247    /// Starts browsing for a specific service type.
248    ///
249    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
250    ///
251    /// Returns a channel `Receiver` to receive events about the service. The caller
252    /// can call `.recv_async().await` on this receiver to handle events in an
253    /// async environment or call `.recv()` in a sync environment.
254    ///
255    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
256    /// finding more details, i.e. SRV records and TXT records.
257    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
258        check_domain_suffix(service_type)?;
259
260        let (resp_s, resp_r) = bounded(10);
261        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
262        Ok(resp_r)
263    }
264
265    /// Stops searching for a specific service type.
266    ///
267    /// When an error is returned, the caller should retry only when
268    /// the error is `Error::Again`, otherwise should log and move on.
269    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
270        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
271    }
272
273    /// Starts querying for the ip addresses of a hostname.
274    ///
275    /// Returns a channel `Receiver` to receive events about the hostname.
276    /// The caller can call `.recv_async().await` on this receiver to handle events in an
277    /// async environment or call `.recv()` in a sync environment.
278    ///
279    /// The `timeout` is specified in milliseconds.
280    pub fn resolve_hostname(
281        &self,
282        hostname: &str,
283        timeout: Option<u64>,
284    ) -> Result<Receiver<HostnameResolutionEvent>> {
285        check_hostname(hostname)?;
286        let (resp_s, resp_r) = bounded(10);
287        self.send_cmd(Command::ResolveHostname(
288            hostname.to_string(),
289            1,
290            resp_s,
291            timeout,
292        ))?;
293        Ok(resp_r)
294    }
295
296    /// Stops querying for the ip addresses of a hostname.
297    ///
298    /// When an error is returned, the caller should retry only when
299    /// the error is `Error::Again`, otherwise should log and move on.
300    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
301        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
302    }
303
304    /// Registers a service provided by this host.
305    ///
306    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
307    /// this method will automatically fill in addresses from the host.
308    ///
309    /// To re-announce a service with an updated `service_info`, just call
310    /// this `register` function again. No need to call `unregister` first.
311    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
312        check_service_name(service_info.get_fullname())?;
313        check_hostname(service_info.get_hostname())?;
314
315        self.send_cmd(Command::Register(service_info))
316    }
317
318    /// Unregisters a service. This is a graceful shutdown of a service.
319    ///
320    /// Returns a channel receiver that is used to receive the status code
321    /// of the unregister.
322    ///
323    /// When an error is returned, the caller should retry only when
324    /// the error is `Error::Again`, otherwise should log and move on.
325    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
326        let (resp_s, resp_r) = bounded(1);
327        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
328        Ok(resp_r)
329    }
330
331    /// Starts to monitor events from the daemon.
332    ///
333    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
334    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
335        let (resp_s, resp_r) = bounded(100);
336        self.send_cmd(Command::Monitor(resp_s))?;
337        Ok(resp_r)
338    }
339
340    /// Shuts down the daemon thread and returns a channel to receive the status.
341    ///
342    /// When an error is returned, the caller should retry only when
343    /// the error is `Error::Again`, otherwise should log and move on.
344    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
345        let (resp_s, resp_r) = bounded(1);
346        self.send_cmd(Command::Exit(resp_s))?;
347        Ok(resp_r)
348    }
349
350    /// Returns the status of the daemon.
351    ///
352    /// When an error is returned, the caller should retry only when
353    /// the error is `Error::Again`, otherwise should consider the daemon
354    /// stopped working and move on.
355    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
356        let (resp_s, resp_r) = bounded(1);
357
358        if self.sender.is_disconnected() {
359            resp_s
360                .send(DaemonStatus::Shutdown)
361                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
362        } else {
363            self.send_cmd(Command::GetStatus(resp_s))?;
364        }
365
366        Ok(resp_r)
367    }
368
369    /// Returns a channel receiver for the metrics, e.g. input/output counters.
370    ///
371    /// The metrics returned is a snapshot. Hence the caller should call
372    /// this method repeatedly if they want to monitor the metrics continuously.
373    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
374        let (resp_s, resp_r) = bounded(1);
375        self.send_cmd(Command::GetMetrics(resp_s))?;
376        Ok(resp_r)
377    }
378
379    /// Change the max length allowed for a service name.
380    ///
381    /// As RFC 6763 defines a length max for a service name, a user should not call
382    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
383    ///
384    /// `len_max` is capped at an internal limit, which is currently 30.
385    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
386        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
387
388        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
389            return Err(Error::Msg(format!(
390                "service name length max {} is too large",
391                len_max
392            )));
393        }
394
395        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
396    }
397
398    /// Change the interval for checking IP changes automatically.
399    ///
400    /// Setting the interval to 0 disables the IP check.
401    ///
402    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
403    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
404        let interval_in_millis = interval_in_secs as u64 * 1000;
405        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
406            interval_in_millis,
407        )))
408    }
409
410    /// Get the current interval in seconds for checking IP changes automatically.
411    pub fn get_ip_check_interval(&self) -> Result<u32> {
412        let (resp_s, resp_r) = bounded(1);
413        self.send_cmd(Command::GetOption(resp_s))?;
414
415        let option = resp_r
416            .recv_timeout(Duration::from_secs(10))
417            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
418        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
419        Ok(ip_check_interval_in_secs as u32)
420    }
421
422    /// Include interfaces that match `if_kind` for this service daemon.
423    ///
424    /// For example:
425    /// ```ignore
426    ///     daemon.enable_interface("en0")?;
427    /// ```
428    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
429        let if_kind_vec = if_kind.into_vec();
430        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
431            if_kind_vec.kinds,
432        )))
433    }
434
435    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
436    ///
437    /// For example:
438    /// ```ignore
439    ///     daemon.disable_interface(IfKind::IPv6)?;
440    /// ```
441    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
442        let if_kind_vec = if_kind.into_vec();
443        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
444            if_kind_vec.kinds,
445        )))
446    }
447
448    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
449    ///
450    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
451    /// receive announcements from a responder on the same host.
452    ///
453    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
454    ///
455    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
456    /// the UNIX version of the IP_MULTICAST_LOOP option:
457    ///
458    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
459    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
460    ///
461    /// Which means, in order NOT to receive localhost announcements, you want to call
462    /// this API on the querier side on Windows, but on the responder side on Unix.
463    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
464        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
465    }
466
467    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
468    ///
469    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
470    /// receive announcements from a responder on the same host.
471    ///
472    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
473    ///
474    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
475    /// the UNIX version of the IP_MULTICAST_LOOP option:
476    ///
477    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
478    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
479    ///
480    /// Which means, in order NOT to receive localhost announcements, you want to call
481    /// this API on the querier side on Windows, but on the responder side on Unix.
482    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
483        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
484    }
485
486    /// Proactively confirms whether a service instance still valid.
487    ///
488    /// This call will issue queries for a service instance's SRV record and Address records.
489    ///
490    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
491    /// unless there is a reason not to follow RFC.
492    ///
493    /// If no response is received within `timeout`, the current resource
494    /// records will be flushed, and if needed, `ServiceRemoved` event will be
495    /// sent to active queriers.
496    ///
497    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
498    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
499        self.send_cmd(Command::Verify(instance_fullname, timeout))
500    }
501
502    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
503        let zc = Zeroconf::new(signal_sock, poller);
504
505        if let Some(cmd) = Self::run(zc, receiver) {
506            match cmd {
507                Command::Exit(resp_s) => {
508                    // It is guaranteed that the receiver already dropped,
509                    // i.e. the daemon command channel closed.
510                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
511                        debug!("exit: failed to send response of shutdown: {}", e);
512                    }
513                }
514                _ => {
515                    debug!("Unexpected command: {:?}", cmd);
516                }
517            }
518        }
519    }
520
521    /// The main event loop of the daemon thread
522    ///
523    /// In each round, it will:
524    /// 1. select the listening sockets with a timeout.
525    /// 2. process the incoming packets if any.
526    /// 3. try_recv on its channel and execute commands.
527    /// 4. announce its registered services.
528    /// 5. process retransmissions if any.
529    fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
530        // Add the daemon's signal socket to the poller.
531        if let Err(e) = zc.poller.registry().register(
532            &mut zc.signal_sock,
533            mio::Token(SIGNAL_SOCK_EVENT_KEY),
534            mio::Interest::READABLE,
535        ) {
536            debug!("failed to add signal socket to the poller: {}", e);
537            return None;
538        }
539
540        // Add mDNS sockets to the poller.
541        for (intf, sock) in zc.intf_socks.iter_mut() {
542            let key =
543                Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
544
545            if let Err(e) =
546                zc.poller
547                    .registry()
548                    .register(sock, mio::Token(key), mio::Interest::READABLE)
549            {
550                debug!("add socket of {:?} to poller: {e}", intf);
551                return None;
552            }
553        }
554
555        // Setup timer for IP checks.
556        let mut next_ip_check = if zc.ip_check_interval > 0 {
557            current_time_millis() + zc.ip_check_interval
558        } else {
559            0
560        };
561
562        if next_ip_check > 0 {
563            zc.add_timer(next_ip_check);
564        }
565
566        // Start the run loop.
567
568        let mut events = mio::Events::with_capacity(1024);
569        loop {
570            let now = current_time_millis();
571
572            let earliest_timer = zc.peek_earliest_timer();
573            let timeout = earliest_timer.map(|timer| {
574                // If `timer` already passed, set `timeout` to be 1ms.
575                let millis = if timer > now { timer - now } else { 1 };
576                Duration::from_millis(millis)
577            });
578
579            // Process incoming packets, command events and optional timeout.
580            events.clear();
581            match zc.poller.poll(&mut events, timeout) {
582                Ok(_) => zc.handle_poller_events(&events),
583                Err(e) => debug!("failed to select from sockets: {}", e),
584            }
585
586            let now = current_time_millis();
587
588            // Remove the timers if already passed.
589            zc.pop_timers_till(now);
590
591            // Remove hostname resolvers with expired timeouts.
592            for hostname in zc
593                .hostname_resolvers
594                .clone()
595                .into_iter()
596                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
597                .map(|(hostname, _)| hostname)
598            {
599                trace!("hostname resolver timeout for {}", &hostname);
600                call_hostname_resolution_listener(
601                    &zc.hostname_resolvers,
602                    &hostname,
603                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
604                );
605                call_hostname_resolution_listener(
606                    &zc.hostname_resolvers,
607                    &hostname,
608                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
609                );
610                zc.hostname_resolvers.remove(&hostname);
611            }
612
613            // process commands from the command channel
614            while let Ok(command) = receiver.try_recv() {
615                if matches!(command, Command::Exit(_)) {
616                    zc.status = DaemonStatus::Shutdown;
617                    return Some(command);
618                }
619                zc.exec_command(command, false);
620            }
621
622            // check for repeated commands and run them if their time is up.
623            let mut i = 0;
624            while i < zc.retransmissions.len() {
625                if now >= zc.retransmissions[i].next_time {
626                    let rerun = zc.retransmissions.remove(i);
627                    zc.exec_command(rerun.command, true);
628                } else {
629                    i += 1;
630                }
631            }
632
633            // Refresh cached service records with active queriers
634            zc.refresh_active_services();
635
636            // Refresh cached A/AAAA records with active queriers
637            let mut query_count = 0;
638            for (hostname, _sender) in zc.hostname_resolvers.iter() {
639                for (hostname, ip_addr) in
640                    zc.cache.refresh_due_hostname_resolutions(hostname).iter()
641                {
642                    zc.send_query(hostname, ip_address_rr_type(ip_addr));
643                    query_count += 1;
644                }
645            }
646
647            zc.increase_counter(Counter::CacheRefreshAddr, query_count);
648
649            // check and evict expired records in our cache
650            let now = current_time_millis();
651
652            // Notify service listeners about the expired records.
653            let expired_services = zc.cache.evict_expired_services(now);
654            zc.notify_service_removal(expired_services);
655
656            // Notify hostname listeners about the expired records.
657            let expired_addrs = zc.cache.evict_expired_addr(now);
658            for (hostname, addrs) in expired_addrs {
659                call_hostname_resolution_listener(
660                    &zc.hostname_resolvers,
661                    &hostname,
662                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
663                );
664                let instances = zc.cache.get_instances_on_host(&hostname);
665                let instance_set: HashSet<String> = instances.into_iter().collect();
666                zc.resolve_updated_instances(&instance_set);
667            }
668
669            // Send out probing queries.
670            zc.probing_handler();
671
672            // check IP changes if next_ip_check is reached.
673            if now >= next_ip_check && next_ip_check > 0 {
674                next_ip_check = now + zc.ip_check_interval;
675                zc.add_timer(next_ip_check);
676
677                zc.check_ip_changes();
678            }
679        }
680    }
681}
682
683/// Creates a new UDP socket that uses `intf` to send and recv multicast.
684fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
685    // Use the same socket for receiving and sending multicast packets.
686    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
687    let intf_ip = &intf.ip();
688    match intf_ip {
689        IpAddr::V4(ip) => {
690            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
691            let sock = new_socket(addr.into(), true)?;
692
693            // Join mDNS group to receive packets.
694            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
695                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
696
697            // Set IP_MULTICAST_IF to send packets.
698            sock.set_multicast_if_v4(ip)
699                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
700
701            if !should_loop {
702                sock.set_multicast_loop_v4(false)
703                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
704            }
705
706            // Test if we can send packets successfully.
707            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
708            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
709            for packet in test_packets {
710                sock.send_to(&packet, &multicast_addr)
711                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
712            }
713            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
714        }
715        IpAddr::V6(ip) => {
716            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717            let sock = new_socket(addr.into(), true)?;
718
719            // Join mDNS group to receive packets.
720            sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
721                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
722
723            // Set IPV6_MULTICAST_IF to send packets.
724            sock.set_multicast_if_v6(intf.index.unwrap_or(0))
725                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
726
727            // We are not sending multicast packets to test this socket as there might
728            // be many IPv6 interfaces on a host and could cause such send error:
729            // "No buffer space available (os error 55)".
730
731            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
732        }
733    }
734}
735
736/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
737/// `non_block` indicates whether to set O_NONBLOCK for the socket.
738fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
739    let domain = match addr {
740        SocketAddr::V4(_) => socket2::Domain::IPV4,
741        SocketAddr::V6(_) => socket2::Domain::IPV6,
742    };
743
744    let fd = Socket::new(domain, socket2::Type::DGRAM, None)
745        .map_err(|e| e_fmt!("create socket failed: {}", e))?;
746
747    fd.set_reuse_address(true)
748        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
749    #[cfg(unix)] // this is currently restricted to Unix's in socket2
750    fd.set_reuse_port(true)
751        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
752
753    if non_block {
754        fd.set_nonblocking(true)
755            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
756    }
757
758    fd.bind(&addr.into())
759        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
760
761    trace!("new socket bind to {}", &addr);
762    Ok(fd)
763}
764
765/// Specify a UNIX timestamp in millis to run `command` for the next time.
766struct ReRun {
767    /// UNIX timestamp in millis.
768    next_time: u64,
769    command: Command,
770}
771
772/// Enum to represent the IP version.
773#[derive(Debug, Eq, Hash, PartialEq)]
774enum IpVersion {
775    V4,
776    V6,
777}
778
779/// A struct to track multicast send status for a network interface.
780#[derive(Debug, Eq, Hash, PartialEq)]
781struct MulticastSendTracker {
782    intf_index: u32,
783    ip_version: IpVersion,
784}
785
786/// Returns the multicast send tracker if the interface index is valid
787fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
788    match intf.index {
789        Some(index) => {
790            let ip_ver = match intf.addr {
791                IfAddr::V4(_) => IpVersion::V4,
792                IfAddr::V6(_) => IpVersion::V6,
793            };
794            Some(MulticastSendTracker {
795                intf_index: index,
796                ip_version: ip_ver,
797            })
798        }
799        None => None,
800    }
801}
802
803/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
804///
805/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
806#[derive(Debug, Clone)]
807#[non_exhaustive]
808pub enum IfKind {
809    /// All interfaces.
810    All,
811
812    /// All IPv4 interfaces.
813    IPv4,
814
815    /// All IPv6 interfaces.
816    IPv6,
817
818    /// By the interface name, for example "en0"
819    Name(String),
820
821    /// By an IPv4 or IPv6 address.
822    Addr(IpAddr),
823
824    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
825    ///
826    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
827    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
828    LoopbackV4,
829
830    /// ::1/128, disabled by default.
831    LoopbackV6,
832}
833
834impl IfKind {
835    /// Checks if `intf` matches with this interface kind.
836    fn matches(&self, intf: &Interface) -> bool {
837        match self {
838            Self::All => true,
839            Self::IPv4 => intf.ip().is_ipv4(),
840            Self::IPv6 => intf.ip().is_ipv6(),
841            Self::Name(ifname) => ifname == &intf.name,
842            Self::Addr(addr) => addr == &intf.ip(),
843            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
844            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
845        }
846    }
847}
848
849/// The first use case of specifying an interface was to
850/// use an interface name. Hence adding this for ergonomic reasons.
851impl From<&str> for IfKind {
852    fn from(val: &str) -> Self {
853        Self::Name(val.to_string())
854    }
855}
856
857impl From<&String> for IfKind {
858    fn from(val: &String) -> Self {
859        Self::Name(val.to_string())
860    }
861}
862
863/// Still for ergonomic reasons.
864impl From<IpAddr> for IfKind {
865    fn from(val: IpAddr) -> Self {
866        Self::Addr(val)
867    }
868}
869
870/// A list of `IfKind` that can be used to match interfaces.
871pub struct IfKindVec {
872    kinds: Vec<IfKind>,
873}
874
875/// A trait that converts a type into a Vec of `IfKind`.
876pub trait IntoIfKindVec {
877    fn into_vec(self) -> IfKindVec;
878}
879
880impl<T: Into<IfKind>> IntoIfKindVec for T {
881    fn into_vec(self) -> IfKindVec {
882        let if_kind: IfKind = self.into();
883        IfKindVec {
884            kinds: vec![if_kind],
885        }
886    }
887}
888
889impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
890    fn into_vec(self) -> IfKindVec {
891        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
892        IfKindVec { kinds }
893    }
894}
895
896/// Selection of interfaces.
897struct IfSelection {
898    /// The interfaces to be selected.
899    if_kind: IfKind,
900
901    /// Whether the `if_kind` should be enabled or not.
902    selected: bool,
903}
904
905/// A struct holding the state. It was inspired by `zeroconf` package in Python.
906struct Zeroconf {
907    /// Local interfaces with sockets to recv/send on these interfaces.
908    intf_socks: HashMap<Interface, MioUdpSocket>,
909
910    /// Map poll id to Interface.
911    poll_ids: HashMap<usize, Interface>,
912
913    /// Next poll id value
914    poll_id_count: usize,
915
916    /// Local registered services, keyed by service full names.
917    my_services: HashMap<String, ServiceInfo>,
918
919    /// Received DNS records.
920    cache: DnsCache,
921
922    /// Registered service records.
923    dns_registry_map: HashMap<Interface, DnsRegistry>,
924
925    /// Active "Browse" commands.
926    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
927
928    /// Active "ResolveHostname" commands.
929    ///
930    /// The timestamps are set at the future timestamp when the command should timeout.
931    /// `hostname` is case-insensitive and stored in lowercase.
932    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
933
934    /// All repeating transmissions.
935    retransmissions: Vec<ReRun>,
936
937    counters: Metrics,
938
939    /// Waits for incoming packets.
940    poller: Poll,
941
942    /// Channels to notify events.
943    monitors: Vec<Sender<DaemonEvent>>,
944
945    /// Options
946    service_name_len_max: u8,
947
948    /// Interval in millis to check IP address changes.
949    ip_check_interval: u64,
950
951    /// All interface selections called to the daemon.
952    if_selections: Vec<IfSelection>,
953
954    /// Socket for signaling.
955    signal_sock: MioUdpSocket,
956
957    /// Timestamps marking where we need another iteration of the run loop,
958    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
959    ///
960    /// When the run loop goes through a single iteration, it will
961    /// set its timeout to the earliest timer in this list.
962    timers: BinaryHeap<Reverse<u64>>,
963
964    status: DaemonStatus,
965
966    /// Service instances that are pending for resolving SRV and TXT.
967    pending_resolves: HashSet<String>,
968
969    /// Service instances that are already resolved.
970    resolved: HashSet<String>,
971
972    multicast_loop_v4: bool,
973
974    multicast_loop_v6: bool,
975}
976
977impl Zeroconf {
978    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
979        // Get interfaces.
980        let my_ifaddrs = my_ip_interfaces(false);
981
982        // Create a socket for every IP addr.
983        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
984        // or the same interface name with different IP addrs.
985        let mut intf_socks = HashMap::new();
986        let mut dns_registry_map = HashMap::new();
987
988        for intf in my_ifaddrs {
989            let sock = match new_socket_bind(&intf, true) {
990                Ok(s) => s,
991                Err(e) => {
992                    trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
993                    continue;
994                }
995            };
996
997            dns_registry_map.insert(intf.clone(), DnsRegistry::new());
998
999            intf_socks.insert(intf, sock);
1000        }
1001
1002        let monitors = Vec::new();
1003        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1004        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1005
1006        let timers = BinaryHeap::new();
1007
1008        // Disable loopback by default.
1009        let if_selections = vec![
1010            IfSelection {
1011                if_kind: IfKind::LoopbackV4,
1012                selected: false,
1013            },
1014            IfSelection {
1015                if_kind: IfKind::LoopbackV6,
1016                selected: false,
1017            },
1018        ];
1019
1020        let status = DaemonStatus::Running;
1021
1022        Self {
1023            intf_socks,
1024            poll_ids: HashMap::new(),
1025            poll_id_count: 0,
1026            my_services: HashMap::new(),
1027            cache: DnsCache::new(),
1028            dns_registry_map,
1029            hostname_resolvers: HashMap::new(),
1030            service_queriers: HashMap::new(),
1031            retransmissions: Vec::new(),
1032            counters: HashMap::new(),
1033            poller,
1034            monitors,
1035            service_name_len_max,
1036            ip_check_interval,
1037            if_selections,
1038            signal_sock,
1039            timers,
1040            status,
1041            pending_resolves: HashSet::new(),
1042            resolved: HashSet::new(),
1043            multicast_loop_v4: true,
1044            multicast_loop_v6: true,
1045        }
1046    }
1047
1048    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1049        match daemon_opt {
1050            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1051            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1052            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1053            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1054            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1055            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1056        }
1057    }
1058
1059    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1060        for if_kind in kinds {
1061            self.if_selections.push(IfSelection {
1062                if_kind,
1063                selected: true,
1064            });
1065        }
1066
1067        self.apply_intf_selections(my_ip_interfaces(true));
1068    }
1069
1070    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1071        for if_kind in kinds {
1072            self.if_selections.push(IfSelection {
1073                if_kind,
1074                selected: false,
1075            });
1076        }
1077
1078        self.apply_intf_selections(my_ip_interfaces(true));
1079    }
1080
1081    fn set_multicast_loop_v4(&mut self, on: bool) {
1082        for (_, sock) in self.intf_socks.iter_mut() {
1083            if let Err(e) = sock.set_multicast_loop_v4(on) {
1084                debug!("failed to set multicast loop v4: {e}");
1085            }
1086        }
1087    }
1088
1089    fn set_multicast_loop_v6(&mut self, on: bool) {
1090        for (_, sock) in self.intf_socks.iter_mut() {
1091            if let Err(e) = sock.set_multicast_loop_v6(on) {
1092                debug!("failed to set multicast loop v6: {e}");
1093            }
1094        }
1095    }
1096
1097    fn notify_monitors(&mut self, event: DaemonEvent) {
1098        // Only retain the monitors that are still connected.
1099        self.monitors.retain(|sender| {
1100            if let Err(e) = sender.try_send(event.clone()) {
1101                debug!("notify_monitors: try_send: {}", &e);
1102                if matches!(e, TrySendError::Disconnected(_)) {
1103                    return false; // This monitor is dropped.
1104                }
1105            }
1106            true
1107        });
1108    }
1109
1110    /// Remove `addr` in my services that enabled `addr_auto`.
1111    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1112        for (_, service_info) in self.my_services.iter_mut() {
1113            if service_info.is_addr_auto() {
1114                service_info.remove_ipaddr(addr);
1115            }
1116        }
1117    }
1118
1119    /// Insert a new interface into the poll map and return key
1120    fn add_poll(&mut self, intf: Interface) -> usize {
1121        Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1122    }
1123
1124    /// Insert a new interface into the poll map and return its key.
1125    ///
1126    /// This exists to satisfy the borrow checker
1127    fn add_poll_impl(
1128        poll_ids: &mut HashMap<usize, Interface>,
1129        poll_id_count: &mut usize,
1130        intf: Interface,
1131    ) -> usize {
1132        let key = *poll_id_count;
1133        *poll_id_count += 1;
1134        let _ = (*poll_ids).insert(key, intf);
1135        key
1136    }
1137
1138    fn add_timer(&mut self, next_time: u64) {
1139        self.timers.push(Reverse(next_time));
1140    }
1141
1142    fn peek_earliest_timer(&self) -> Option<u64> {
1143        self.timers.peek().map(|Reverse(v)| *v)
1144    }
1145
1146    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1147        self.timers.pop().map(|Reverse(v)| v)
1148    }
1149
1150    /// Pop all timers that are already passed till `now`.
1151    fn pop_timers_till(&mut self, now: u64) {
1152        while let Some(Reverse(v)) = self.timers.peek() {
1153            if *v > now {
1154                break;
1155            }
1156            self.timers.pop();
1157        }
1158    }
1159
1160    /// Apply all selections to `interfaces` and return the selected addresses.
1161    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1162        let intf_count = interfaces.len();
1163        let mut intf_selections = vec![true; intf_count];
1164
1165        // apply if_selections
1166        for selection in self.if_selections.iter() {
1167            // Mark the interfaces for this selection.
1168            for i in 0..intf_count {
1169                if selection.if_kind.matches(&interfaces[i]) {
1170                    intf_selections[i] = selection.selected;
1171                }
1172            }
1173        }
1174
1175        let mut selected_addrs = HashSet::new();
1176        for i in 0..intf_count {
1177            if intf_selections[i] {
1178                selected_addrs.insert(interfaces[i].addr.ip());
1179            }
1180        }
1181
1182        selected_addrs
1183    }
1184
1185    /// Apply all selections to `interfaces`.
1186    ///
1187    /// For any interface, add it if selected but not bound yet,
1188    /// delete it if not selected but still bound.
1189    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1190        // By default, we enable all interfaces.
1191        let intf_count = interfaces.len();
1192        let mut intf_selections = vec![true; intf_count];
1193
1194        // apply if_selections
1195        for selection in self.if_selections.iter() {
1196            // Mark the interfaces for this selection.
1197            for i in 0..intf_count {
1198                if selection.if_kind.matches(&interfaces[i]) {
1199                    intf_selections[i] = selection.selected;
1200                }
1201            }
1202        }
1203
1204        // Update `intf_socks` based on the selections.
1205        for (idx, intf) in interfaces.into_iter().enumerate() {
1206            if intf_selections[idx] {
1207                // Add the interface
1208                if !self.intf_socks.contains_key(&intf) {
1209                    debug!("apply_intf_selections: add {:?}", &intf.ip());
1210                    self.add_new_interface(intf);
1211                }
1212            } else {
1213                // Remove the interface
1214                if let Some(mut sock) = self.intf_socks.remove(&intf) {
1215                    match self.poller.registry().deregister(&mut sock) {
1216                        Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1217                        Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1218                    }
1219
1220                    // Remove from poll_ids
1221                    self.poll_ids.retain(|_, v| v != &intf);
1222
1223                    // Remove cache records for this interface.
1224                    self.cache.remove_addrs_on_disabled_intf(&intf);
1225                }
1226            }
1227        }
1228    }
1229
1230    /// Check for IP changes and update intf_socks as needed.
1231    fn check_ip_changes(&mut self) {
1232        // Get the current interfaces.
1233        let my_ifaddrs = my_ip_interfaces(true);
1234
1235        let poll_ids = &mut self.poll_ids;
1236        let poller = &mut self.poller;
1237        // Remove unused sockets in the poller.
1238        let deleted_addrs = self
1239            .intf_socks
1240            .iter_mut()
1241            .filter_map(|(intf, sock)| {
1242                if !my_ifaddrs.contains(intf) {
1243                    if let Err(e) = poller.registry().deregister(sock) {
1244                        debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1245                    }
1246                    // Remove from poll_ids
1247                    poll_ids.retain(|_, v| v != intf);
1248                    Some(intf.ip())
1249                } else {
1250                    None
1251                }
1252            })
1253            .collect::<Vec<IpAddr>>();
1254
1255        // Remove deleted addrs from my services that enabled `addr_auto`.
1256        for ip in deleted_addrs.iter() {
1257            self.del_addr_in_my_services(ip);
1258            self.notify_monitors(DaemonEvent::IpDel(*ip));
1259        }
1260
1261        // Keep the interfaces only if they still exist.
1262        self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1263
1264        // Add newly found interfaces only if in our selections.
1265        self.apply_intf_selections(my_ifaddrs);
1266    }
1267
1268    fn add_new_interface(&mut self, intf: Interface) {
1269        // Bind the new interface.
1270        let new_ip = intf.ip();
1271        let should_loop = if new_ip.is_ipv4() {
1272            self.multicast_loop_v4
1273        } else {
1274            self.multicast_loop_v6
1275        };
1276        let mut sock = match new_socket_bind(&intf, should_loop) {
1277            Ok(s) => s,
1278            Err(e) => {
1279                debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1280                return;
1281            }
1282        };
1283
1284        // Add the new interface into the poller.
1285        let key = self.add_poll(intf.clone());
1286        if let Err(e) =
1287            self.poller
1288                .registry()
1289                .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1290        {
1291            debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1292            return;
1293        }
1294
1295        debug!("add new interface {}: {new_ip}", intf.name);
1296        let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1297            Some(registry) => registry,
1298            None => self
1299                .dns_registry_map
1300                .entry(intf.clone())
1301                .or_insert_with(DnsRegistry::new),
1302        };
1303
1304        for (_, service_info) in self.my_services.iter_mut() {
1305            if service_info.is_addr_auto() {
1306                service_info.insert_ipaddr(new_ip);
1307
1308                if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1309                    debug!(
1310                        "Announce service {} on {}",
1311                        service_info.get_fullname(),
1312                        intf.ip()
1313                    );
1314                    service_info.set_status(&intf, ServiceStatus::Announced);
1315                } else {
1316                    for timer in dns_registry.new_timers.drain(..) {
1317                        self.timers.push(Reverse(timer));
1318                    }
1319                    service_info.set_status(&intf, ServiceStatus::Probing);
1320                }
1321            }
1322        }
1323
1324        self.intf_socks.insert(intf, sock);
1325
1326        // Notify the monitors.
1327        self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1328    }
1329
1330    /// Registers a service.
1331    ///
1332    /// RFC 6762 section 8.3.
1333    /// ...the Multicast DNS responder MUST send
1334    ///    an unsolicited Multicast DNS response containing, in the Answer
1335    ///    Section, all of its newly registered resource records
1336    ///
1337    /// Zeroconf will then respond to requests for information about this service.
1338    fn register_service(&mut self, mut info: ServiceInfo) {
1339        // Check the service name length.
1340        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1341            debug!("check_service_name_length: {}", &e);
1342            self.notify_monitors(DaemonEvent::Error(e));
1343            return;
1344        }
1345
1346        if info.is_addr_auto() {
1347            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1348            for addr in selected_addrs {
1349                info.insert_ipaddr(addr);
1350            }
1351        }
1352
1353        debug!("register service {:?}", &info);
1354
1355        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1356        if !outgoing_addrs.is_empty() {
1357            self.notify_monitors(DaemonEvent::Announce(
1358                info.get_fullname().to_string(),
1359                format!("{:?}", &outgoing_addrs),
1360            ));
1361        }
1362
1363        // The key has to be lower case letter as DNS record name is case insensitive.
1364        // The info will have the original name.
1365        let service_fullname = info.get_fullname().to_lowercase();
1366        self.my_services.insert(service_fullname, info);
1367    }
1368
1369    /// Sends out announcement of `info` on every valid interface.
1370    /// Returns the list of interface IPs that sent out the announcement.
1371    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1372        let mut outgoing_addrs = Vec::new();
1373        // Send the announcement on one interface per ip version.
1374        let mut multicast_sent_trackers = HashSet::new();
1375
1376        let mut outgoing_intfs = Vec::new();
1377
1378        for (intf, sock) in self.intf_socks.iter() {
1379            if let Some(tracker) = multicast_send_tracker(intf) {
1380                if multicast_sent_trackers.contains(&tracker) {
1381                    continue; // No need to send again on the same interface with same ip version.
1382                }
1383            }
1384
1385            let dns_registry = match self.dns_registry_map.get_mut(intf) {
1386                Some(registry) => registry,
1387                None => self
1388                    .dns_registry_map
1389                    .entry(intf.clone())
1390                    .or_insert_with(DnsRegistry::new),
1391            };
1392
1393            if announce_service_on_intf(dns_registry, info, intf, sock) {
1394                if let Some(tracker) = multicast_send_tracker(intf) {
1395                    multicast_sent_trackers.insert(tracker);
1396                }
1397                outgoing_addrs.push(intf.ip());
1398                outgoing_intfs.push(intf.clone());
1399
1400                debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1401
1402                info.set_status(intf, ServiceStatus::Announced);
1403            } else {
1404                for timer in dns_registry.new_timers.drain(..) {
1405                    self.timers.push(Reverse(timer));
1406                }
1407                info.set_status(intf, ServiceStatus::Probing);
1408            }
1409        }
1410
1411        // RFC 6762 section 8.3.
1412        // ..The Multicast DNS responder MUST send at least two unsolicited
1413        //    responses, one second apart.
1414        let next_time = current_time_millis() + 1000;
1415        for intf in outgoing_intfs {
1416            self.add_retransmission(
1417                next_time,
1418                Command::RegisterResend(info.get_fullname().to_string(), intf),
1419            );
1420        }
1421
1422        outgoing_addrs
1423    }
1424
1425    /// Send probings or finish them if expired. Notify waiting services.
1426    fn probing_handler(&mut self) {
1427        let now = current_time_millis();
1428
1429        for (intf, sock) in self.intf_socks.iter() {
1430            let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1431                continue;
1432            };
1433
1434            let mut expired_probe_names = Vec::new();
1435            let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1436
1437            for (name, probe) in dns_registry.probing.iter_mut() {
1438                if now >= probe.next_send {
1439                    if probe.expired(now) {
1440                        // move the record to active
1441                        expired_probe_names.push(name.clone());
1442                    } else {
1443                        out.add_question(name, RRType::ANY);
1444
1445                        /*
1446                        RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
1447                        ...
1448                        for tiebreaking to work correctly in all
1449                        cases, the Authority Section must contain *all* the records and
1450                        proposed rdata being probed for uniqueness.
1451                         */
1452                        for record in probe.records.iter() {
1453                            out.add_authority(record.clone());
1454                        }
1455
1456                        probe.update_next_send(now);
1457
1458                        // add timer
1459                        self.timers.push(Reverse(probe.next_send));
1460                    }
1461                }
1462            }
1463
1464            // send probing.
1465            if !out.questions().is_empty() {
1466                debug!("sending out probing of {} questions", out.questions().len());
1467                send_dns_outgoing(&out, intf, sock);
1468            }
1469
1470            let mut waiting_services = HashSet::new();
1471
1472            for name in expired_probe_names {
1473                let Some(probe) = dns_registry.probing.remove(&name) else {
1474                    continue;
1475                };
1476
1477                // send notifications about name changes
1478                for record in probe.records.iter() {
1479                    if let Some(new_name) = record.get_record().get_new_name() {
1480                        dns_registry
1481                            .name_changes
1482                            .insert(name.clone(), new_name.to_string());
1483
1484                        let event = DnsNameChange {
1485                            original: record.get_record().get_original_name().to_string(),
1486                            new_name: new_name.to_string(),
1487                            rr_type: record.get_type(),
1488                            intf_name: intf.name.to_string(),
1489                        };
1490                        notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
1491                    }
1492                }
1493
1494                // move RR from probe to active.
1495                debug!(
1496                    "probe of '{name}' finished: move {} records to active. ({} waiting services)",
1497                    probe.records.len(),
1498                    probe.waiting_services.len(),
1499                );
1500
1501                // Move records to active and plan to wake up services if records are not empty.
1502                if !probe.records.is_empty() {
1503                    match dns_registry.active.get_mut(&name) {
1504                        Some(records) => {
1505                            records.extend(probe.records);
1506                        }
1507                        None => {
1508                            dns_registry.active.insert(name, probe.records);
1509                        }
1510                    }
1511
1512                    waiting_services.extend(probe.waiting_services);
1513                }
1514            }
1515
1516            // wake up services waiting.
1517            for service_name in waiting_services {
1518                debug!(
1519                    "try to announce service {service_name} on intf {}",
1520                    intf.ip()
1521                );
1522                // service names are lowercase
1523                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1524                    if info.get_status(intf) == ServiceStatus::Announced {
1525                        debug!("service {} already announced", info.get_fullname());
1526                        continue;
1527                    }
1528
1529                    if announce_service_on_intf(dns_registry, info, intf, sock) {
1530                        let next_time = now + 1000;
1531                        let command =
1532                            Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1533                        self.retransmissions.push(ReRun { next_time, command });
1534                        self.timers.push(Reverse(next_time));
1535
1536                        let fullname = match dns_registry.name_changes.get(&service_name) {
1537                            Some(new_name) => new_name.to_string(),
1538                            None => service_name.to_string(),
1539                        };
1540
1541                        let mut hostname = info.get_hostname();
1542                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1543                            hostname = new_name;
1544                        }
1545
1546                        debug!("wake up: announce service {} on {}", fullname, intf.ip());
1547                        notify_monitors(
1548                            &mut self.monitors,
1549                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1550                        );
1551
1552                        info.set_status(intf, ServiceStatus::Announced);
1553                    }
1554                }
1555            }
1556        }
1557    }
1558
1559    fn unregister_service(
1560        &self,
1561        info: &ServiceInfo,
1562        intf: &Interface,
1563        sock: &MioUdpSocket,
1564    ) -> Vec<u8> {
1565        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1566        out.add_answer_at_time(
1567            DnsPointer::new(
1568                info.get_type(),
1569                RRType::PTR,
1570                CLASS_IN,
1571                0,
1572                info.get_fullname().to_string(),
1573            ),
1574            0,
1575        );
1576
1577        if let Some(sub) = info.get_subtype() {
1578            trace!("Adding subdomain {}", sub);
1579            out.add_answer_at_time(
1580                DnsPointer::new(
1581                    sub,
1582                    RRType::PTR,
1583                    CLASS_IN,
1584                    0,
1585                    info.get_fullname().to_string(),
1586                ),
1587                0,
1588            );
1589        }
1590
1591        out.add_answer_at_time(
1592            DnsSrv::new(
1593                info.get_fullname(),
1594                CLASS_IN | CLASS_CACHE_FLUSH,
1595                0,
1596                info.get_priority(),
1597                info.get_weight(),
1598                info.get_port(),
1599                info.get_hostname().to_string(),
1600            ),
1601            0,
1602        );
1603        out.add_answer_at_time(
1604            DnsTxt::new(
1605                info.get_fullname(),
1606                CLASS_IN | CLASS_CACHE_FLUSH,
1607                0,
1608                info.generate_txt(),
1609            ),
1610            0,
1611        );
1612
1613        for address in info.get_addrs_on_intf(intf) {
1614            out.add_answer_at_time(
1615                DnsAddress::new(
1616                    info.get_hostname(),
1617                    ip_address_rr_type(&address),
1618                    CLASS_IN | CLASS_CACHE_FLUSH,
1619                    0,
1620                    address,
1621                    intf.into(),
1622                ),
1623                0,
1624            );
1625        }
1626
1627        // `out` data is non-empty, hence we can do this.
1628        send_dns_outgoing(&out, intf, sock).remove(0)
1629    }
1630
1631    /// Binds a channel `listener` to querying mDNS hostnames.
1632    ///
1633    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1634    fn add_hostname_resolver(
1635        &mut self,
1636        hostname: String,
1637        listener: Sender<HostnameResolutionEvent>,
1638        timeout: Option<u64>,
1639    ) {
1640        let real_timeout = timeout.map(|t| current_time_millis() + t);
1641        self.hostname_resolvers
1642            .insert(hostname.to_lowercase(), (listener, real_timeout));
1643        if let Some(t) = real_timeout {
1644            self.add_timer(t);
1645        }
1646    }
1647
1648    /// Sends a multicast query for `name` with `qtype`.
1649    fn send_query(&self, name: &str, qtype: RRType) {
1650        self.send_query_vec(&[(name, qtype)]);
1651    }
1652
1653    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1654    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1655        trace!("Sending query questions: {:?}", questions);
1656        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1657        let now = current_time_millis();
1658
1659        for (name, qtype) in questions {
1660            out.add_question(name, *qtype);
1661
1662            for record in self.cache.get_known_answers(name, *qtype, now) {
1663                /*
1664                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1665                ...
1666                    When a Multicast DNS querier sends a query to which it already knows
1667                    some answers, it populates the Answer Section of the DNS query
1668                    message with those answers.
1669                 */
1670                trace!("add known answer: {:?}", record);
1671                let mut new_record = record.clone();
1672                new_record.get_record_mut().update_ttl(now);
1673                out.add_answer_box(new_record);
1674            }
1675        }
1676
1677        // Send the query on one interface per ip version.
1678        let mut multicast_sent_trackers = HashSet::new();
1679        for (intf, sock) in self.intf_socks.iter() {
1680            if let Some(tracker) = multicast_send_tracker(intf) {
1681                if multicast_sent_trackers.contains(&tracker) {
1682                    continue; // no need to send query the same interface with same ip version.
1683                }
1684                multicast_sent_trackers.insert(tracker);
1685            }
1686            send_dns_outgoing(&out, intf, sock);
1687        }
1688    }
1689
1690    /// Reads one UDP datagram from the socket of `intf`.
1691    ///
1692    /// Returns false if failed to receive a packet,
1693    /// otherwise returns true.
1694    fn handle_read(&mut self, intf: &Interface) -> bool {
1695        let sock = match self.intf_socks.get_mut(intf) {
1696            Some(if_sock) => if_sock,
1697            None => return false,
1698        };
1699        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1700
1701        // Read the next mDNS UDP datagram.
1702        //
1703        // If the datagram is larger than `buf`, excess bytes may or may not
1704        // be truncated by the socket layer depending on the platform's libc.
1705        // In any case, such large datagram will not be decoded properly and
1706        // this function should return false but should not crash.
1707        let sz = match sock.recv(&mut buf) {
1708            Ok(sz) => sz,
1709            Err(e) => {
1710                if e.kind() != std::io::ErrorKind::WouldBlock {
1711                    debug!("listening socket read failed: {}", e);
1712                }
1713                return false;
1714            }
1715        };
1716
1717        trace!("received {} bytes at IP: {}", sz, intf.ip());
1718
1719        // If sz is 0, it means sock reached End-of-File.
1720        if sz == 0 {
1721            debug!("socket {:?} was likely shutdown", &sock);
1722            if let Err(e) = self.poller.registry().deregister(sock) {
1723                debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1724            }
1725
1726            // Replace the closed socket with a new one.
1727            let should_loop = if intf.ip().is_ipv4() {
1728                self.multicast_loop_v4
1729            } else {
1730                self.multicast_loop_v6
1731            };
1732            match new_socket_bind(intf, should_loop) {
1733                Ok(new_sock) => {
1734                    trace!("reset socket for IP {}", intf.ip());
1735                    self.intf_socks.insert(intf.clone(), new_sock);
1736                }
1737                Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1738            }
1739
1740            return false;
1741        }
1742
1743        buf.truncate(sz); // reduce potential processing errors
1744
1745        match DnsIncoming::new(buf, intf.into()) {
1746            Ok(msg) => {
1747                if msg.is_query() {
1748                    self.handle_query(msg, intf);
1749                } else if msg.is_response() {
1750                    self.handle_response(msg, intf);
1751                } else {
1752                    debug!("Invalid message: not query and not response");
1753                }
1754            }
1755            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1756        }
1757
1758        true
1759    }
1760
1761    /// Returns true, if sent query. Returns false if SRV already exists.
1762    fn query_unresolved(&mut self, instance: &str) -> bool {
1763        if !valid_instance_name(instance) {
1764            trace!("instance name {} not valid", instance);
1765            return false;
1766        }
1767
1768        if let Some(records) = self.cache.get_srv(instance) {
1769            for record in records {
1770                if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1771                    if self.cache.get_addr(srv.host()).is_none() {
1772                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1773                        return true;
1774                    }
1775                }
1776            }
1777        } else {
1778            self.send_query(instance, RRType::ANY);
1779            return true;
1780        }
1781
1782        false
1783    }
1784
1785    /// Checks if `ty_domain` has records in the cache. If yes, sends the
1786    /// cached records via `sender`.
1787    fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1788        let mut resolved: HashSet<String> = HashSet::new();
1789        let mut unresolved: HashSet<String> = HashSet::new();
1790
1791        if let Some(records) = self.cache.get_ptr(ty_domain) {
1792            for record in records.iter() {
1793                if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1794                    let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1795                        Ok(ok) => ok,
1796                        Err(err) => {
1797                            debug!("Error while creating service info from cache: {}", err);
1798                            continue;
1799                        }
1800                    };
1801
1802                    match sender.send(ServiceEvent::ServiceFound(
1803                        ty_domain.to_string(),
1804                        ptr.alias().to_string(),
1805                    )) {
1806                        Ok(()) => debug!("send service found {}", ptr.alias()),
1807                        Err(e) => {
1808                            debug!("failed to send service found: {}", e);
1809                            continue;
1810                        }
1811                    }
1812
1813                    if info.is_ready() {
1814                        resolved.insert(ptr.alias().to_string());
1815                        match sender.send(ServiceEvent::ServiceResolved(info)) {
1816                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1817                            Err(e) => debug!("failed to send service resolved: {}", e),
1818                        }
1819                    } else {
1820                        unresolved.insert(ptr.alias().to_string());
1821                    }
1822                }
1823            }
1824        }
1825
1826        for instance in resolved.drain() {
1827            self.pending_resolves.remove(&instance);
1828            self.resolved.insert(instance);
1829        }
1830
1831        for instance in unresolved.drain() {
1832            self.add_pending_resolve(instance);
1833        }
1834    }
1835
1836    /// Checks if `hostname` has records in the cache. If yes, sends the
1837    /// cached records via `sender`.
1838    fn query_cache_for_hostname(
1839        &mut self,
1840        hostname: &str,
1841        sender: Sender<HostnameResolutionEvent>,
1842    ) {
1843        let addresses_map = self.cache.get_addresses_for_host(hostname);
1844        for (name, addresses) in addresses_map {
1845            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1846                Ok(()) => trace!("sent hostname addresses found"),
1847                Err(e) => debug!("failed to send hostname addresses found: {}", e),
1848            }
1849        }
1850    }
1851
1852    fn add_pending_resolve(&mut self, instance: String) {
1853        if !self.pending_resolves.contains(&instance) {
1854            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1855            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1856            self.pending_resolves.insert(instance);
1857        }
1858    }
1859
1860    fn create_service_info_from_cache(
1861        &self,
1862        ty_domain: &str,
1863        fullname: &str,
1864    ) -> Result<ServiceInfo> {
1865        let my_name = {
1866            let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1867            name.strip_suffix('.').unwrap_or(name).to_string()
1868        };
1869
1870        let now = current_time_millis();
1871        let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1872
1873        // Be sure setting `subtype` if available even when querying for the parent domain.
1874        if let Some(subtype) = self.cache.get_subtype(fullname) {
1875            trace!(
1876                "ty_domain: {} found subtype {} for instance: {}",
1877                ty_domain,
1878                subtype,
1879                fullname
1880            );
1881            if info.get_subtype().is_none() {
1882                info.set_subtype(subtype.clone());
1883            }
1884        }
1885
1886        // resolve SRV record
1887        if let Some(records) = self.cache.get_srv(fullname) {
1888            if let Some(answer) = records.first() {
1889                if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1890                    info.set_hostname(dns_srv.host().to_string());
1891                    info.set_port(dns_srv.port());
1892                }
1893            }
1894        }
1895
1896        // resolve TXT record
1897        if let Some(records) = self.cache.get_txt(fullname) {
1898            if let Some(record) = records.first() {
1899                if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1900                    info.set_properties_from_txt(dns_txt.text());
1901                }
1902            }
1903        }
1904
1905        // resolve A and AAAA records
1906        if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1907            for answer in records.iter() {
1908                if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1909                    if dns_a.get_record().is_expired(now) {
1910                        trace!("Addr expired: {}", dns_a.address());
1911                    } else {
1912                        info.insert_ipaddr(dns_a.address());
1913                    }
1914                }
1915            }
1916        }
1917
1918        Ok(info)
1919    }
1920
1921    fn handle_poller_events(&mut self, events: &mio::Events) {
1922        for ev in events.iter() {
1923            trace!("event received with key {:?}", ev.token());
1924            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1925                // Drain signals as we will drain commands as well.
1926                self.signal_sock_drain();
1927
1928                if let Err(e) = self.poller.registry().reregister(
1929                    &mut self.signal_sock,
1930                    ev.token(),
1931                    mio::Interest::READABLE,
1932                ) {
1933                    debug!("failed to modify poller for signal socket: {}", e);
1934                }
1935                continue; // Next event.
1936            }
1937
1938            // Read until no more packets available.
1939            let intf = match self.poll_ids.get(&ev.token().0) {
1940                Some(interface) => interface.clone(),
1941                None => {
1942                    debug!("Ip for event key {} not found", ev.token().0);
1943                    break;
1944                }
1945            };
1946            while self.handle_read(&intf) {}
1947
1948            // we continue to monitor this socket.
1949            if let Some(sock) = self.intf_socks.get_mut(&intf) {
1950                if let Err(e) =
1951                    self.poller
1952                        .registry()
1953                        .reregister(sock, ev.token(), mio::Interest::READABLE)
1954                {
1955                    debug!("modify poller for interface {:?}: {}", &intf, e);
1956                    break;
1957                }
1958            }
1959        }
1960    }
1961
1962    /// Deal with incoming response packets.  All answers
1963    /// are held in the cache, and listeners are notified.
1964    fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1965        trace!(
1966            "handle_response: {} answers {} authorities {} additionals",
1967            msg.answers().len(),
1968            &msg.authorities().len(),
1969            &msg.num_additionals()
1970        );
1971        let now = current_time_millis();
1972
1973        // remove records that are expired.
1974        let mut record_predicate = |record: &DnsRecordBox| {
1975            if !record.get_record().is_expired(now) {
1976                return true;
1977            }
1978
1979            debug!("record is expired, removing it from cache.");
1980            if self.cache.remove(record) {
1981                // for PTR records, send event to listeners
1982                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1983                    call_service_listener(
1984                        &self.service_queriers,
1985                        dns_ptr.get_name(),
1986                        ServiceEvent::ServiceRemoved(
1987                            dns_ptr.get_name().to_string(),
1988                            dns_ptr.alias().to_string(),
1989                        ),
1990                    );
1991                }
1992            }
1993            false
1994        };
1995        msg.answers_mut().retain(&mut record_predicate);
1996        msg.authorities_mut().retain(&mut record_predicate);
1997        msg.additionals_mut().retain(&mut record_predicate);
1998
1999        // check possible conflicts and handle them.
2000        self.conflict_handler(&msg, intf);
2001
2002        // check if the message is for us.
2003        let mut is_for_us = true; // assume it is for us.
2004
2005        // If there are any PTR records in the answers, there should be
2006        // at least one PTR for us. Otherwise, the message is not for us.
2007        // If there are no PTR records at all, assume this message is for us.
2008        for answer in msg.answers() {
2009            if answer.get_type() == RRType::PTR {
2010                if self.service_queriers.contains_key(answer.get_name()) {
2011                    is_for_us = true;
2012                    break; // OK to break: at least one PTR for us.
2013                } else {
2014                    is_for_us = false;
2015                }
2016            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2017                // If there is a hostname querier for this address, then it is for us.
2018                let answer_lowercase = answer.get_name().to_lowercase();
2019                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2020                    is_for_us = true;
2021                    break; // OK to break: at least one hostname for us.
2022                }
2023            }
2024        }
2025
2026        /// Represents a DNS record change that involves one service instance.
2027        struct InstanceChange {
2028            ty: RRType,   // The type of DNS record for the instance.
2029            name: String, // The name of the record.
2030        }
2031
2032        // Go through all answers to get the new and updated records.
2033        // For new PTR records, send out ServiceFound immediately. For others,
2034        // collect them into `changes`.
2035        //
2036        // Note: we don't try to identify the update instances based on
2037        // each record immediately as the answers are likely related to each
2038        // other.
2039        let mut changes = Vec::new();
2040        let mut timers = Vec::new();
2041        for record in msg.all_records() {
2042            match self
2043                .cache
2044                .add_or_update(intf, record, &mut timers, is_for_us)
2045            {
2046                Some((dns_record, true)) => {
2047                    timers.push(dns_record.get_record().get_expire_time());
2048                    timers.push(dns_record.get_record().get_refresh_time());
2049
2050                    let ty = dns_record.get_type();
2051                    let name = dns_record.get_name();
2052                    if ty == RRType::PTR {
2053                        if self.service_queriers.contains_key(name) {
2054                            timers.push(dns_record.get_record().get_refresh_time());
2055                        }
2056
2057                        // send ServiceFound
2058                        if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2059                            call_service_listener(
2060                                &self.service_queriers,
2061                                name,
2062                                ServiceEvent::ServiceFound(
2063                                    name.to_string(),
2064                                    dns_ptr.alias().to_string(),
2065                                ),
2066                            );
2067                            changes.push(InstanceChange {
2068                                ty,
2069                                name: dns_ptr.alias().to_string(),
2070                            });
2071                        }
2072                    } else {
2073                        changes.push(InstanceChange {
2074                            ty,
2075                            name: name.to_string(),
2076                        });
2077                    }
2078                }
2079                Some((dns_record, false)) => {
2080                    timers.push(dns_record.get_record().get_expire_time());
2081                    timers.push(dns_record.get_record().get_refresh_time());
2082                }
2083                _ => {}
2084            }
2085        }
2086
2087        // Add timers for the new records.
2088        for t in timers {
2089            self.add_timer(t);
2090        }
2091
2092        // Go through remaining changes to see if any hostname resolutions were found or updated.
2093        for change in changes
2094            .iter()
2095            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2096        {
2097            let addr_map = self.cache.get_addresses_for_host(&change.name);
2098            for (name, addresses) in addr_map {
2099                call_hostname_resolution_listener(
2100                    &self.hostname_resolvers,
2101                    &change.name,
2102                    HostnameResolutionEvent::AddressesFound(name, addresses),
2103                )
2104            }
2105        }
2106
2107        // Identify the instances that need to be "resolved".
2108        let mut updated_instances = HashSet::new();
2109        for update in changes {
2110            match update.ty {
2111                RRType::PTR | RRType::SRV | RRType::TXT => {
2112                    updated_instances.insert(update.name);
2113                }
2114                RRType::A | RRType::AAAA => {
2115                    let instances = self.cache.get_instances_on_host(&update.name);
2116                    updated_instances.extend(instances);
2117                }
2118                _ => {}
2119            }
2120        }
2121
2122        self.resolve_updated_instances(&updated_instances);
2123    }
2124
2125    fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2126        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2127            return;
2128        };
2129
2130        for answer in msg.answers().iter() {
2131            let mut new_records = Vec::new();
2132
2133            let name = answer.get_name();
2134            let Some(probe) = dns_registry.probing.get_mut(name) else {
2135                continue;
2136            };
2137
2138            // check against possible multicast forwarding
2139            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2140                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2141                    if !valid_ip_on_intf(&answer_addr.address(), intf) {
2142                        debug!(
2143                            "conflict handler: answer addr {:?} not in the subnet of {:?}",
2144                            answer_addr, intf
2145                        );
2146                        continue;
2147                    }
2148                }
2149
2150                // double check if any other address record matches rrdata,
2151                // as there could be multiple addresses for the same name.
2152                let any_match = probe.records.iter().any(|r| {
2153                    r.get_type() == answer.get_type()
2154                        && r.get_class() == answer.get_class()
2155                        && r.rrdata_match(answer.as_ref())
2156                });
2157                if any_match {
2158                    continue; // no conflict for this answer.
2159                }
2160            }
2161
2162            probe.records.retain(|record| {
2163                if record.get_type() == answer.get_type()
2164                    && record.get_class() == answer.get_class()
2165                    && !record.rrdata_match(answer.as_ref())
2166                {
2167                    debug!(
2168                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2169                        record.get_type(),
2170                        record.rdata_print(),
2171                        answer.rdata_print()
2172                    );
2173
2174                    // create a new name for this record
2175                    // then remove the old record in probing.
2176                    let mut new_record = record.clone();
2177                    let new_name = match record.get_type() {
2178                        RRType::A => hostname_change(name),
2179                        RRType::AAAA => hostname_change(name),
2180                        _ => name_change(name),
2181                    };
2182                    new_record.get_record_mut().set_new_name(new_name);
2183                    new_records.push(new_record);
2184                    return false; // old record is dropped from the probe.
2185                }
2186
2187                true
2188            });
2189
2190            // ?????
2191            // if probe.records.is_empty() {
2192            //     dns_registry.probing.remove(name);
2193            // }
2194
2195            // Probing again with the new names.
2196            let create_time = current_time_millis() + fastrand::u64(0..250);
2197
2198            let waiting_services = probe.waiting_services.clone();
2199
2200            for record in new_records {
2201                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2202                    self.timers.push(Reverse(create_time));
2203                }
2204
2205                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2206                dns_registry.name_changes.insert(
2207                    record.get_record().get_original_name().to_string(),
2208                    record.get_name().to_string(),
2209                );
2210
2211                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2212                    Some(p) => p,
2213                    None => {
2214                        let new_probe = dns_registry
2215                            .probing
2216                            .entry(record.get_name().to_string())
2217                            .or_insert_with(|| {
2218                                debug!("conflict handler: new probe of {}", record.get_name());
2219                                Probe::new(create_time)
2220                            });
2221                        self.timers.push(Reverse(new_probe.next_send));
2222                        new_probe
2223                    }
2224                };
2225
2226                debug!(
2227                    "insert record with new name '{}' {} into probe",
2228                    record.get_name(),
2229                    record.get_type()
2230                );
2231                new_probe.insert_record(record);
2232
2233                new_probe.waiting_services.extend(waiting_services.clone());
2234            }
2235        }
2236    }
2237
2238    /// Resolve the updated (including new) instances.
2239    ///
2240    /// Note: it is possible that more than 1 PTR pointing to the same
2241    /// instance. For example, a regular service type PTR and a sub-type
2242    /// service type PTR can both point to the same service instance.
2243    /// This loop automatically handles the sub-type PTRs.
2244    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2245        let mut resolved: HashSet<String> = HashSet::new();
2246        let mut unresolved: HashSet<String> = HashSet::new();
2247        let mut removed_instances = HashMap::new();
2248
2249        for (ty_domain, records) in self.cache.all_ptr().iter() {
2250            if !self.service_queriers.contains_key(ty_domain) {
2251                // No need to resolve if not in our queries.
2252                continue;
2253            }
2254
2255            for record in records.iter() {
2256                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2257                    if updated_instances.contains(dns_ptr.alias()) {
2258                        if let Ok(info) =
2259                            self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2260                        {
2261                            if info.is_ready() {
2262                                debug!("call queriers to resolve {}", dns_ptr.alias());
2263                                resolved.insert(dns_ptr.alias().to_string());
2264                                call_service_listener(
2265                                    &self.service_queriers,
2266                                    ty_domain,
2267                                    ServiceEvent::ServiceResolved(info),
2268                                );
2269                            } else {
2270                                if self.resolved.remove(dns_ptr.alias()) {
2271                                    removed_instances
2272                                        .entry(ty_domain.to_string())
2273                                        .or_insert_with(HashSet::new)
2274                                        .insert(dns_ptr.alias().to_string());
2275                                }
2276                                unresolved.insert(dns_ptr.alias().to_string());
2277                            }
2278                        }
2279                    }
2280                }
2281            }
2282        }
2283
2284        for instance in resolved.drain() {
2285            self.pending_resolves.remove(&instance);
2286            self.resolved.insert(instance);
2287        }
2288
2289        for instance in unresolved.drain() {
2290            self.add_pending_resolve(instance);
2291        }
2292
2293        self.notify_service_removal(removed_instances);
2294    }
2295
2296    /// Handle incoming query packets, figure out whether and what to respond.
2297    fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2298        let sock = match self.intf_socks.get(intf) {
2299            Some(sock) => sock,
2300            None => return,
2301        };
2302        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2303
2304        // Special meta-query "_services._dns-sd._udp.<Domain>".
2305        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2306        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2307
2308        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2309            debug!("missing dns registry for intf {}", intf.ip());
2310            return;
2311        };
2312
2313        for question in msg.questions().iter() {
2314            trace!("query question: {:?}", &question);
2315
2316            let qtype = question.entry_type();
2317
2318            if qtype == RRType::PTR {
2319                for service in self.my_services.values() {
2320                    if service.get_status(intf) != ServiceStatus::Announced {
2321                        continue;
2322                    }
2323
2324                    if question.entry_name() == service.get_type()
2325                        || service
2326                            .get_subtype()
2327                            .as_ref()
2328                            .is_some_and(|v| v == question.entry_name())
2329                    {
2330                        add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2331                    } else if question.entry_name() == META_QUERY {
2332                        let ptr_added = out.add_answer(
2333                            &msg,
2334                            DnsPointer::new(
2335                                question.entry_name(),
2336                                RRType::PTR,
2337                                CLASS_IN,
2338                                service.get_other_ttl(),
2339                                service.get_type().to_string(),
2340                            ),
2341                        );
2342                        if !ptr_added {
2343                            trace!("answer was not added for meta-query {:?}", &question);
2344                        }
2345                    }
2346                }
2347            } else {
2348                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2349                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2350                    let probe_name = question.entry_name();
2351
2352                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2353                        let now = current_time_millis();
2354
2355                        // Only do tiebreaking if probe already started.
2356                        // This check also helps avoid redo tiebreaking if start time
2357                        // was postponed.
2358                        if probe.start_time < now {
2359                            let incoming_records: Vec<_> = msg
2360                                .authorities()
2361                                .iter()
2362                                .filter(|r| r.get_name() == probe_name)
2363                                .collect();
2364
2365                            probe.tiebreaking(&incoming_records, now, probe_name);
2366                        }
2367                    }
2368                }
2369
2370                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2371                    for service in self.my_services.values() {
2372                        if service.get_status(intf) != ServiceStatus::Announced {
2373                            continue;
2374                        }
2375
2376                        let service_hostname =
2377                            match dns_registry.name_changes.get(service.get_hostname()) {
2378                                Some(new_name) => new_name,
2379                                None => service.get_hostname(),
2380                            };
2381
2382                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2383                            let intf_addrs = service.get_addrs_on_intf(intf);
2384                            if intf_addrs.is_empty()
2385                                && (qtype == RRType::A || qtype == RRType::AAAA)
2386                            {
2387                                let t = match qtype {
2388                                    RRType::A => "TYPE_A",
2389                                    RRType::AAAA => "TYPE_AAAA",
2390                                    _ => "invalid_type",
2391                                };
2392                                trace!(
2393                                    "Cannot find valid addrs for {} response on intf {:?}",
2394                                    t,
2395                                    &intf
2396                                );
2397                                return;
2398                            }
2399                            for address in intf_addrs {
2400                                out.add_answer(
2401                                    &msg,
2402                                    DnsAddress::new(
2403                                        service_hostname,
2404                                        ip_address_rr_type(&address),
2405                                        CLASS_IN | CLASS_CACHE_FLUSH,
2406                                        service.get_host_ttl(),
2407                                        address,
2408                                        intf.into(),
2409                                    ),
2410                                );
2411                            }
2412                        }
2413                    }
2414                }
2415
2416                let query_name = question.entry_name().to_lowercase();
2417                let service_opt = self
2418                    .my_services
2419                    .iter()
2420                    .find(|(k, _v)| {
2421                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2422                            Some(new_name) => new_name,
2423                            None => k,
2424                        };
2425                        service_name == &query_name
2426                    })
2427                    .map(|(_, v)| v);
2428
2429                let Some(service) = service_opt else {
2430                    continue;
2431                };
2432
2433                if service.get_status(intf) != ServiceStatus::Announced {
2434                    continue;
2435                }
2436
2437                if qtype == RRType::SRV || qtype == RRType::ANY {
2438                    out.add_answer(
2439                        &msg,
2440                        DnsSrv::new(
2441                            question.entry_name(),
2442                            CLASS_IN | CLASS_CACHE_FLUSH,
2443                            service.get_host_ttl(),
2444                            service.get_priority(),
2445                            service.get_weight(),
2446                            service.get_port(),
2447                            service.get_hostname().to_string(),
2448                        ),
2449                    );
2450                }
2451
2452                if qtype == RRType::TXT || qtype == RRType::ANY {
2453                    out.add_answer(
2454                        &msg,
2455                        DnsTxt::new(
2456                            question.entry_name(),
2457                            CLASS_IN | CLASS_CACHE_FLUSH,
2458                            service.get_host_ttl(),
2459                            service.generate_txt(),
2460                        ),
2461                    );
2462                }
2463
2464                if qtype == RRType::SRV {
2465                    let intf_addrs = service.get_addrs_on_intf(intf);
2466                    if intf_addrs.is_empty() {
2467                        debug!(
2468                            "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2469                            &intf
2470                        );
2471                        return;
2472                    }
2473                    for address in intf_addrs {
2474                        out.add_additional_answer(DnsAddress::new(
2475                            service.get_hostname(),
2476                            ip_address_rr_type(&address),
2477                            CLASS_IN | CLASS_CACHE_FLUSH,
2478                            service.get_host_ttl(),
2479                            address,
2480                            intf.into(),
2481                        ));
2482                    }
2483                }
2484            }
2485        }
2486
2487        if !out.answers_count() > 0 {
2488            out.set_id(msg.id());
2489            send_dns_outgoing(&out, intf, sock);
2490
2491            self.increase_counter(Counter::Respond, 1);
2492            self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2493        }
2494
2495        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2496    }
2497
2498    /// Increases the value of `counter` by `count`.
2499    fn increase_counter(&mut self, counter: Counter, count: i64) {
2500        let key = counter.to_string();
2501        match self.counters.get_mut(&key) {
2502            Some(v) => *v += count,
2503            None => {
2504                self.counters.insert(key, count);
2505            }
2506        }
2507    }
2508
2509    /// Sets the value of `counter` to `count`.
2510    fn set_counter(&mut self, counter: Counter, count: i64) {
2511        let key = counter.to_string();
2512        self.counters.insert(key, count);
2513    }
2514
2515    fn signal_sock_drain(&self) {
2516        let mut signal_buf = [0; 1024];
2517
2518        // This recv is non-blocking as the socket is non-blocking.
2519        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2520            trace!(
2521                "signal socket recvd: {}",
2522                String::from_utf8_lossy(&signal_buf[0..sz])
2523            );
2524        }
2525    }
2526
2527    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2528        self.retransmissions.push(ReRun { next_time, command });
2529        self.add_timer(next_time);
2530    }
2531
2532    /// Sends service removal event to listeners for expired service records.
2533    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2534        for (ty_domain, sender) in self.service_queriers.iter() {
2535            if let Some(instances) = expired.get(ty_domain) {
2536                for instance_name in instances {
2537                    let event = ServiceEvent::ServiceRemoved(
2538                        ty_domain.to_string(),
2539                        instance_name.to_string(),
2540                    );
2541                    match sender.send(event) {
2542                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2543                        Err(e) => debug!("Failed to send event: {}", e),
2544                    }
2545                }
2546            }
2547        }
2548    }
2549
2550    /// The entry point that executes all commands received by the daemon.
2551    ///
2552    /// `repeating`: whether this is a retransmission.
2553    fn exec_command(&mut self, command: Command, repeating: bool) {
2554        match command {
2555            Command::Browse(ty, next_delay, listener) => {
2556                self.exec_command_browse(repeating, ty, next_delay, listener);
2557            }
2558
2559            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2560                self.exec_command_resolve_hostname(
2561                    repeating, hostname, next_delay, listener, timeout,
2562                );
2563            }
2564
2565            Command::Register(service_info) => {
2566                self.register_service(service_info);
2567                self.increase_counter(Counter::Register, 1);
2568            }
2569
2570            Command::RegisterResend(fullname, intf) => {
2571                trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2572                self.exec_command_register_resend(fullname, intf);
2573            }
2574
2575            Command::Unregister(fullname, resp_s) => {
2576                trace!("unregister service {} repeat {}", &fullname, &repeating);
2577                self.exec_command_unregister(repeating, fullname, resp_s);
2578            }
2579
2580            Command::UnregisterResend(packet, ip) => {
2581                self.exec_command_unregister_resend(packet, ip);
2582            }
2583
2584            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2585
2586            Command::StopResolveHostname(hostname) => {
2587                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2588            }
2589
2590            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2591
2592            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2593
2594            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2595                Ok(()) => trace!("Sent status to the client"),
2596                Err(e) => debug!("Failed to send status: {}", e),
2597            },
2598
2599            Command::Monitor(resp_s) => {
2600                self.monitors.push(resp_s);
2601            }
2602
2603            Command::SetOption(daemon_opt) => {
2604                self.process_set_option(daemon_opt);
2605            }
2606
2607            Command::GetOption(resp_s) => {
2608                let val = DaemonOptionVal {
2609                    _service_name_len_max: self.service_name_len_max,
2610                    ip_check_interval: self.ip_check_interval,
2611                };
2612                if let Err(e) = resp_s.send(val) {
2613                    debug!("Failed to send options: {}", e);
2614                }
2615            }
2616
2617            Command::Verify(instance_fullname, timeout) => {
2618                self.exec_command_verify(instance_fullname, timeout, repeating);
2619            }
2620
2621            _ => {
2622                debug!("unexpected command: {:?}", &command);
2623            }
2624        }
2625    }
2626
2627    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2628        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2629        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2630        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2631        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2632        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2633        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2634        self.set_counter(Counter::Timer, self.timers.len() as i64);
2635
2636        let dns_registry_probe_count: usize = self
2637            .dns_registry_map
2638            .values()
2639            .map(|r| r.probing.len())
2640            .sum();
2641        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2642
2643        let dns_registry_active_count: usize = self
2644            .dns_registry_map
2645            .values()
2646            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2647            .sum();
2648        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2649
2650        let dns_registry_timer_count: usize = self
2651            .dns_registry_map
2652            .values()
2653            .map(|r| r.new_timers.len())
2654            .sum();
2655        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2656
2657        let dns_registry_name_change_count: usize = self
2658            .dns_registry_map
2659            .values()
2660            .map(|r| r.name_changes.len())
2661            .sum();
2662        self.set_counter(
2663            Counter::DnsRegistryNameChange,
2664            dns_registry_name_change_count as i64,
2665        );
2666
2667        // Send the metrics to the client.
2668        if let Err(e) = resp_s.send(self.counters.clone()) {
2669            debug!("Failed to send metrics: {}", e);
2670        }
2671    }
2672
2673    fn exec_command_browse(
2674        &mut self,
2675        repeating: bool,
2676        ty: String,
2677        next_delay: u32,
2678        listener: Sender<ServiceEvent>,
2679    ) {
2680        let pretty_addrs: Vec<String> = self
2681            .intf_socks
2682            .keys()
2683            .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2684            .collect();
2685
2686        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2687            "{ty} on {} interfaces [{}]",
2688            pretty_addrs.len(),
2689            pretty_addrs.join(", ")
2690        ))) {
2691            debug!(
2692                "Failed to send SearchStarted({})(repeating:{}): {}",
2693                &ty, repeating, e
2694            );
2695            return;
2696        }
2697        if !repeating {
2698            // Binds a `listener` to querying mDNS domain type `ty`.
2699            //
2700            // If there is already a `listener`, it will be updated, i.e. overwritten.
2701            self.service_queriers.insert(ty.clone(), listener.clone());
2702
2703            // if we already have the records in our cache, just send them
2704            self.query_cache_for_service(&ty, &listener);
2705        }
2706
2707        self.send_query(&ty, RRType::PTR);
2708        self.increase_counter(Counter::Browse, 1);
2709
2710        let next_time = current_time_millis() + (next_delay * 1000) as u64;
2711        let max_delay = 60 * 60;
2712        let delay = cmp::min(next_delay * 2, max_delay);
2713        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2714    }
2715
2716    fn exec_command_resolve_hostname(
2717        &mut self,
2718        repeating: bool,
2719        hostname: String,
2720        next_delay: u32,
2721        listener: Sender<HostnameResolutionEvent>,
2722        timeout: Option<u64>,
2723    ) {
2724        let addr_list: Vec<_> = self.intf_socks.keys().collect();
2725        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2726            "{} on addrs {:?}",
2727            &hostname, &addr_list
2728        ))) {
2729            debug!(
2730                "Failed to send ResolveStarted({})(repeating:{}): {}",
2731                &hostname, repeating, e
2732            );
2733            return;
2734        }
2735        if !repeating {
2736            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2737            // if we already have the records in our cache, just send them
2738            self.query_cache_for_hostname(&hostname, listener.clone());
2739        }
2740
2741        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2742        self.increase_counter(Counter::ResolveHostname, 1);
2743
2744        let now = current_time_millis();
2745        let next_time = now + u64::from(next_delay) * 1000;
2746        let max_delay = 60 * 60;
2747        let delay = cmp::min(next_delay * 2, max_delay);
2748
2749        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
2750        if self
2751            .hostname_resolvers
2752            .get(&hostname)
2753            .and_then(|(_sender, timeout)| *timeout)
2754            .map(|timeout| next_time < timeout)
2755            .unwrap_or(true)
2756        {
2757            self.add_retransmission(
2758                next_time,
2759                Command::ResolveHostname(hostname, delay, listener, None),
2760            );
2761        }
2762    }
2763
2764    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2765        let pending_query = self.query_unresolved(&instance);
2766        let max_try = 3;
2767        if pending_query && try_count < max_try {
2768            // Note that if the current try already succeeds, the next retransmission
2769            // will be no-op as the cache has been updated.
2770            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2771            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2772        }
2773    }
2774
2775    fn exec_command_unregister(
2776        &mut self,
2777        repeating: bool,
2778        fullname: String,
2779        resp_s: Sender<UnregisterStatus>,
2780    ) {
2781        let response = match self.my_services.remove_entry(&fullname) {
2782            None => {
2783                debug!("unregister: cannot find such service {}", &fullname);
2784                UnregisterStatus::NotFound
2785            }
2786            Some((_k, info)) => {
2787                let mut timers = Vec::new();
2788                // Send one unregister per interface and ip version
2789                let mut multicast_sent_trackers = HashSet::new();
2790
2791                for (intf, sock) in self.intf_socks.iter() {
2792                    if let Some(tracker) = multicast_send_tracker(intf) {
2793                        if multicast_sent_trackers.contains(&tracker) {
2794                            continue; // no need to send unregister the same interface with same ip version.
2795                        }
2796                        multicast_sent_trackers.insert(tracker);
2797                    }
2798                    let packet = self.unregister_service(&info, intf, sock);
2799                    // repeat for one time just in case some peers miss the message
2800                    if !repeating && !packet.is_empty() {
2801                        let next_time = current_time_millis() + 120;
2802                        self.retransmissions.push(ReRun {
2803                            next_time,
2804                            command: Command::UnregisterResend(packet, intf.clone()),
2805                        });
2806                        timers.push(next_time);
2807                    }
2808                }
2809
2810                for t in timers {
2811                    self.add_timer(t);
2812                }
2813
2814                self.increase_counter(Counter::Unregister, 1);
2815                UnregisterStatus::OK
2816            }
2817        };
2818        if let Err(e) = resp_s.send(response) {
2819            debug!("unregister: failed to send response: {}", e);
2820        }
2821    }
2822
2823    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2824        if let Some(sock) = self.intf_socks.get(&intf) {
2825            debug!("UnregisterResend from {}", &intf.ip());
2826            multicast_on_intf(&packet[..], &intf, sock);
2827            self.increase_counter(Counter::UnregisterResend, 1);
2828        }
2829    }
2830
2831    fn exec_command_stop_browse(&mut self, ty_domain: String) {
2832        match self.service_queriers.remove_entry(&ty_domain) {
2833            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2834            Some((ty, sender)) => {
2835                // Remove pending browse commands in the reruns.
2836                trace!("StopBrowse: removed queryer for {}", &ty);
2837                let mut i = 0;
2838                while i < self.retransmissions.len() {
2839                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2840                        if t == &ty {
2841                            self.retransmissions.remove(i);
2842                            trace!("StopBrowse: removed retransmission for {}", &ty);
2843                            continue;
2844                        }
2845                    }
2846                    i += 1;
2847                }
2848
2849                // Remove cache entries.
2850                self.cache.remove_service_type(&ty_domain);
2851
2852                // Notify the client.
2853                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2854                    Ok(()) => trace!("Sent SearchStopped to the listener"),
2855                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
2856                }
2857            }
2858        }
2859    }
2860
2861    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2862        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2863            // Remove pending resolve commands in the reruns.
2864            trace!("StopResolve: removed queryer for {}", &host);
2865            let mut i = 0;
2866            while i < self.retransmissions.len() {
2867                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2868                    if t == &host {
2869                        self.retransmissions.remove(i);
2870                        trace!("StopResolve: removed retransmission for {}", &host);
2871                        continue;
2872                    }
2873                }
2874                i += 1;
2875            }
2876
2877            // Notify the client.
2878            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2879                Ok(()) => trace!("Sent SearchStopped to the listener"),
2880                Err(e) => debug!("Failed to send SearchStopped: {}", e),
2881            }
2882        }
2883    }
2884
2885    fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2886        let Some(info) = self.my_services.get_mut(&fullname) else {
2887            trace!("announce: cannot find such service {}", &fullname);
2888            return;
2889        };
2890
2891        let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2892            return;
2893        };
2894
2895        let Some(sock) = self.intf_socks.get(&intf) else {
2896            return;
2897        };
2898
2899        if announce_service_on_intf(dns_registry, info, &intf, sock) {
2900            let mut hostname = info.get_hostname();
2901            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2902                hostname = new_name;
2903            }
2904            let service_name = match dns_registry.name_changes.get(&fullname) {
2905                Some(new_name) => new_name.to_string(),
2906                None => fullname,
2907            };
2908
2909            debug!("resend: announce service {} on {}", service_name, intf.ip());
2910
2911            notify_monitors(
2912                &mut self.monitors,
2913                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2914            );
2915            info.set_status(&intf, ServiceStatus::Announced);
2916        } else {
2917            debug!("register-resend should not fail");
2918        }
2919
2920        self.increase_counter(Counter::RegisterResend, 1);
2921    }
2922
2923    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2924        /*
2925        RFC 6762 section 10.4:
2926        ...
2927        When the cache receives this hint that it should reconfirm some
2928        record, it MUST issue two or more queries for the resource record in
2929        dispute.  If no response is received within ten seconds, then, even
2930        though its TTL may indicate that it is not yet due to expire, that
2931        record SHOULD be promptly flushed from the cache.
2932        */
2933        let now = current_time_millis();
2934        let expire_at = if repeating {
2935            None
2936        } else {
2937            Some(now + timeout.as_millis() as u64)
2938        };
2939
2940        // send query for the resource records.
2941        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2942
2943        if !record_vec.is_empty() {
2944            let query_vec: Vec<(&str, RRType)> = record_vec
2945                .iter()
2946                .map(|(record, rr_type)| (record.as_str(), *rr_type))
2947                .collect();
2948            self.send_query_vec(&query_vec);
2949
2950            if let Some(new_expire) = expire_at {
2951                self.add_timer(new_expire); // ensure a check for the new expire time.
2952
2953                // schedule a resend 1 second later
2954                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2955            }
2956        }
2957    }
2958
2959    /// Refresh cached service records with active queriers
2960    fn refresh_active_services(&mut self) {
2961        let mut query_ptr_count = 0;
2962        let mut query_srv_count = 0;
2963        let mut new_timers = HashSet::new();
2964        let mut query_addr_count = 0;
2965
2966        for (ty_domain, _sender) in self.service_queriers.iter() {
2967            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2968            if !refreshed_timers.is_empty() {
2969                trace!("sending refresh query for PTR: {}", ty_domain);
2970                self.send_query(ty_domain, RRType::PTR);
2971                query_ptr_count += 1;
2972                new_timers.extend(refreshed_timers);
2973            }
2974
2975            let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2976            for instance in instances.iter() {
2977                trace!("sending refresh query for SRV: {}", instance);
2978                self.send_query(instance, RRType::SRV);
2979                query_srv_count += 1;
2980            }
2981            new_timers.extend(timers);
2982            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2983            for hostname in hostnames.iter() {
2984                trace!("sending refresh queries for A and AAAA:  {}", hostname);
2985                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2986                query_addr_count += 2;
2987            }
2988            new_timers.extend(timers);
2989        }
2990
2991        for timer in new_timers {
2992            self.add_timer(timer);
2993        }
2994
2995        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2996        self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2997        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2998    }
2999}
3000
3001/// All possible events sent to the client from the daemon
3002/// regarding service discovery.
3003#[derive(Debug)]
3004pub enum ServiceEvent {
3005    /// Started searching for a service type.
3006    SearchStarted(String),
3007
3008    /// Found a specific (service_type, fullname).
3009    ServiceFound(String, String),
3010
3011    /// Resolved a service instance with detailed info.
3012    ServiceResolved(ServiceInfo),
3013
3014    /// A service instance (service_type, fullname) was removed.
3015    ServiceRemoved(String, String),
3016
3017    /// Stopped searching for a service type.
3018    SearchStopped(String),
3019}
3020
3021/// All possible events sent to the client from the daemon
3022/// regarding host resolution.
3023#[derive(Debug)]
3024#[non_exhaustive]
3025pub enum HostnameResolutionEvent {
3026    /// Started searching for the ip address of a hostname.
3027    SearchStarted(String),
3028    /// One or more addresses for a hostname has been found.
3029    AddressesFound(String, HashSet<IpAddr>),
3030    /// One or more addresses for a hostname has been removed.
3031    AddressesRemoved(String, HashSet<IpAddr>),
3032    /// The search for the ip address of a hostname has timed out.
3033    SearchTimeout(String),
3034    /// Stopped searching for the ip address of a hostname.
3035    SearchStopped(String),
3036}
3037
3038/// Some notable events from the daemon besides [`ServiceEvent`].
3039/// These events are expected to happen infrequently.
3040#[derive(Clone, Debug)]
3041#[non_exhaustive]
3042pub enum DaemonEvent {
3043    /// Daemon unsolicitly announced a service from an interface.
3044    Announce(String, String),
3045
3046    /// Daemon encountered an error.
3047    Error(Error),
3048
3049    /// Daemon detected a new IP address from the host.
3050    IpAdd(IpAddr),
3051
3052    /// Daemon detected a IP address removed from the host.
3053    IpDel(IpAddr),
3054
3055    /// Daemon resolved a name conflict by changing one of its names.
3056    /// see [DnsNameChange] for more details.
3057    NameChange(DnsNameChange),
3058
3059    /// Send out a multicast response via an IP address.
3060    Respond(IpAddr),
3061}
3062
3063/// Represents a name change due to a name conflict resolution.
3064/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3065#[derive(Clone, Debug)]
3066pub struct DnsNameChange {
3067    /// The original name set in `ServiceInfo` by the user.
3068    pub original: String,
3069
3070    /// A new name is created by appending a suffix after the original name.
3071    ///
3072    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3073    /// - for a host name, the suffix is `-N`, where N starts at 2.
3074    ///
3075    /// For example:
3076    ///
3077    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3078    /// - Host name `foo.local.` becomes `foo-2.local.`
3079    pub new_name: String,
3080
3081    /// The resource record type
3082    pub rr_type: RRType,
3083
3084    /// The interface where the name conflict and its change happened.
3085    pub intf_name: String,
3086}
3087
3088/// Commands supported by the daemon
3089#[derive(Debug)]
3090enum Command {
3091    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3092    Browse(String, u32, Sender<ServiceEvent>),
3093
3094    /// Resolve a hostname to IP addresses.
3095    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3096
3097    /// Register a service
3098    Register(ServiceInfo),
3099
3100    /// Unregister a service
3101    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3102
3103    /// Announce again a service to local network
3104    RegisterResend(String, Interface), // (fullname)
3105
3106    /// Resend unregister packet.
3107    UnregisterResend(Vec<u8>, Interface), // (packet content)
3108
3109    /// Stop browsing a service type
3110    StopBrowse(String), // (ty_domain)
3111
3112    /// Stop resolving a hostname
3113    StopResolveHostname(String), // (hostname)
3114
3115    /// Send query to resolve a service instance.
3116    /// This is used when a PTR record exists but SRV & TXT records are missing.
3117    Resolve(String, u16), // (service_instance_fullname, try_count)
3118
3119    /// Read the current values of the counters
3120    GetMetrics(Sender<Metrics>),
3121
3122    /// Get the current status of the daemon.
3123    GetStatus(Sender<DaemonStatus>),
3124
3125    /// Monitor noticeable events in the daemon.
3126    Monitor(Sender<DaemonEvent>),
3127
3128    SetOption(DaemonOption),
3129
3130    GetOption(Sender<DaemonOptionVal>),
3131
3132    /// Proactively confirm a DNS resource record.
3133    ///
3134    /// The intention is to check if a service name or IP address still valid
3135    /// before its TTL expires.
3136    Verify(String, Duration),
3137
3138    Exit(Sender<DaemonStatus>),
3139}
3140
3141impl fmt::Display for Command {
3142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3143        match self {
3144            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3145            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3146            Self::Exit(_) => write!(f, "Command Exit"),
3147            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3148            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3149            Self::Monitor(_) => write!(f, "Command Monitor"),
3150            Self::Register(_) => write!(f, "Command Register"),
3151            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3152            Self::SetOption(_) => write!(f, "Command SetOption"),
3153            Self::GetOption(_) => write!(f, "Command GetOption"),
3154            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3155            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3156            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3157            Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3158            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3159            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3160        }
3161    }
3162}
3163
3164struct DaemonOptionVal {
3165    _service_name_len_max: u8,
3166    ip_check_interval: u64,
3167}
3168
3169#[derive(Debug)]
3170enum DaemonOption {
3171    ServiceNameLenMax(u8),
3172    IpCheckInterval(u64),
3173    EnableInterface(Vec<IfKind>),
3174    DisableInterface(Vec<IfKind>),
3175    MulticastLoopV4(bool),
3176    MulticastLoopV6(bool),
3177}
3178
3179/// The length of Service Domain name supported in this lib.
3180const DOMAIN_LEN: usize = "._tcp.local.".len();
3181
3182/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3183fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3184    if ty_domain.len() <= DOMAIN_LEN + 1 {
3185        // service name cannot be empty or only '_'.
3186        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3187    }
3188
3189    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3190    if service_name_len > limit as usize {
3191        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3192    }
3193    Ok(())
3194}
3195
3196/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3197fn check_domain_suffix(name: &str) -> Result<()> {
3198    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3199        return Err(e_fmt!(
3200            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3201            name
3202        ));
3203    }
3204
3205    Ok(())
3206}
3207
3208/// Validate the service name in a fully qualified name.
3209///
3210/// A Full Name = <Instance>.<Service>.<Domain>
3211/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3212///
3213/// Note: this function does not check for the length of the service name.
3214/// Instead, `register_service` method will check the length.
3215fn check_service_name(fullname: &str) -> Result<()> {
3216    check_domain_suffix(fullname)?;
3217
3218    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3219    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3220
3221    if &name[0..1] != "_" {
3222        return Err(e_fmt!("Service name must start with '_'"));
3223    }
3224
3225    let name = &name[1..];
3226
3227    if name.contains("--") {
3228        return Err(e_fmt!("Service name must not contain '--'"));
3229    }
3230
3231    if name.starts_with('-') || name.ends_with('-') {
3232        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3233    }
3234
3235    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3236    if ascii_count < 1 {
3237        return Err(e_fmt!(
3238            "Service name must contain at least one letter (eg: 'A-Za-z')"
3239        ));
3240    }
3241
3242    Ok(())
3243}
3244
3245/// Validate a hostname.
3246fn check_hostname(hostname: &str) -> Result<()> {
3247    if !hostname.ends_with(".local.") {
3248        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3249    }
3250
3251    if hostname == ".local." {
3252        return Err(e_fmt!(
3253            "The part of the hostname before '.local.' cannot be empty"
3254        ));
3255    }
3256
3257    if hostname.len() > 255 {
3258        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3259    }
3260
3261    Ok(())
3262}
3263
3264fn call_service_listener(
3265    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3266    ty_domain: &str,
3267    event: ServiceEvent,
3268) {
3269    if let Some(listener) = listeners_map.get(ty_domain) {
3270        match listener.send(event) {
3271            Ok(()) => trace!("Sent event to listener successfully"),
3272            Err(e) => debug!("Failed to send event: {}", e),
3273        }
3274    }
3275}
3276
3277fn call_hostname_resolution_listener(
3278    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3279    hostname: &str,
3280    event: HostnameResolutionEvent,
3281) {
3282    let hostname_lower = hostname.to_lowercase();
3283    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3284        match listener.send(event) {
3285            Ok(()) => trace!("Sent event to listener successfully"),
3286            Err(e) => debug!("Failed to send event: {}", e),
3287        }
3288    }
3289}
3290
3291/// Returns valid network interfaces in the host system.
3292/// Loopback interfaces are excluded.
3293fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3294    if_addrs::get_if_addrs()
3295        .unwrap_or_default()
3296        .into_iter()
3297        .filter(|i| !i.is_loopback() || with_loopback)
3298        .collect()
3299}
3300
3301/// Send an outgoing mDNS query or response, and returns the packet bytes.
3302fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3303    let qtype = if out.is_query() { "query" } else { "response" };
3304    trace!(
3305        "send outgoing {}: {} questions {} answers {} authorities {} additional",
3306        qtype,
3307        out.questions().len(),
3308        out.answers_count(),
3309        out.authorities().len(),
3310        out.additionals().len()
3311    );
3312    let packet_list = out.to_data_on_wire();
3313    for packet in packet_list.iter() {
3314        multicast_on_intf(packet, intf, sock);
3315    }
3316    packet_list
3317}
3318
3319/// Sends a multicast packet, and returns the packet bytes.
3320fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3321    if packet.len() > MAX_MSG_ABSOLUTE {
3322        debug!("Drop over-sized packet ({})", packet.len());
3323        return;
3324    }
3325
3326    let addr: SocketAddr = match intf.addr {
3327        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3328        if_addrs::IfAddr::V6(_) => {
3329            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3330            sock.set_scope_id(intf.index.unwrap_or(0)); // Choose iface for multicast
3331            sock.into()
3332        }
3333    };
3334
3335    send_packet(packet, addr, intf, socket);
3336}
3337
3338/// Sends out `packet` to `addr` on the socket in `intf_sock`.
3339fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3340    match sock.send_to(packet, addr) {
3341        Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3342        Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3343    }
3344}
3345
3346/// Returns true if `name` is a valid instance name of format:
3347/// <instance>.<service_type>.<_udp|_tcp>.local.
3348/// Note: <instance> could contain '.' as well.
3349fn valid_instance_name(name: &str) -> bool {
3350    name.split('.').count() >= 5
3351}
3352
3353fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3354    monitors.retain(|sender| {
3355        if let Err(e) = sender.try_send(event.clone()) {
3356            debug!("notify_monitors: try_send: {}", &e);
3357            if matches!(e, TrySendError::Disconnected(_)) {
3358                return false; // This monitor is dropped.
3359            }
3360        }
3361        true
3362    });
3363}
3364
3365/// Check if all unique records passed "probing", and if yes, create a packet
3366/// to announce the service.
3367fn prepare_announce(
3368    info: &ServiceInfo,
3369    intf: &Interface,
3370    dns_registry: &mut DnsRegistry,
3371) -> Option<DnsOutgoing> {
3372    let intf_addrs = info.get_addrs_on_intf(intf);
3373    if intf_addrs.is_empty() {
3374        trace!("No valid addrs to add on intf {:?}", &intf);
3375        return None;
3376    }
3377
3378    // check if we changed our name due to conflicts.
3379    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3380        Some(new_name) => new_name,
3381        None => info.get_fullname(),
3382    };
3383
3384    debug!(
3385        "prepare to announce service {service_fullname} on {}: {}",
3386        &intf.name,
3387        &intf.ip()
3388    );
3389
3390    let mut probing_count = 0;
3391    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3392    let create_time = current_time_millis() + fastrand::u64(0..250);
3393
3394    out.add_answer_at_time(
3395        DnsPointer::new(
3396            info.get_type(),
3397            RRType::PTR,
3398            CLASS_IN,
3399            info.get_other_ttl(),
3400            service_fullname.to_string(),
3401        ),
3402        0,
3403    );
3404
3405    if let Some(sub) = info.get_subtype() {
3406        trace!("Adding subdomain {}", sub);
3407        out.add_answer_at_time(
3408            DnsPointer::new(
3409                sub,
3410                RRType::PTR,
3411                CLASS_IN,
3412                info.get_other_ttl(),
3413                service_fullname.to_string(),
3414            ),
3415            0,
3416        );
3417    }
3418
3419    // SRV records.
3420    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3421        Some(new_name) => new_name.to_string(),
3422        None => info.get_hostname().to_string(),
3423    };
3424
3425    let mut srv = DnsSrv::new(
3426        info.get_fullname(),
3427        CLASS_IN | CLASS_CACHE_FLUSH,
3428        info.get_host_ttl(),
3429        info.get_priority(),
3430        info.get_weight(),
3431        info.get_port(),
3432        hostname,
3433    );
3434
3435    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3436        srv.get_record_mut().set_new_name(new_name.to_string());
3437    }
3438
3439    if !info.requires_probe()
3440        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3441    {
3442        out.add_answer_at_time(srv, 0);
3443    } else {
3444        probing_count += 1;
3445    }
3446
3447    // TXT records.
3448
3449    let mut txt = DnsTxt::new(
3450        info.get_fullname(),
3451        CLASS_IN | CLASS_CACHE_FLUSH,
3452        info.get_other_ttl(),
3453        info.generate_txt(),
3454    );
3455
3456    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3457        txt.get_record_mut().set_new_name(new_name.to_string());
3458    }
3459
3460    if !info.requires_probe()
3461        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3462    {
3463        out.add_answer_at_time(txt, 0);
3464    } else {
3465        probing_count += 1;
3466    }
3467
3468    // Address records. (A and AAAA)
3469
3470    let hostname = info.get_hostname();
3471    for address in intf_addrs {
3472        let mut dns_addr = DnsAddress::new(
3473            hostname,
3474            ip_address_rr_type(&address),
3475            CLASS_IN | CLASS_CACHE_FLUSH,
3476            info.get_host_ttl(),
3477            address,
3478            intf.into(),
3479        );
3480
3481        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3482            dns_addr.get_record_mut().set_new_name(new_name.to_string());
3483        }
3484
3485        if !info.requires_probe()
3486            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3487        {
3488            out.add_answer_at_time(dns_addr, 0);
3489        } else {
3490            probing_count += 1;
3491        }
3492    }
3493
3494    if probing_count > 0 {
3495        return None;
3496    }
3497
3498    Some(out)
3499}
3500
3501/// Send an unsolicited response for owned service via `intf` and `sock`.
3502/// Returns true if sent out successfully.
3503fn announce_service_on_intf(
3504    dns_registry: &mut DnsRegistry,
3505    info: &ServiceInfo,
3506    intf: &Interface,
3507    sock: &MioUdpSocket,
3508) -> bool {
3509    if let Some(out) = prepare_announce(info, intf, dns_registry) {
3510        send_dns_outgoing(&out, intf, sock);
3511        return true;
3512    }
3513    false
3514}
3515
3516/// Returns a new name based on the `original` to avoid conflicts.
3517/// If the name already contains a number in parentheses, increments that number.
3518///
3519/// Examples:
3520/// - `foo.local.` becomes `foo (2).local.`
3521/// - `foo (2).local.` becomes `foo (3).local.`
3522/// - `foo (9)` becomes `foo (10)`
3523fn name_change(original: &str) -> String {
3524    let mut parts: Vec<_> = original.split('.').collect();
3525    let Some(first_part) = parts.get_mut(0) else {
3526        return format!("{original} (2)");
3527    };
3528
3529    let mut new_name = format!("{} (2)", first_part);
3530
3531    // check if there is already has `(<num>)` suffix.
3532    if let Some(paren_pos) = first_part.rfind(" (") {
3533        // Check if there's a closing parenthesis
3534        if let Some(end_paren) = first_part[paren_pos..].find(')') {
3535            let absolute_end_pos = paren_pos + end_paren;
3536            // Only process if the closing parenthesis is the last character
3537            if absolute_end_pos == first_part.len() - 1 {
3538                let num_start = paren_pos + 2; // Skip " ("
3539                                               // Try to parse the number between parentheses
3540                if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3541                    let base_name = &first_part[..paren_pos];
3542                    new_name = format!("{} ({})", base_name, number + 1)
3543                }
3544            }
3545        }
3546    }
3547
3548    *first_part = &new_name;
3549    parts.join(".")
3550}
3551
3552/// Returns a new name based on the `original` to avoid conflicts.
3553/// If the name already contains a hyphenated number, increments that number.
3554///
3555/// Examples:
3556/// - `foo.local.` becomes `foo-2.local.`
3557/// - `foo-2.local.` becomes `foo-3.local.`
3558/// - `foo` becomes `foo-2`
3559fn hostname_change(original: &str) -> String {
3560    let mut parts: Vec<_> = original.split('.').collect();
3561    let Some(first_part) = parts.get_mut(0) else {
3562        return format!("{original}-2");
3563    };
3564
3565    let mut new_name = format!("{}-2", first_part);
3566
3567    // check if there is already a `-<num>` suffix
3568    if let Some(hyphen_pos) = first_part.rfind('-') {
3569        // Try to parse everything after the hyphen as a number
3570        if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3571            let base_name = &first_part[..hyphen_pos];
3572            new_name = format!("{}-{}", base_name, number + 1);
3573        }
3574    }
3575
3576    *first_part = &new_name;
3577    parts.join(".")
3578}
3579
3580fn add_answer_with_additionals(
3581    out: &mut DnsOutgoing,
3582    msg: &DnsIncoming,
3583    service: &ServiceInfo,
3584    intf: &Interface,
3585    dns_registry: &DnsRegistry,
3586) {
3587    let intf_addrs = service.get_addrs_on_intf(intf);
3588    if intf_addrs.is_empty() {
3589        trace!("No addrs on LAN of intf {:?}", intf);
3590        return;
3591    }
3592
3593    // check if we changed our name due to conflicts.
3594    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3595        Some(new_name) => new_name,
3596        None => service.get_fullname(),
3597    };
3598
3599    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3600        Some(new_name) => new_name,
3601        None => service.get_hostname(),
3602    };
3603
3604    let ptr_added = out.add_answer(
3605        msg,
3606        DnsPointer::new(
3607            service.get_type(),
3608            RRType::PTR,
3609            CLASS_IN,
3610            service.get_other_ttl(),
3611            service_fullname.to_string(),
3612        ),
3613    );
3614
3615    if !ptr_added {
3616        trace!("answer was not added for msg {:?}", msg);
3617        return;
3618    }
3619
3620    if let Some(sub) = service.get_subtype() {
3621        trace!("Adding subdomain {}", sub);
3622        out.add_additional_answer(DnsPointer::new(
3623            sub,
3624            RRType::PTR,
3625            CLASS_IN,
3626            service.get_other_ttl(),
3627            service_fullname.to_string(),
3628        ));
3629    }
3630
3631    // Add recommended additional answers according to
3632    // https://tools.ietf.org/html/rfc6763#section-12.1.
3633    out.add_additional_answer(DnsSrv::new(
3634        service_fullname,
3635        CLASS_IN | CLASS_CACHE_FLUSH,
3636        service.get_host_ttl(),
3637        service.get_priority(),
3638        service.get_weight(),
3639        service.get_port(),
3640        hostname.to_string(),
3641    ));
3642
3643    out.add_additional_answer(DnsTxt::new(
3644        service_fullname,
3645        CLASS_IN | CLASS_CACHE_FLUSH,
3646        service.get_host_ttl(),
3647        service.generate_txt(),
3648    ));
3649
3650    for address in intf_addrs {
3651        out.add_additional_answer(DnsAddress::new(
3652            hostname,
3653            ip_address_rr_type(&address),
3654            CLASS_IN | CLASS_CACHE_FLUSH,
3655            service.get_host_ttl(),
3656            address,
3657            intf.into(),
3658        ));
3659    }
3660}
3661
3662#[cfg(test)]
3663mod tests {
3664    use super::{
3665        check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3666        name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3667        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3668        MDNS_PORT,
3669    };
3670    use crate::{
3671        dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
3672        service_daemon::check_hostname,
3673    };
3674    use std::{
3675        net::{SocketAddr, SocketAddrV4},
3676        time::Duration,
3677    };
3678    use test_log::test;
3679
3680    #[test]
3681    fn test_socketaddr_print() {
3682        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3683        let print = format!("{}", addr);
3684        assert_eq!(print, "224.0.0.251:5353");
3685    }
3686
3687    #[test]
3688    fn test_instance_name() {
3689        assert!(valid_instance_name("my-laser._printer._tcp.local."));
3690        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3691        assert!(!valid_instance_name("_printer._tcp.local."));
3692    }
3693
3694    #[test]
3695    fn test_check_service_name_length() {
3696        let result = check_service_name_length("_tcp", 100);
3697        assert!(result.is_err());
3698        if let Err(e) = result {
3699            println!("{}", e);
3700        }
3701    }
3702
3703    #[test]
3704    fn test_check_hostname() {
3705        // valid hostnames
3706        for hostname in &[
3707            "my_host.local.",
3708            &("A".repeat(255 - ".local.".len()) + ".local."),
3709        ] {
3710            let result = check_hostname(hostname);
3711            assert!(result.is_ok());
3712        }
3713
3714        // erroneous hostnames
3715        for hostname in &[
3716            "my_host.local",
3717            ".local.",
3718            &("A".repeat(256 - ".local.".len()) + ".local."),
3719        ] {
3720            let result = check_hostname(hostname);
3721            assert!(result.is_err());
3722            if let Err(e) = result {
3723                println!("{}", e);
3724            }
3725        }
3726    }
3727
3728    #[test]
3729    fn test_check_domain_suffix() {
3730        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3731        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3732        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3733        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3734        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3735        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3736    }
3737
3738    #[test]
3739    fn test_service_with_temporarily_invalidated_ptr() {
3740        // Create a daemon
3741        let d = ServiceDaemon::new().expect("Failed to create daemon");
3742
3743        let service = "_test_inval_ptr._udp.local.";
3744        let host_name = "my_host_tmp_invalidated_ptr.local.";
3745        let intfs: Vec<_> = my_ip_interfaces(false);
3746        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
3747        let port = 5201;
3748        let my_service =
3749            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
3750                .expect("invalid service info")
3751                .enable_addr_auto();
3752        let result = d.register(my_service.clone());
3753        assert!(result.is_ok());
3754
3755        // Browse for a service
3756        let browse_chan = d.browse(service).unwrap();
3757        let timeout = Duration::from_secs(2);
3758        let mut resolved = false;
3759
3760        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3761            match event {
3762                ServiceEvent::ServiceResolved(info) => {
3763                    resolved = true;
3764                    println!("Resolved a service of {}", &info.get_fullname());
3765                    break;
3766                }
3767                e => {
3768                    println!("Received event {:?}", e);
3769                }
3770            }
3771        }
3772
3773        assert!(resolved);
3774
3775        println!("Stopping browse of {}", service);
3776        // Pause browsing so restarting will cause a new immediate query.
3777        // Unregistering will not work here, it will invalidate all the records.
3778        d.stop_browse(service).unwrap();
3779
3780        // Ensure the search is stopped.
3781        // Reduces the chance of receiving an answer adding the ptr back to the
3782        // cache causing the later browse to return directly from the cache.
3783        // (which invalidates what this test is trying to test for.)
3784        let mut stopped = false;
3785        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3786            match event {
3787                ServiceEvent::SearchStopped(_) => {
3788                    stopped = true;
3789                    println!("Stopped browsing service");
3790                    break;
3791                }
3792                // Other `ServiceResolved` messages may be received
3793                // here as they come from different interfaces.
3794                // That's fine for this test.
3795                e => {
3796                    println!("Received event {:?}", e);
3797                }
3798            }
3799        }
3800
3801        assert!(stopped);
3802
3803        // Invalidate the ptr from the service to the host.
3804        let invalidate_ptr_packet = DnsPointer::new(
3805            my_service.get_type(),
3806            RRType::PTR,
3807            CLASS_IN,
3808            0,
3809            my_service.get_fullname().to_string(),
3810        );
3811
3812        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3813        packet_buffer.add_additional_answer(invalidate_ptr_packet);
3814
3815        for intf in intfs {
3816            let sock = new_socket_bind(&intf, true).unwrap();
3817            send_dns_outgoing(&packet_buffer, &intf, &sock);
3818        }
3819
3820        println!(
3821            "Sent PTR record invalidation. Starting second browse for {}",
3822            service
3823        );
3824
3825        // Restart the browse to force the sender to re-send the announcements.
3826        let browse_chan = d.browse(service).unwrap();
3827
3828        resolved = false;
3829        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3830            match event {
3831                ServiceEvent::ServiceResolved(info) => {
3832                    resolved = true;
3833                    println!("Resolved a service of {}", &info.get_fullname());
3834                    break;
3835                }
3836                e => {
3837                    println!("Received event {:?}", e);
3838                }
3839            }
3840        }
3841
3842        assert!(resolved);
3843        d.shutdown().unwrap();
3844    }
3845
3846    #[test]
3847    fn test_expired_srv() {
3848        // construct service info
3849        let service_type = "_expired-srv._udp.local.";
3850        let instance = "test_instance";
3851        let host_name = "expired_srv_host.local.";
3852        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3853            .unwrap()
3854            .enable_addr_auto();
3855        // let fullname = my_service.get_fullname().to_string();
3856
3857        // set SRV to expire soon.
3858        let new_ttl = 3; // for testing only.
3859        my_service._set_host_ttl(new_ttl);
3860
3861        // register my service
3862        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3863        let result = mdns_server.register(my_service);
3864        assert!(result.is_ok());
3865
3866        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3867        let browse_chan = mdns_client.browse(service_type).unwrap();
3868        let timeout = Duration::from_secs(2);
3869        let mut resolved = false;
3870
3871        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3872            match event {
3873                ServiceEvent::ServiceResolved(info) => {
3874                    resolved = true;
3875                    println!("Resolved a service of {}", &info.get_fullname());
3876                    break;
3877                }
3878                _ => {}
3879            }
3880        }
3881
3882        assert!(resolved);
3883
3884        // Exit the server so that no more responses.
3885        mdns_server.shutdown().unwrap();
3886
3887        // SRV record in the client cache will expire.
3888        let expire_timeout = Duration::from_secs(new_ttl as u64);
3889        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3890            match event {
3891                ServiceEvent::ServiceRemoved(service_type, full_name) => {
3892                    println!("Service removed: {}: {}", &service_type, &full_name);
3893                    break;
3894                }
3895                _ => {}
3896            }
3897        }
3898    }
3899
3900    #[test]
3901    fn test_hostname_resolution_address_removed() {
3902        // Create a mDNS server
3903        let server = ServiceDaemon::new().expect("Failed to create server");
3904        let hostname = "addr_remove_host._tcp.local.";
3905        let service_ip_addr = my_ip_interfaces(false)
3906            .iter()
3907            .find(|iface| iface.ip().is_ipv4())
3908            .map(|iface| iface.ip())
3909            .unwrap();
3910
3911        let mut my_service = ServiceInfo::new(
3912            "_host_res_test._tcp.local.",
3913            "my_instance",
3914            hostname,
3915            &service_ip_addr,
3916            1234,
3917            None,
3918        )
3919        .expect("invalid service info");
3920
3921        // Set a short TTL for addresses for testing.
3922        let addr_ttl = 2;
3923        my_service._set_host_ttl(addr_ttl); // Expire soon
3924
3925        server.register(my_service).unwrap();
3926
3927        // Create a mDNS client for resolving the hostname.
3928        let client = ServiceDaemon::new().expect("Failed to create client");
3929        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
3930        let resolved = loop {
3931            match event_receiver.recv() {
3932                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
3933                    assert!(found_hostname == hostname);
3934                    assert!(addresses.contains(&service_ip_addr));
3935                    println!("address found: {:?}", &addresses);
3936                    break true;
3937                }
3938                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
3939                Ok(_event) => {}
3940                Err(_) => break false,
3941            }
3942        };
3943
3944        assert!(resolved);
3945
3946        // Shutdown the server so no more responses / refreshes for addresses.
3947        server.shutdown().unwrap();
3948
3949        // Wait till hostname address record expires, with 1 second grace period.
3950        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
3951        let removed = loop {
3952            match event_receiver.recv_timeout(timeout) {
3953                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
3954                    assert!(removed_host == hostname);
3955                    assert!(addresses.contains(&service_ip_addr));
3956
3957                    println!(
3958                        "address removed: hostname: {} addresses: {:?}",
3959                        &hostname, &addresses
3960                    );
3961                    break true;
3962                }
3963                Ok(_event) => {}
3964                Err(_) => {
3965                    break false;
3966                }
3967            }
3968        };
3969
3970        assert!(removed);
3971
3972        client.shutdown().unwrap();
3973    }
3974
3975    #[test]
3976    fn test_refresh_ptr() {
3977        // construct service info
3978        let service_type = "_refresh-ptr._udp.local.";
3979        let instance = "test_instance";
3980        let host_name = "refresh_ptr_host.local.";
3981        let service_ip_addr = my_ip_interfaces(false)
3982            .iter()
3983            .find(|iface| iface.ip().is_ipv4())
3984            .map(|iface| iface.ip())
3985            .unwrap();
3986
3987        let mut my_service = ServiceInfo::new(
3988            service_type,
3989            instance,
3990            host_name,
3991            &service_ip_addr,
3992            5023,
3993            None,
3994        )
3995        .unwrap();
3996
3997        let new_ttl = 3; // for testing only.
3998        my_service._set_other_ttl(new_ttl);
3999
4000        // register my service
4001        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4002        let result = mdns_server.register(my_service);
4003        assert!(result.is_ok());
4004
4005        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4006        let browse_chan = mdns_client.browse(service_type).unwrap();
4007        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4008        let mut resolved = false;
4009
4010        // resolve the service first.
4011        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4012            match event {
4013                ServiceEvent::ServiceResolved(info) => {
4014                    resolved = true;
4015                    println!("Resolved a service of {}", &info.get_fullname());
4016                    break;
4017                }
4018                _ => {}
4019            }
4020        }
4021
4022        assert!(resolved);
4023
4024        // wait over 80% of TTL, and refresh PTR should be sent out.
4025        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4026        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4027            println!("event: {:?}", &event);
4028        }
4029
4030        // verify refresh counter.
4031        let metrics_chan = mdns_client.get_metrics().unwrap();
4032        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4033        let refresh_counter = metrics["cache-refresh-ptr"];
4034        assert_eq!(refresh_counter, 1);
4035
4036        // Exit the server so that no more responses.
4037        mdns_server.shutdown().unwrap();
4038        mdns_client.shutdown().unwrap();
4039    }
4040
4041    #[test]
4042    fn test_name_change() {
4043        assert_eq!(name_change("foo.local."), "foo (2).local.");
4044        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4045        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4046        assert_eq!(name_change("foo"), "foo (2)");
4047        assert_eq!(name_change("foo (2)"), "foo (3)");
4048        assert_eq!(name_change(""), " (2)");
4049
4050        // Additional edge cases
4051        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4052        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4053        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4054    }
4055
4056    #[test]
4057    fn test_hostname_change() {
4058        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4059        assert_eq!(hostname_change("foo"), "foo-2");
4060        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4061        assert_eq!(hostname_change("foo-9"), "foo-10");
4062        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4063    }
4064}