mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself and "_udp" or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is the "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is the "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38        FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{
42        split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43    },
44    Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{BinaryHeap, HashMap, HashSet},
53    fmt,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
/// The default max length of the service name without domain, not including the
/// leading underscore (`_`). It is set to 15 per
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// The default interval, in seconds, for checking IP changes automatically.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// The default timeout for [`ServiceDaemon::verify`]: 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4).
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The UDP port that the mDNS sockets bind to and send multicast packets to.
const MDNS_PORT: u16 = 5353;
/// The IPv4 multicast group that the mDNS sockets join.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// The IPv6 multicast group that the mDNS sockets join.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// The IPv4 loopback address, used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

/// A wait time in milliseconds.
/// NOTE(review): not referenced in this part of the file; presumably a wait
/// used while resolving -- confirm at the use site.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
/// Response status code for the service `unregister` call.
///
/// The value is delivered through the channel returned by
/// `ServiceDaemon::unregister`. It derives `PartialEq`/`Eq` so that a caller
/// can compare a received status directly, e.g. `status == UnregisterStatus::OK`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
87
/// Status of the service daemon, as reported to API callers.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DaemonStatus {
    /// The daemon is up and running normally.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
98
/// The kinds of counters included in the metrics.
///
/// Each variant identifies one counter; it is hashable so it can be used as a
/// lookup key.
///
/// NOTE(review): the previous comment stated that all counters are for
/// outgoing packets; variant names such as `CachedPTR` or `Timer` suggest
/// otherwise -- confirm against the places that increase the counters.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}
126
127impl fmt::Display for Counter {
128    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129        match self {
130            Self::Register => write!(f, "register"),
131            Self::RegisterResend => write!(f, "register-resend"),
132            Self::Unregister => write!(f, "unregister"),
133            Self::UnregisterResend => write!(f, "unregister-resend"),
134            Self::Browse => write!(f, "browse"),
135            Self::ResolveHostname => write!(f, "resolve-hostname"),
136            Self::Respond => write!(f, "respond"),
137            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
138            Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
139            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
140            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
141            Self::CachedPTR => write!(f, "cached-ptr"),
142            Self::CachedSRV => write!(f, "cached-srv"),
143            Self::CachedAddr => write!(f, "cached-addr"),
144            Self::CachedTxt => write!(f, "cached-txt"),
145            Self::CachedNSec => write!(f, "cached-nsec"),
146            Self::CachedSubtype => write!(f, "cached-subtype"),
147            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
148            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
149            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
150            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
151            Self::Timer => write!(f, "timer"),
152        }
153    }
154}
155
/// The metrics are a `HashMap` of (name_key, i64_value) pairs.
/// Their main purpose is to help monitor the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;
159
/// The poller token used for the daemon's signal socket.
/// It sits near `usize::MAX` to avoid overlapping with the keys in `zc.poll_ids`.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;
161
/// A daemon thread for mDNS
///
/// This struct provides a handle and an API to the daemon. It is cloneable.
/// Every API call is turned into a `Command` that is sent to the daemon
/// thread; the handle itself holds no mDNS state.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sender handle of the channel to the daemon.
    sender: Sender<Command>,

    /// Send to this addr to signal that a `Command` is coming.
    ///
    /// The daemon listens on this addr together with other mDNS sockets,
    /// to avoid busy polling the flume channel. If there is a way to poll
    /// the channel and mDNS sockets together, then this can be removed.
    signal_addr: SocketAddr,
}
177
178impl ServiceDaemon {
179    /// Creates a new daemon and spawns a thread to run the daemon.
180    ///
181    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
182    /// ask callers to set the port.
183    pub fn new() -> Result<Self> {
184        // Use port 0 to allow the system assign a random available port,
185        // no need for a pre-defined port number.
186        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
187
188        let signal_sock = UdpSocket::bind(signal_addr)
189            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
190
191        // Get the socket with the OS chosen port
192        let signal_addr = signal_sock
193            .local_addr()
194            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
195
196        // Must be nonblocking so we can listen to it together with mDNS sockets.
197        signal_sock
198            .set_nonblocking(true)
199            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
200
201        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
202
203        let (sender, receiver) = bounded(100);
204
205        // Spawn the daemon thread
206        let mio_sock = MioUdpSocket::from_std(signal_sock);
207        thread::Builder::new()
208            .name("mDNS_daemon".to_string())
209            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
210            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
211
212        Ok(Self {
213            sender,
214            signal_addr,
215        })
216    }
217
218    /// Sends `cmd` to the daemon via its channel, and sends a signal
219    /// to its sock addr to notify.
220    fn send_cmd(&self, cmd: Command) -> Result<()> {
221        let cmd_name = cmd.to_string();
222
223        // First, send to the flume channel.
224        self.sender.try_send(cmd).map_err(|e| match e {
225            TrySendError::Full(_) => Error::Again,
226            e => e_fmt!("flume::channel::send failed: {}", e),
227        })?;
228
229        // Second, send a signal to notify the daemon.
230        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
231        let socket = UdpSocket::bind(addr)
232            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
233        socket
234            .send_to(cmd_name.as_bytes(), self.signal_addr)
235            .map_err(|e| {
236                e_fmt!(
237                    "signal socket send_to {} ({}) failed: {}",
238                    self.signal_addr,
239                    cmd_name,
240                    e
241                )
242            })?;
243
244        Ok(())
245    }
246
247    /// Starts browsing for a specific service type.
248    ///
249    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
250    ///
251    /// Returns a channel `Receiver` to receive events about the service. The caller
252    /// can call `.recv_async().await` on this receiver to handle events in an
253    /// async environment or call `.recv()` in a sync environment.
254    ///
255    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
256    /// finding more details, i.e. SRV records and TXT records.
257    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
258        check_domain_suffix(service_type)?;
259
260        let (resp_s, resp_r) = bounded(10);
261        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
262        Ok(resp_r)
263    }
264
265    /// Stops searching for a specific service type.
266    ///
267    /// When an error is returned, the caller should retry only when
268    /// the error is `Error::Again`, otherwise should log and move on.
269    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
270        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
271    }
272
273    /// Starts querying for the ip addresses of a hostname.
274    ///
275    /// Returns a channel `Receiver` to receive events about the hostname.
276    /// The caller can call `.recv_async().await` on this receiver to handle events in an
277    /// async environment or call `.recv()` in a sync environment.
278    ///
279    /// The `timeout` is specified in milliseconds.
280    pub fn resolve_hostname(
281        &self,
282        hostname: &str,
283        timeout: Option<u64>,
284    ) -> Result<Receiver<HostnameResolutionEvent>> {
285        check_hostname(hostname)?;
286        let (resp_s, resp_r) = bounded(10);
287        self.send_cmd(Command::ResolveHostname(
288            hostname.to_string(),
289            1,
290            resp_s,
291            timeout,
292        ))?;
293        Ok(resp_r)
294    }
295
296    /// Stops querying for the ip addresses of a hostname.
297    ///
298    /// When an error is returned, the caller should retry only when
299    /// the error is `Error::Again`, otherwise should log and move on.
300    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
301        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
302    }
303
304    /// Registers a service provided by this host.
305    ///
306    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
307    /// this method will automatically fill in addresses from the host.
308    ///
309    /// To re-announce a service with an updated `service_info`, just call
310    /// this `register` function again. No need to call `unregister` first.
311    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
312        check_service_name(service_info.get_fullname())?;
313        check_hostname(service_info.get_hostname())?;
314
315        self.send_cmd(Command::Register(service_info))
316    }
317
318    /// Unregisters a service. This is a graceful shutdown of a service.
319    ///
320    /// Returns a channel receiver that is used to receive the status code
321    /// of the unregister.
322    ///
323    /// When an error is returned, the caller should retry only when
324    /// the error is `Error::Again`, otherwise should log and move on.
325    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
326        let (resp_s, resp_r) = bounded(1);
327        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
328        Ok(resp_r)
329    }
330
331    /// Starts to monitor events from the daemon.
332    ///
333    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
334    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
335        let (resp_s, resp_r) = bounded(100);
336        self.send_cmd(Command::Monitor(resp_s))?;
337        Ok(resp_r)
338    }
339
340    /// Shuts down the daemon thread and returns a channel to receive the status.
341    ///
342    /// When an error is returned, the caller should retry only when
343    /// the error is `Error::Again`, otherwise should log and move on.
344    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
345        let (resp_s, resp_r) = bounded(1);
346        self.send_cmd(Command::Exit(resp_s))?;
347        Ok(resp_r)
348    }
349
350    /// Returns the status of the daemon.
351    ///
352    /// When an error is returned, the caller should retry only when
353    /// the error is `Error::Again`, otherwise should consider the daemon
354    /// stopped working and move on.
355    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
356        let (resp_s, resp_r) = bounded(1);
357
358        if self.sender.is_disconnected() {
359            resp_s
360                .send(DaemonStatus::Shutdown)
361                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
362        } else {
363            self.send_cmd(Command::GetStatus(resp_s))?;
364        }
365
366        Ok(resp_r)
367    }
368
369    /// Returns a channel receiver for the metrics, e.g. input/output counters.
370    ///
371    /// The metrics returned is a snapshot. Hence the caller should call
372    /// this method repeatedly if they want to monitor the metrics continuously.
373    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
374        let (resp_s, resp_r) = bounded(1);
375        self.send_cmd(Command::GetMetrics(resp_s))?;
376        Ok(resp_r)
377    }
378
379    /// Change the max length allowed for a service name.
380    ///
381    /// As RFC 6763 defines a length max for a service name, a user should not call
382    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
383    ///
384    /// `len_max` is capped at an internal limit, which is currently 30.
385    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
386        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
387
388        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
389            return Err(Error::Msg(format!(
390                "service name length max {} is too large",
391                len_max
392            )));
393        }
394
395        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
396    }
397
398    /// Change the interval for checking IP changes automatically.
399    ///
400    /// Setting the interval to 0 disables the IP check.
401    ///
402    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
403    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
404        let interval_in_millis = interval_in_secs as u64 * 1000;
405        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
406            interval_in_millis,
407        )))
408    }
409
410    /// Get the current interval in seconds for checking IP changes automatically.
411    pub fn get_ip_check_interval(&self) -> Result<u32> {
412        let (resp_s, resp_r) = bounded(1);
413        self.send_cmd(Command::GetOption(resp_s))?;
414
415        let option = resp_r
416            .recv_timeout(Duration::from_secs(10))
417            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
418        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
419        Ok(ip_check_interval_in_secs as u32)
420    }
421
422    /// Include interfaces that match `if_kind` for this service daemon.
423    ///
424    /// For example:
425    /// ```ignore
426    ///     daemon.enable_interface("en0")?;
427    /// ```
428    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
429        let if_kind_vec = if_kind.into_vec();
430        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
431            if_kind_vec.kinds,
432        )))
433    }
434
435    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
436    ///
437    /// For example:
438    /// ```ignore
439    ///     daemon.disable_interface(IfKind::IPv6)?;
440    /// ```
441    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
442        let if_kind_vec = if_kind.into_vec();
443        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
444            if_kind_vec.kinds,
445        )))
446    }
447
448    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
449    ///
450    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
451    /// receive announcements from a responder on the same host.
452    ///
453    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
454    ///
455    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
456    /// the UNIX version of the IP_MULTICAST_LOOP option:
457    ///
458    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
459    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
460    ///
461    /// Which means, in order NOT to receive localhost announcements, you want to call
462    /// this API on the querier side on Windows, but on the responder side on Unix.
463    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
464        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
465    }
466
467    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
468    ///
469    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
470    /// receive announcements from a responder on the same host.
471    ///
472    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
473    ///
474    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
475    /// the UNIX version of the IP_MULTICAST_LOOP option:
476    ///
477    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
478    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
479    ///
480    /// Which means, in order NOT to receive localhost announcements, you want to call
481    /// this API on the querier side on Windows, but on the responder side on Unix.
482    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
483        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
484    }
485
486    /// Proactively confirms whether a service instance still valid.
487    ///
488    /// This call will issue queries for a service instance's SRV record and Address records.
489    ///
490    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
491    /// unless there is a reason not to follow RFC.
492    ///
493    /// If no response is received within `timeout`, the current resource
494    /// records will be flushed, and if needed, `ServiceRemoved` event will be
495    /// sent to active queriers.
496    ///
497    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
498    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
499        self.send_cmd(Command::Verify(instance_fullname, timeout))
500    }
501
502    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
503        let zc = Zeroconf::new(signal_sock, poller);
504
505        if let Some(cmd) = Self::run(zc, receiver) {
506            match cmd {
507                Command::Exit(resp_s) => {
508                    // It is guaranteed that the receiver already dropped,
509                    // i.e. the daemon command channel closed.
510                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
511                        debug!("exit: failed to send response of shutdown: {}", e);
512                    }
513                }
514                _ => {
515                    debug!("Unexpected command: {:?}", cmd);
516                }
517            }
518        }
519    }
520
    /// The main event loop of the daemon thread.
    ///
    /// Before looping, it registers the signal socket and every mDNS socket in
    /// `zc.intf_socks` with the poller, and arms the first IP-check timer if
    /// `zc.ip_check_interval` is non-zero. It returns `None` if any of these
    /// registrations fails.
    ///
    /// In each round, it will:
    /// 1. poll the listening sockets, with a timeout taken from the earliest timer (if any).
    /// 2. pass the poll events to `handle_poller_events`.
    /// 3. pop the timers that are due and time out expired hostname resolvers.
    /// 4. `try_recv` on its channel and execute commands; a `Command::Exit`
    ///    ends the loop and is returned as `Some(command)`.
    /// 5. re-run retransmissions that are due.
    /// 6. refresh cached records with active queriers and evict expired records.
    /// 7. run the probing handler, and check IP changes when that check is due.
    fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
        // Add the daemon's signal socket to the poller.
        if let Err(e) = zc.poller.registry().register(
            &mut zc.signal_sock,
            mio::Token(SIGNAL_SOCK_EVENT_KEY),
            mio::Interest::READABLE,
        ) {
            debug!("failed to add signal socket to the poller: {}", e);
            return None;
        }

        // Add mDNS sockets to the poller.
        for (intf, sock) in zc.intf_socks.iter_mut() {
            // Each interface gets its own poll key, which is used as the mio token.
            let key =
                Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());

            if let Err(e) =
                zc.poller
                    .registry()
                    .register(sock, mio::Token(key), mio::Interest::READABLE)
            {
                debug!("add socket of {:?} to poller: {e}", intf);
                return None;
            }
        }

        // Setup timer for IP checks. A value of 0 means "no IP check scheduled".
        let mut next_ip_check = if zc.ip_check_interval > 0 {
            current_time_millis() + zc.ip_check_interval
        } else {
            0
        };

        if next_ip_check > 0 {
            zc.add_timer(next_ip_check);
        }

        // Start the run loop.

        let mut events = mio::Events::with_capacity(1024);
        loop {
            let now = current_time_millis();

            // With no timer pending, `timeout` is `None` and is passed to `poll` as such.
            let earliest_timer = zc.peek_earliest_timer();
            let timeout = earliest_timer.map(|timer| {
                // If `timer` already passed, set `timeout` to be 1ms.
                let millis = if timer > now { timer - now } else { 1 };
                Duration::from_millis(millis)
            });

            // Process incoming packets, command events and optional timeout.
            events.clear();
            match zc.poller.poll(&mut events, timeout) {
                Ok(_) => zc.handle_poller_events(&events),
                Err(e) => debug!("failed to select from sockets: {}", e),
            }

            // Re-read the clock: the poll above may have waited.
            let now = current_time_millis();

            // Remove the timers if already passed.
            zc.pop_timers_till(now);

            // Remove hostname resolvers with expired timeouts.
            // The map is cloned so that entries can be removed from
            // `zc.hostname_resolvers` inside the loop body.
            for hostname in zc
                .hostname_resolvers
                .clone()
                .into_iter()
                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
                .map(|(hostname, _)| hostname)
            {
                trace!("hostname resolver timeout for {}", &hostname);
                // Tell the listener about the timeout first, then that the search stopped.
                call_hostname_resolution_listener(
                    &zc.hostname_resolvers,
                    &hostname,
                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
                );
                call_hostname_resolution_listener(
                    &zc.hostname_resolvers,
                    &hostname,
                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
                );
                zc.hostname_resolvers.remove(&hostname);
            }

            // process commands from the command channel
            while let Ok(command) = receiver.try_recv() {
                // `Exit` is not executed here; it is handed back to the caller.
                if matches!(command, Command::Exit(_)) {
                    zc.status = DaemonStatus::Shutdown;
                    return Some(command);
                }
                zc.exec_command(command, false);
            }

            // check for repeated commands and run them if their time is up.
            // The index is not advanced after a removal, so the element that takes
            // slot `i` is examined next (assuming `Vec` semantics for `remove`).
            let mut i = 0;
            while i < zc.retransmissions.len() {
                if now >= zc.retransmissions[i].next_time {
                    let rerun = zc.retransmissions.remove(i);
                    zc.exec_command(rerun.command, true);
                } else {
                    i += 1;
                }
            }

            // Refresh cached service records with active queriers
            zc.refresh_active_services();

            // Refresh cached A/AAAA records with active queriers
            let mut query_count = 0;
            for (hostname, _sender) in zc.hostname_resolvers.iter() {
                for (hostname, ip_addr) in
                    zc.cache.refresh_due_hostname_resolutions(hostname).iter()
                {
                    zc.send_query(hostname, ip_address_rr_type(ip_addr));
                    query_count += 1;
                }
            }

            zc.increase_counter(Counter::CacheRefreshAddr, query_count);

            // check and evict expired records in our cache
            let now = current_time_millis();

            // Notify service listeners about the expired records.
            let expired_services = zc.cache.evict_expired_services(now);
            zc.notify_service_removal(expired_services);

            // Notify hostname listeners about the expired records.
            let expired_addrs = zc.cache.evict_expired_addr(now);
            for (hostname, addrs) in expired_addrs {
                call_hostname_resolution_listener(
                    &zc.hostname_resolvers,
                    &hostname,
                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
                );
                // Instances on this host lost addresses, so resolve them again.
                let instances = zc.cache.get_instances_on_host(&hostname);
                let instance_set: HashSet<String> = instances.into_iter().collect();
                zc.resolve_updated_instances(&instance_set);
            }

            // Send out probing queries.
            zc.probing_handler();

            // check IP changes if next_ip_check is reached.
            // NOTE(review): `next_ip_check` is only 0 when the interval was 0 at
            // start-up. If a command later sets `zc.ip_check_interval` to 0, this
            // branch computes `next_ip_check = now`, which is due again on the next
            // round; and if it starts at 0, a later non-zero interval is never
            // picked up here. Confirm how the option handler accounts for this.
            if now >= next_ip_check && next_ip_check > 0 {
                next_ip_check = now + zc.ip_check_interval;
                zc.add_timer(next_ip_check);

                zc.check_ip_changes();
            }
        }
    }
681}
682
683/// Creates a new UDP socket that uses `intf` to send and recv multicast.
684fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
685    // Use the same socket for receiving and sending multicast packets.
686    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
687    let intf_ip = &intf.ip();
688    match intf_ip {
689        IpAddr::V4(ip) => {
690            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
691            let sock = new_socket(addr.into(), true)?;
692
693            // Join mDNS group to receive packets.
694            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
695                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
696
697            // Set IP_MULTICAST_IF to send packets.
698            sock.set_multicast_if_v4(ip)
699                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
700
701            // Per RFC 6762 section 11:
702            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
703            // be sent with IP TTL set to 255."
704            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
705            sock.set_multicast_ttl_v4(255)
706                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
707
708            if !should_loop {
709                sock.set_multicast_loop_v4(false)
710                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
711            }
712
713            // Test if we can send packets successfully.
714            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
715            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
716            for packet in test_packets {
717                sock.send_to(&packet, &multicast_addr)
718                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
719            }
720            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
721        }
722        IpAddr::V6(ip) => {
723            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
724            let sock = new_socket(addr.into(), true)?;
725
726            // Join mDNS group to receive packets.
727            sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
728                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
729
730            // Set IPV6_MULTICAST_IF to send packets.
731            sock.set_multicast_if_v6(intf.index.unwrap_or(0))
732                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
733
734            // Per RFC 6762 section 11:
735            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
736            // be sent with IP TTL set to 255."
737            sock.set_multicast_hops_v6(255)
738                .map_err(|e| e_fmt!("set set_multicast_hops_v6 on addr {}: {}", ip, e))?;
739
740            // We are not sending multicast packets to test this socket as there might
741            // be many IPv6 interfaces on a host and could cause such send error:
742            // "No buffer space available (os error 55)".
743
744            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
745        }
746    }
747}
748
749/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
750/// `non_block` indicates whether to set O_NONBLOCK for the socket.
751fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
752    let domain = match addr {
753        SocketAddr::V4(_) => socket2::Domain::IPV4,
754        SocketAddr::V6(_) => socket2::Domain::IPV6,
755    };
756
757    let fd = Socket::new(domain, socket2::Type::DGRAM, None)
758        .map_err(|e| e_fmt!("create socket failed: {}", e))?;
759
760    fd.set_reuse_address(true)
761        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
762    #[cfg(unix)] // this is currently restricted to Unix's in socket2
763    fd.set_reuse_port(true)
764        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
765
766    if non_block {
767        fd.set_nonblocking(true)
768            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
769    }
770
771    fd.bind(&addr.into())
772        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
773
774    trace!("new socket bind to {}", &addr);
775    Ok(fd)
776}
777
/// A `command` scheduled to run again at a given point in time.
///
/// Entries are kept in `Zeroconf::retransmissions`.
struct ReRun {
    /// UNIX timestamp in millis at which `command` should run next.
    next_time: u64,
    /// The command to run at `next_time`.
    command: Command,
}
784
/// Enum to represent the IP version.
///
/// Used as part of [`MulticastSendTracker`] to tell IPv4 and IPv6 sends apart.
#[derive(Debug, Eq, Hash, PartialEq)]
enum IpVersion {
    /// IPv4.
    V4,
    /// IPv6.
    V6,
}
791
/// A struct to track multicast send status for a network interface.
///
/// It is hashable so that callers can keep a set of (interface index,
/// IP version) pairs on which a multicast packet was already sent.
#[derive(Debug, Eq, Hash, PartialEq)]
struct MulticastSendTracker {
    /// Index of the network interface.
    intf_index: u32,
    /// IP version of the interface address used for the send.
    ip_version: IpVersion,
}
798
799/// Returns the multicast send tracker if the interface index is valid
800fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
801    match intf.index {
802        Some(index) => {
803            let ip_ver = match intf.addr {
804                IfAddr::V4(_) => IpVersion::V4,
805                IfAddr::V6(_) => IpVersion::V6,
806            };
807            Some(MulticastSendTracker {
808                intf_index: index,
809                ip_version: ip_ver,
810            })
811        }
812        None => None,
813    }
814}
815
/// Specifies kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>`, `From<&String>` and `From<IpAddr>`
/// are implemented: strings become [`IfKind::Name`] and addresses become [`IfKind::Addr`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0".
    Name(String),

    /// By an IPv4 or IPv6 address.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
    ///
    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
    LoopbackV4,

    /// ::1/128, disabled by default.
    LoopbackV6,
}
846
847impl IfKind {
848    /// Checks if `intf` matches with this interface kind.
849    fn matches(&self, intf: &Interface) -> bool {
850        match self {
851            Self::All => true,
852            Self::IPv4 => intf.ip().is_ipv4(),
853            Self::IPv6 => intf.ip().is_ipv6(),
854            Self::Name(ifname) => ifname == &intf.name,
855            Self::Addr(addr) => addr == &intf.ip(),
856            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
857            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
858        }
859    }
860}
861
862/// The first use case of specifying an interface was to
863/// use an interface name. Hence adding this for ergonomic reasons.
864impl From<&str> for IfKind {
865    fn from(val: &str) -> Self {
866        Self::Name(val.to_string())
867    }
868}
869
870impl From<&String> for IfKind {
871    fn from(val: &String) -> Self {
872        Self::Name(val.to_string())
873    }
874}
875
876/// Still for ergonomic reasons.
877impl From<IpAddr> for IfKind {
878    fn from(val: IpAddr) -> Self {
879        Self::Addr(val)
880    }
881}
882
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values are produced through the [`IntoIfKindVec`] trait.
pub struct IfKindVec {
    /// The interface kinds held by this list.
    kinds: Vec<IfKind>,
}
887
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value that converts into `IfKind`,
/// and for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the corresponding [`IfKindVec`].
    fn into_vec(self) -> IfKindVec;
}
892
893impl<T: Into<IfKind>> IntoIfKindVec for T {
894    fn into_vec(self) -> IfKindVec {
895        let if_kind: IfKind = self.into();
896        IfKindVec {
897            kinds: vec![if_kind],
898        }
899    }
900}
901
902impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
903    fn into_vec(self) -> IfKindVec {
904        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
905        IfKindVec { kinds }
906    }
907}
908
/// Selection of interfaces.
///
/// One entry records a single enable or disable request. Entries are applied
/// in the order they were added, so a later entry overrides an earlier one
/// for the interfaces that both match.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
917
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
struct Zeroconf {
    /// Local interfaces with sockets to recv/send on these interfaces.
    intf_socks: HashMap<Interface, MioUdpSocket>,

    /// Map poll id to Interface.
    ///
    /// The poll id is the value used as the `mio::Token` when the socket of
    /// the interface is registered with `poller`.
    poll_ids: HashMap<usize, Interface>,

    /// Next poll id value. It is incremented every time a poll id is handed out.
    poll_id_count: usize,

    /// Local registered services, keyed by service full names (in lowercase).
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records, one registry per interface.
    dns_registry_map: HashMap<Interface, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Daemon metrics. Starts out as an empty map.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    ///
    /// A channel is dropped from this list once a send on it reports
    /// that the receiving side is disconnected.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Option: the maximum length allowed for a service name.
    /// It is checked when a service is registered.
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    ///
    /// Initially it holds two entries that disable the IPv4 and IPv6 loopback interfaces.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon. It starts as `DaemonStatus::Running`.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// Whether multicast loopback is requested for IPv4 sockets.
    /// Defaults to true and is passed on when a socket is bound for a new IPv4 interface.
    multicast_loop_v4: bool,

    /// Whether multicast loopback is requested for IPv6 sockets.
    /// Defaults to true and is passed on when a socket is bound for a new IPv6 interface.
    multicast_loop_v6: bool,
}
989
990impl Zeroconf {
991    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
992        // Get interfaces.
993        let my_ifaddrs = my_ip_interfaces(false);
994
995        // Create a socket for every IP addr.
996        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
997        // or the same interface name with different IP addrs.
998        let mut intf_socks = HashMap::new();
999        let mut dns_registry_map = HashMap::new();
1000
1001        for intf in my_ifaddrs {
1002            let sock = match new_socket_bind(&intf, true) {
1003                Ok(s) => s,
1004                Err(e) => {
1005                    trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1006                    continue;
1007                }
1008            };
1009
1010            dns_registry_map.insert(intf.clone(), DnsRegistry::new());
1011
1012            intf_socks.insert(intf, sock);
1013        }
1014
1015        let monitors = Vec::new();
1016        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1017        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1018
1019        let timers = BinaryHeap::new();
1020
1021        // Disable loopback by default.
1022        let if_selections = vec![
1023            IfSelection {
1024                if_kind: IfKind::LoopbackV4,
1025                selected: false,
1026            },
1027            IfSelection {
1028                if_kind: IfKind::LoopbackV6,
1029                selected: false,
1030            },
1031        ];
1032
1033        let status = DaemonStatus::Running;
1034
1035        Self {
1036            intf_socks,
1037            poll_ids: HashMap::new(),
1038            poll_id_count: 0,
1039            my_services: HashMap::new(),
1040            cache: DnsCache::new(),
1041            dns_registry_map,
1042            hostname_resolvers: HashMap::new(),
1043            service_queriers: HashMap::new(),
1044            retransmissions: Vec::new(),
1045            counters: HashMap::new(),
1046            poller,
1047            monitors,
1048            service_name_len_max,
1049            ip_check_interval,
1050            if_selections,
1051            signal_sock,
1052            timers,
1053            status,
1054            pending_resolves: HashSet::new(),
1055            resolved: HashSet::new(),
1056            multicast_loop_v4: true,
1057            multicast_loop_v6: true,
1058        }
1059    }
1060
1061    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1062        match daemon_opt {
1063            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1064            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1065            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1066            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1067            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1068            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1069        }
1070    }
1071
1072    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1073        for if_kind in kinds {
1074            self.if_selections.push(IfSelection {
1075                if_kind,
1076                selected: true,
1077            });
1078        }
1079
1080        self.apply_intf_selections(my_ip_interfaces(true));
1081    }
1082
1083    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1084        for if_kind in kinds {
1085            self.if_selections.push(IfSelection {
1086                if_kind,
1087                selected: false,
1088            });
1089        }
1090
1091        self.apply_intf_selections(my_ip_interfaces(true));
1092    }
1093
1094    fn set_multicast_loop_v4(&mut self, on: bool) {
1095        for (_, sock) in self.intf_socks.iter_mut() {
1096            if let Err(e) = sock.set_multicast_loop_v4(on) {
1097                debug!("failed to set multicast loop v4: {e}");
1098            }
1099        }
1100    }
1101
1102    fn set_multicast_loop_v6(&mut self, on: bool) {
1103        for (_, sock) in self.intf_socks.iter_mut() {
1104            if let Err(e) = sock.set_multicast_loop_v6(on) {
1105                debug!("failed to set multicast loop v6: {e}");
1106            }
1107        }
1108    }
1109
1110    fn notify_monitors(&mut self, event: DaemonEvent) {
1111        // Only retain the monitors that are still connected.
1112        self.monitors.retain(|sender| {
1113            if let Err(e) = sender.try_send(event.clone()) {
1114                debug!("notify_monitors: try_send: {}", &e);
1115                if matches!(e, TrySendError::Disconnected(_)) {
1116                    return false; // This monitor is dropped.
1117                }
1118            }
1119            true
1120        });
1121    }
1122
1123    /// Remove `addr` in my services that enabled `addr_auto`.
1124    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1125        for (_, service_info) in self.my_services.iter_mut() {
1126            if service_info.is_addr_auto() {
1127                service_info.remove_ipaddr(addr);
1128            }
1129        }
1130    }
1131
1132    /// Insert a new interface into the poll map and return key
1133    fn add_poll(&mut self, intf: Interface) -> usize {
1134        Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1135    }
1136
1137    /// Insert a new interface into the poll map and return its key.
1138    ///
1139    /// This exists to satisfy the borrow checker
1140    fn add_poll_impl(
1141        poll_ids: &mut HashMap<usize, Interface>,
1142        poll_id_count: &mut usize,
1143        intf: Interface,
1144    ) -> usize {
1145        let key = *poll_id_count;
1146        *poll_id_count += 1;
1147        let _ = (*poll_ids).insert(key, intf);
1148        key
1149    }
1150
1151    fn add_timer(&mut self, next_time: u64) {
1152        self.timers.push(Reverse(next_time));
1153    }
1154
1155    fn peek_earliest_timer(&self) -> Option<u64> {
1156        self.timers.peek().map(|Reverse(v)| *v)
1157    }
1158
1159    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1160        self.timers.pop().map(|Reverse(v)| v)
1161    }
1162
1163    /// Pop all timers that are already passed till `now`.
1164    fn pop_timers_till(&mut self, now: u64) {
1165        while let Some(Reverse(v)) = self.timers.peek() {
1166            if *v > now {
1167                break;
1168            }
1169            self.timers.pop();
1170        }
1171    }
1172
1173    /// Apply all selections to `interfaces` and return the selected addresses.
1174    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1175        let intf_count = interfaces.len();
1176        let mut intf_selections = vec![true; intf_count];
1177
1178        // apply if_selections
1179        for selection in self.if_selections.iter() {
1180            // Mark the interfaces for this selection.
1181            for i in 0..intf_count {
1182                if selection.if_kind.matches(&interfaces[i]) {
1183                    intf_selections[i] = selection.selected;
1184                }
1185            }
1186        }
1187
1188        let mut selected_addrs = HashSet::new();
1189        for i in 0..intf_count {
1190            if intf_selections[i] {
1191                selected_addrs.insert(interfaces[i].addr.ip());
1192            }
1193        }
1194
1195        selected_addrs
1196    }
1197
1198    /// Apply all selections to `interfaces`.
1199    ///
1200    /// For any interface, add it if selected but not bound yet,
1201    /// delete it if not selected but still bound.
1202    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1203        // By default, we enable all interfaces.
1204        let intf_count = interfaces.len();
1205        let mut intf_selections = vec![true; intf_count];
1206
1207        // apply if_selections
1208        for selection in self.if_selections.iter() {
1209            // Mark the interfaces for this selection.
1210            for i in 0..intf_count {
1211                if selection.if_kind.matches(&interfaces[i]) {
1212                    intf_selections[i] = selection.selected;
1213                }
1214            }
1215        }
1216
1217        // Update `intf_socks` based on the selections.
1218        for (idx, intf) in interfaces.into_iter().enumerate() {
1219            if intf_selections[idx] {
1220                // Add the interface
1221                if !self.intf_socks.contains_key(&intf) {
1222                    debug!("apply_intf_selections: add {:?}", &intf.ip());
1223                    self.add_new_interface(intf);
1224                }
1225            } else {
1226                // Remove the interface
1227                if let Some(mut sock) = self.intf_socks.remove(&intf) {
1228                    match self.poller.registry().deregister(&mut sock) {
1229                        Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1230                        Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1231                    }
1232
1233                    // Remove from poll_ids
1234                    self.poll_ids.retain(|_, v| v != &intf);
1235
1236                    // Remove cache records for this interface.
1237                    self.cache.remove_addrs_on_disabled_intf(&intf);
1238                }
1239            }
1240        }
1241    }
1242
1243    /// Check for IP changes and update intf_socks as needed.
1244    fn check_ip_changes(&mut self) {
1245        // Get the current interfaces.
1246        let my_ifaddrs = my_ip_interfaces(true);
1247
1248        let poll_ids = &mut self.poll_ids;
1249        let poller = &mut self.poller;
1250        // Remove unused sockets in the poller.
1251        let deleted_addrs = self
1252            .intf_socks
1253            .iter_mut()
1254            .filter_map(|(intf, sock)| {
1255                if !my_ifaddrs.contains(intf) {
1256                    if let Err(e) = poller.registry().deregister(sock) {
1257                        debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1258                    }
1259                    // Remove from poll_ids
1260                    poll_ids.retain(|_, v| v != intf);
1261                    Some(intf.ip())
1262                } else {
1263                    None
1264                }
1265            })
1266            .collect::<Vec<IpAddr>>();
1267
1268        // Remove deleted addrs from my services that enabled `addr_auto`.
1269        for ip in deleted_addrs.iter() {
1270            self.del_addr_in_my_services(ip);
1271            self.notify_monitors(DaemonEvent::IpDel(*ip));
1272        }
1273
1274        // Keep the interfaces only if they still exist.
1275        self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1276
1277        // Add newly found interfaces only if in our selections.
1278        self.apply_intf_selections(my_ifaddrs);
1279    }
1280
1281    fn add_new_interface(&mut self, intf: Interface) {
1282        // Bind the new interface.
1283        let new_ip = intf.ip();
1284        let should_loop = if new_ip.is_ipv4() {
1285            self.multicast_loop_v4
1286        } else {
1287            self.multicast_loop_v6
1288        };
1289        let mut sock = match new_socket_bind(&intf, should_loop) {
1290            Ok(s) => s,
1291            Err(e) => {
1292                debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1293                return;
1294            }
1295        };
1296
1297        // Add the new interface into the poller.
1298        let key = self.add_poll(intf.clone());
1299        if let Err(e) =
1300            self.poller
1301                .registry()
1302                .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1303        {
1304            debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1305            return;
1306        }
1307
1308        debug!("add new interface {}: {new_ip}", intf.name);
1309        let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1310            Some(registry) => registry,
1311            None => self
1312                .dns_registry_map
1313                .entry(intf.clone())
1314                .or_insert_with(DnsRegistry::new),
1315        };
1316
1317        for (_, service_info) in self.my_services.iter_mut() {
1318            if service_info.is_addr_auto() {
1319                service_info.insert_ipaddr(new_ip);
1320
1321                if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1322                    debug!(
1323                        "Announce service {} on {}",
1324                        service_info.get_fullname(),
1325                        intf.ip()
1326                    );
1327                    service_info.set_status(&intf, ServiceStatus::Announced);
1328                } else {
1329                    for timer in dns_registry.new_timers.drain(..) {
1330                        self.timers.push(Reverse(timer));
1331                    }
1332                    service_info.set_status(&intf, ServiceStatus::Probing);
1333                }
1334            }
1335        }
1336
1337        self.intf_socks.insert(intf, sock);
1338
1339        // Notify the monitors.
1340        self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1341    }
1342
1343    /// Registers a service.
1344    ///
1345    /// RFC 6762 section 8.3.
1346    /// ...the Multicast DNS responder MUST send
1347    ///    an unsolicited Multicast DNS response containing, in the Answer
1348    ///    Section, all of its newly registered resource records
1349    ///
1350    /// Zeroconf will then respond to requests for information about this service.
1351    fn register_service(&mut self, mut info: ServiceInfo) {
1352        // Check the service name length.
1353        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1354            debug!("check_service_name_length: {}", &e);
1355            self.notify_monitors(DaemonEvent::Error(e));
1356            return;
1357        }
1358
1359        if info.is_addr_auto() {
1360            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1361            for addr in selected_addrs {
1362                info.insert_ipaddr(addr);
1363            }
1364        }
1365
1366        debug!("register service {:?}", &info);
1367
1368        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1369        if !outgoing_addrs.is_empty() {
1370            self.notify_monitors(DaemonEvent::Announce(
1371                info.get_fullname().to_string(),
1372                format!("{:?}", &outgoing_addrs),
1373            ));
1374        }
1375
1376        // The key has to be lower case letter as DNS record name is case insensitive.
1377        // The info will have the original name.
1378        let service_fullname = info.get_fullname().to_lowercase();
1379        self.my_services.insert(service_fullname, info);
1380    }
1381
1382    /// Sends out announcement of `info` on every valid interface.
1383    /// Returns the list of interface IPs that sent out the announcement.
1384    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1385        let mut outgoing_addrs = Vec::new();
1386        // Send the announcement on one interface per ip version.
1387        let mut multicast_sent_trackers = HashSet::new();
1388
1389        let mut outgoing_intfs = Vec::new();
1390
1391        for (intf, sock) in self.intf_socks.iter() {
1392            if let Some(tracker) = multicast_send_tracker(intf) {
1393                if multicast_sent_trackers.contains(&tracker) {
1394                    continue; // No need to send again on the same interface with same ip version.
1395                }
1396            }
1397
1398            let dns_registry = match self.dns_registry_map.get_mut(intf) {
1399                Some(registry) => registry,
1400                None => self
1401                    .dns_registry_map
1402                    .entry(intf.clone())
1403                    .or_insert_with(DnsRegistry::new),
1404            };
1405
1406            if announce_service_on_intf(dns_registry, info, intf, sock) {
1407                if let Some(tracker) = multicast_send_tracker(intf) {
1408                    multicast_sent_trackers.insert(tracker);
1409                }
1410                outgoing_addrs.push(intf.ip());
1411                outgoing_intfs.push(intf.clone());
1412
1413                debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1414
1415                info.set_status(intf, ServiceStatus::Announced);
1416            } else {
1417                for timer in dns_registry.new_timers.drain(..) {
1418                    self.timers.push(Reverse(timer));
1419                }
1420                info.set_status(intf, ServiceStatus::Probing);
1421            }
1422        }
1423
1424        // RFC 6762 section 8.3.
1425        // ..The Multicast DNS responder MUST send at least two unsolicited
1426        //    responses, one second apart.
1427        let next_time = current_time_millis() + 1000;
1428        for intf in outgoing_intfs {
1429            self.add_retransmission(
1430                next_time,
1431                Command::RegisterResend(info.get_fullname().to_string(), intf),
1432            );
1433        }
1434
1435        outgoing_addrs
1436    }
1437
    /// Send probings or finish them if expired. Notify waiting services.
    ///
    /// For every bound interface that has a DNS registry:
    /// 1. `check_probing` produces the outgoing probe message and the probes
    ///    that expired; the message is sent if it contains any question.
    /// 2. `handle_expired_probes` returns the names of the services that were
    ///    waiting for those probes.
    /// 3. Each waiting service that is not yet announced on the interface is
    ///    announced; on success a resend is scheduled one second later,
    ///    monitors receive an `Announce` event, and the status becomes `Announced`.
    fn probing_handler(&mut self) {
        let now = current_time_millis();

        for (intf, sock) in self.intf_socks.iter() {
            // Interfaces without a registry have nothing to probe.
            let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
                continue;
            };

            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);

            // send probing.
            if !out.questions().is_empty() {
                debug!("sending out probing of {} questions", out.questions().len());
                send_dns_outgoing(&out, intf, sock);
            }

            // For finished probes, wake up services that are waiting for the probes.
            let waiting_services =
                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);

            for service_name in waiting_services {
                debug!(
                    "try to announce service {service_name} on intf {}",
                    intf.ip()
                );
                // service names are lowercase
                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                    if info.get_status(intf) == ServiceStatus::Announced {
                        debug!("service {} already announced", info.get_fullname());
                        continue;
                    }

                    if announce_service_on_intf(dns_registry, info, intf, sock) {
                        // Schedule the second announcement one second from now.
                        let next_time = now + 1000;
                        let command =
                            Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
                        self.retransmissions.push(ReRun { next_time, command });
                        self.timers.push(Reverse(next_time));

                        // Report the new name if the registry recorded a name change
                        // for this service.
                        let fullname = match dns_registry.name_changes.get(&service_name) {
                            Some(new_name) => new_name.to_string(),
                            None => service_name.to_string(),
                        };

                        // Likewise for the hostname.
                        let mut hostname = info.get_hostname();
                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
                            hostname = new_name;
                        }

                        debug!("wake up: announce service {} on {}", fullname, intf.ip());
                        // The free function is used here (not the method) because
                        // other fields of `self` are still borrowed by this loop.
                        notify_monitors(
                            &mut self.monitors,
                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
                        );

                        info.set_status(intf, ServiceStatus::Announced);
                    }
                }
            }
        }
    }
1500
    /// Builds and sends the response that withdraws `info` on `intf`.
    ///
    /// The response is an authoritative answer holding, in this order: the PTR
    /// record of the service type, a PTR record for the subtype (if any), the
    /// SRV record, the TXT record, and one address record per address of the
    /// service on `intf`. Every record is created with `0` in what is
    /// presumably the TTL position (TODO confirm against the record constructors).
    ///
    /// Returns the first packet of what `send_dns_outgoing` returned for the response.
    fn unregister_service(
        &self,
        info: &ServiceInfo,
        intf: &Interface,
        sock: &MioUdpSocket,
    ) -> Vec<u8> {
        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        // PTR: <service type> -> <service instance fullname>.
        out.add_answer_at_time(
            DnsPointer::new(
                info.get_type(),
                RRType::PTR,
                CLASS_IN,
                0,
                info.get_fullname().to_string(),
            ),
            0,
        );

        // PTR for the subtype, if the service has one.
        if let Some(sub) = info.get_subtype() {
            trace!("Adding subdomain {}", sub);
            out.add_answer_at_time(
                DnsPointer::new(
                    sub,
                    RRType::PTR,
                    CLASS_IN,
                    0,
                    info.get_fullname().to_string(),
                ),
                0,
            );
        }

        // SRV record of the instance.
        out.add_answer_at_time(
            DnsSrv::new(
                info.get_fullname(),
                CLASS_IN | CLASS_CACHE_FLUSH,
                0,
                info.get_priority(),
                info.get_weight(),
                info.get_port(),
                info.get_hostname().to_string(),
            ),
            0,
        );
        // TXT record of the instance.
        out.add_answer_at_time(
            DnsTxt::new(
                info.get_fullname(),
                CLASS_IN | CLASS_CACHE_FLUSH,
                0,
                info.generate_txt(),
            ),
            0,
        );

        // Address records, only for the addresses that belong to `intf`.
        for address in info.get_addrs_on_intf(intf) {
            out.add_answer_at_time(
                DnsAddress::new(
                    info.get_hostname(),
                    ip_address_rr_type(&address),
                    CLASS_IN | CLASS_CACHE_FLUSH,
                    0,
                    address,
                    intf.into(),
                ),
                0,
            );
        }

        // `out` data is non-empty, hence we can do this.
        // NOTE(review): `remove(0)` panics on an empty list, so this relies on
        // `send_dns_outgoing` returning at least one packet here.
        send_dns_outgoing(&out, intf, sock).remove(0)
    }
1572
1573    /// Binds a channel `listener` to querying mDNS hostnames.
1574    ///
1575    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1576    fn add_hostname_resolver(
1577        &mut self,
1578        hostname: String,
1579        listener: Sender<HostnameResolutionEvent>,
1580        timeout: Option<u64>,
1581    ) {
1582        let real_timeout = timeout.map(|t| current_time_millis() + t);
1583        self.hostname_resolvers
1584            .insert(hostname.to_lowercase(), (listener, real_timeout));
1585        if let Some(t) = real_timeout {
1586            self.add_timer(t);
1587        }
1588    }
1589
1590    /// Sends a multicast query for `name` with `qtype`.
1591    fn send_query(&self, name: &str, qtype: RRType) {
1592        self.send_query_vec(&[(name, qtype)]);
1593    }
1594
1595    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1596    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1597        trace!("Sending query questions: {:?}", questions);
1598        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1599        let now = current_time_millis();
1600
1601        for (name, qtype) in questions {
1602            out.add_question(name, *qtype);
1603
1604            for record in self.cache.get_known_answers(name, *qtype, now) {
1605                /*
1606                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1607                ...
1608                    When a Multicast DNS querier sends a query to which it already knows
1609                    some answers, it populates the Answer Section of the DNS query
1610                    message with those answers.
1611                 */
1612                trace!("add known answer: {:?}", record);
1613                let mut new_record = record.clone();
1614                new_record.get_record_mut().update_ttl(now);
1615                out.add_answer_box(new_record);
1616            }
1617        }
1618
1619        // Send the query on one interface per ip version.
1620        let mut multicast_sent_trackers = HashSet::new();
1621        for (intf, sock) in self.intf_socks.iter() {
1622            if let Some(tracker) = multicast_send_tracker(intf) {
1623                if multicast_sent_trackers.contains(&tracker) {
1624                    continue; // no need to send query the same interface with same ip version.
1625                }
1626                multicast_sent_trackers.insert(tracker);
1627            }
1628            send_dns_outgoing(&out, intf, sock);
1629        }
1630    }
1631
1632    /// Reads one UDP datagram from the socket of `intf`.
1633    ///
1634    /// Returns false if failed to receive a packet,
1635    /// otherwise returns true.
1636    fn handle_read(&mut self, intf: &Interface) -> bool {
1637        let sock = match self.intf_socks.get_mut(intf) {
1638            Some(if_sock) => if_sock,
1639            None => return false,
1640        };
1641        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1642
1643        // Read the next mDNS UDP datagram.
1644        //
1645        // If the datagram is larger than `buf`, excess bytes may or may not
1646        // be truncated by the socket layer depending on the platform's libc.
1647        // In any case, such large datagram will not be decoded properly and
1648        // this function should return false but should not crash.
1649        let sz = match sock.recv(&mut buf) {
1650            Ok(sz) => sz,
1651            Err(e) => {
1652                if e.kind() != std::io::ErrorKind::WouldBlock {
1653                    debug!("listening socket read failed: {}", e);
1654                }
1655                return false;
1656            }
1657        };
1658
1659        trace!("received {} bytes at IP: {}", sz, intf.ip());
1660
1661        // If sz is 0, it means sock reached End-of-File.
1662        if sz == 0 {
1663            debug!("socket {:?} was likely shutdown", &sock);
1664            if let Err(e) = self.poller.registry().deregister(sock) {
1665                debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1666            }
1667
1668            // Replace the closed socket with a new one.
1669            let should_loop = if intf.ip().is_ipv4() {
1670                self.multicast_loop_v4
1671            } else {
1672                self.multicast_loop_v6
1673            };
1674            match new_socket_bind(intf, should_loop) {
1675                Ok(new_sock) => {
1676                    trace!("reset socket for IP {}", intf.ip());
1677                    self.intf_socks.insert(intf.clone(), new_sock);
1678                }
1679                Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1680            }
1681
1682            return false;
1683        }
1684
1685        buf.truncate(sz); // reduce potential processing errors
1686
1687        match DnsIncoming::new(buf, intf.into()) {
1688            Ok(msg) => {
1689                if msg.is_query() {
1690                    self.handle_query(msg, intf);
1691                } else if msg.is_response() {
1692                    self.handle_response(msg, intf);
1693                } else {
1694                    debug!("Invalid message: not query and not response");
1695                }
1696            }
1697            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1698        }
1699
1700        true
1701    }
1702
1703    /// Returns true, if sent query. Returns false if SRV already exists.
1704    fn query_unresolved(&mut self, instance: &str) -> bool {
1705        if !valid_instance_name(instance) {
1706            trace!("instance name {} not valid", instance);
1707            return false;
1708        }
1709
1710        if let Some(records) = self.cache.get_srv(instance) {
1711            for record in records {
1712                if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1713                    if self.cache.get_addr(srv.host()).is_none() {
1714                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1715                        return true;
1716                    }
1717                }
1718            }
1719        } else {
1720            self.send_query(instance, RRType::ANY);
1721            return true;
1722        }
1723
1724        false
1725    }
1726
1727    /// Checks if `ty_domain` has records in the cache. If yes, sends the
1728    /// cached records via `sender`.
1729    fn query_cache_for_service(
1730        &mut self,
1731        ty_domain: &str,
1732        sender: &Sender<ServiceEvent>,
1733        now: u64,
1734    ) {
1735        let mut resolved: HashSet<String> = HashSet::new();
1736        let mut unresolved: HashSet<String> = HashSet::new();
1737
1738        if let Some(records) = self.cache.get_ptr(ty_domain) {
1739            for record in records.iter().filter(|r| !r.expires_soon(now)) {
1740                if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1741                    let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1742                        Ok(ok) => ok,
1743                        Err(err) => {
1744                            debug!("Error while creating service info from cache: {}", err);
1745                            continue;
1746                        }
1747                    };
1748
1749                    match sender.send(ServiceEvent::ServiceFound(
1750                        ty_domain.to_string(),
1751                        ptr.alias().to_string(),
1752                    )) {
1753                        Ok(()) => debug!("send service found {}", ptr.alias()),
1754                        Err(e) => {
1755                            debug!("failed to send service found: {}", e);
1756                            continue;
1757                        }
1758                    }
1759
1760                    if info.is_ready() {
1761                        resolved.insert(ptr.alias().to_string());
1762                        match sender.send(ServiceEvent::ServiceResolved(info)) {
1763                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1764                            Err(e) => debug!("failed to send service resolved: {}", e),
1765                        }
1766                    } else {
1767                        unresolved.insert(ptr.alias().to_string());
1768                    }
1769                }
1770            }
1771        }
1772
1773        for instance in resolved.drain() {
1774            self.pending_resolves.remove(&instance);
1775            self.resolved.insert(instance);
1776        }
1777
1778        for instance in unresolved.drain() {
1779            self.add_pending_resolve(instance);
1780        }
1781    }
1782
1783    /// Checks if `hostname` has records in the cache. If yes, sends the
1784    /// cached records via `sender`.
1785    fn query_cache_for_hostname(
1786        &mut self,
1787        hostname: &str,
1788        sender: Sender<HostnameResolutionEvent>,
1789    ) {
1790        let addresses_map = self.cache.get_addresses_for_host(hostname);
1791        for (name, addresses) in addresses_map {
1792            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1793                Ok(()) => trace!("sent hostname addresses found"),
1794                Err(e) => debug!("failed to send hostname addresses found: {}", e),
1795            }
1796        }
1797    }
1798
1799    fn add_pending_resolve(&mut self, instance: String) {
1800        if !self.pending_resolves.contains(&instance) {
1801            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1802            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1803            self.pending_resolves.insert(instance);
1804        }
1805    }
1806
1807    fn create_service_info_from_cache(
1808        &self,
1809        ty_domain: &str,
1810        fullname: &str,
1811    ) -> Result<ServiceInfo> {
1812        let my_name = {
1813            let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1814            name.strip_suffix('.').unwrap_or(name).to_string()
1815        };
1816
1817        let now = current_time_millis();
1818        let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1819
1820        // Be sure setting `subtype` if available even when querying for the parent domain.
1821        if let Some(subtype) = self.cache.get_subtype(fullname) {
1822            trace!(
1823                "ty_domain: {} found subtype {} for instance: {}",
1824                ty_domain,
1825                subtype,
1826                fullname
1827            );
1828            if info.get_subtype().is_none() {
1829                info.set_subtype(subtype.clone());
1830            }
1831        }
1832
1833        // resolve SRV record
1834        if let Some(records) = self.cache.get_srv(fullname) {
1835            if let Some(answer) = records.iter().find(|r| !r.expires_soon(now)) {
1836                if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1837                    info.set_hostname(dns_srv.host().to_string());
1838                    info.set_port(dns_srv.port());
1839                }
1840            }
1841        }
1842
1843        // resolve TXT record
1844        if let Some(records) = self.cache.get_txt(fullname) {
1845            if let Some(record) = records.iter().find(|r| !r.expires_soon(now)) {
1846                if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1847                    info.set_properties_from_txt(dns_txt.text());
1848                }
1849            }
1850        }
1851
1852        // resolve A and AAAA records
1853        if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1854            for answer in records.iter() {
1855                if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1856                    if dns_a.expires_soon(now) {
1857                        trace!("Addr expired or expires soon: {}", dns_a.address());
1858                    } else {
1859                        info.insert_ipaddr(dns_a.address());
1860                    }
1861                }
1862            }
1863        }
1864
1865        Ok(info)
1866    }
1867
1868    fn handle_poller_events(&mut self, events: &mio::Events) {
1869        for ev in events.iter() {
1870            trace!("event received with key {:?}", ev.token());
1871            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1872                // Drain signals as we will drain commands as well.
1873                self.signal_sock_drain();
1874
1875                if let Err(e) = self.poller.registry().reregister(
1876                    &mut self.signal_sock,
1877                    ev.token(),
1878                    mio::Interest::READABLE,
1879                ) {
1880                    debug!("failed to modify poller for signal socket: {}", e);
1881                }
1882                continue; // Next event.
1883            }
1884
1885            // Read until no more packets available.
1886            let intf = match self.poll_ids.get(&ev.token().0) {
1887                Some(interface) => interface.clone(),
1888                None => {
1889                    debug!("Ip for event key {} not found", ev.token().0);
1890                    break;
1891                }
1892            };
1893            while self.handle_read(&intf) {}
1894
1895            // we continue to monitor this socket.
1896            if let Some(sock) = self.intf_socks.get_mut(&intf) {
1897                if let Err(e) =
1898                    self.poller
1899                        .registry()
1900                        .reregister(sock, ev.token(), mio::Interest::READABLE)
1901                {
1902                    debug!("modify poller for interface {:?}: {}", &intf, e);
1903                    break;
1904                }
1905            }
1906        }
1907    }
1908
1909    /// Deal with incoming response packets.  All answers
1910    /// are held in the cache, and listeners are notified.
1911    fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1912        trace!(
1913            "handle_response: {} answers {} authorities {} additionals",
1914            msg.answers().len(),
1915            &msg.authorities().len(),
1916            &msg.num_additionals()
1917        );
1918        let now = current_time_millis();
1919
1920        // remove records that are expired.
1921        let mut record_predicate = |record: &DnsRecordBox| {
1922            if !record.get_record().is_expired(now) {
1923                return true;
1924            }
1925
1926            debug!("record is expired, removing it from cache.");
1927            if self.cache.remove(record) {
1928                // for PTR records, send event to listeners
1929                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1930                    call_service_listener(
1931                        &self.service_queriers,
1932                        dns_ptr.get_name(),
1933                        ServiceEvent::ServiceRemoved(
1934                            dns_ptr.get_name().to_string(),
1935                            dns_ptr.alias().to_string(),
1936                        ),
1937                    );
1938                }
1939            }
1940            false
1941        };
1942        msg.answers_mut().retain(&mut record_predicate);
1943        msg.authorities_mut().retain(&mut record_predicate);
1944        msg.additionals_mut().retain(&mut record_predicate);
1945
1946        // check possible conflicts and handle them.
1947        self.conflict_handler(&msg, intf);
1948
1949        // check if the message is for us.
1950        let mut is_for_us = true; // assume it is for us.
1951
1952        // If there are any PTR records in the answers, there should be
1953        // at least one PTR for us. Otherwise, the message is not for us.
1954        // If there are no PTR records at all, assume this message is for us.
1955        for answer in msg.answers() {
1956            if answer.get_type() == RRType::PTR {
1957                if self.service_queriers.contains_key(answer.get_name()) {
1958                    is_for_us = true;
1959                    break; // OK to break: at least one PTR for us.
1960                } else {
1961                    is_for_us = false;
1962                }
1963            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
1964                // If there is a hostname querier for this address, then it is for us.
1965                let answer_lowercase = answer.get_name().to_lowercase();
1966                if self.hostname_resolvers.contains_key(&answer_lowercase) {
1967                    is_for_us = true;
1968                    break; // OK to break: at least one hostname for us.
1969                }
1970            }
1971        }
1972
1973        /// Represents a DNS record change that involves one service instance.
1974        struct InstanceChange {
1975            ty: RRType,   // The type of DNS record for the instance.
1976            name: String, // The name of the record.
1977        }
1978
1979        // Go through all answers to get the new and updated records.
1980        // For new PTR records, send out ServiceFound immediately. For others,
1981        // collect them into `changes`.
1982        //
1983        // Note: we don't try to identify the update instances based on
1984        // each record immediately as the answers are likely related to each
1985        // other.
1986        let mut changes = Vec::new();
1987        let mut timers = Vec::new();
1988        for record in msg.all_records() {
1989            match self
1990                .cache
1991                .add_or_update(intf, record, &mut timers, is_for_us)
1992            {
1993                Some((dns_record, true)) => {
1994                    timers.push(dns_record.get_record().get_expire_time());
1995                    timers.push(dns_record.get_record().get_refresh_time());
1996
1997                    let ty = dns_record.get_type();
1998                    let name = dns_record.get_name();
1999
2000                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2001                    if ty == RRType::PTR && dns_record.get_record().get_ttl() > 1 {
2002                        if self.service_queriers.contains_key(name) {
2003                            timers.push(dns_record.get_record().get_refresh_time());
2004                        }
2005
2006                        // send ServiceFound
2007                        if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2008                            call_service_listener(
2009                                &self.service_queriers,
2010                                name,
2011                                ServiceEvent::ServiceFound(
2012                                    name.to_string(),
2013                                    dns_ptr.alias().to_string(),
2014                                ),
2015                            );
2016                            changes.push(InstanceChange {
2017                                ty,
2018                                name: dns_ptr.alias().to_string(),
2019                            });
2020                        }
2021                    } else {
2022                        changes.push(InstanceChange {
2023                            ty,
2024                            name: name.to_string(),
2025                        });
2026                    }
2027                }
2028                Some((dns_record, false)) => {
2029                    timers.push(dns_record.get_record().get_expire_time());
2030                    timers.push(dns_record.get_record().get_refresh_time());
2031                }
2032                _ => {}
2033            }
2034        }
2035
2036        // Add timers for the new records.
2037        for t in timers {
2038            self.add_timer(t);
2039        }
2040
2041        // Go through remaining changes to see if any hostname resolutions were found or updated.
2042        for change in changes
2043            .iter()
2044            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2045        {
2046            let addr_map = self.cache.get_addresses_for_host(&change.name);
2047            for (name, addresses) in addr_map {
2048                call_hostname_resolution_listener(
2049                    &self.hostname_resolvers,
2050                    &change.name,
2051                    HostnameResolutionEvent::AddressesFound(name, addresses),
2052                )
2053            }
2054        }
2055
2056        // Identify the instances that need to be "resolved".
2057        let mut updated_instances = HashSet::new();
2058        for update in changes {
2059            match update.ty {
2060                RRType::PTR | RRType::SRV | RRType::TXT => {
2061                    updated_instances.insert(update.name);
2062                }
2063                RRType::A | RRType::AAAA => {
2064                    let instances = self.cache.get_instances_on_host(&update.name);
2065                    updated_instances.extend(instances);
2066                }
2067                _ => {}
2068            }
2069        }
2070
2071        self.resolve_updated_instances(&updated_instances);
2072    }
2073
2074    fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2075        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2076            return;
2077        };
2078
2079        for answer in msg.answers().iter() {
2080            let mut new_records = Vec::new();
2081
2082            let name = answer.get_name();
2083            let Some(probe) = dns_registry.probing.get_mut(name) else {
2084                continue;
2085            };
2086
2087            // check against possible multicast forwarding
2088            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2089                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2090                    if !valid_ip_on_intf(&answer_addr.address(), intf) {
2091                        debug!(
2092                            "conflict handler: answer addr {:?} not in the subnet of {:?}",
2093                            answer_addr, intf
2094                        );
2095                        continue;
2096                    }
2097                }
2098
2099                // double check if any other address record matches rrdata,
2100                // as there could be multiple addresses for the same name.
2101                let any_match = probe.records.iter().any(|r| {
2102                    r.get_type() == answer.get_type()
2103                        && r.get_class() == answer.get_class()
2104                        && r.rrdata_match(answer.as_ref())
2105                });
2106                if any_match {
2107                    continue; // no conflict for this answer.
2108                }
2109            }
2110
2111            probe.records.retain(|record| {
2112                if record.get_type() == answer.get_type()
2113                    && record.get_class() == answer.get_class()
2114                    && !record.rrdata_match(answer.as_ref())
2115                {
2116                    debug!(
2117                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2118                        record.get_type(),
2119                        record.rdata_print(),
2120                        answer.rdata_print()
2121                    );
2122
2123                    // create a new name for this record
2124                    // then remove the old record in probing.
2125                    let mut new_record = record.clone();
2126                    let new_name = match record.get_type() {
2127                        RRType::A => hostname_change(name),
2128                        RRType::AAAA => hostname_change(name),
2129                        _ => name_change(name),
2130                    };
2131                    new_record.get_record_mut().set_new_name(new_name);
2132                    new_records.push(new_record);
2133                    return false; // old record is dropped from the probe.
2134                }
2135
2136                true
2137            });
2138
2139            // ?????
2140            // if probe.records.is_empty() {
2141            //     dns_registry.probing.remove(name);
2142            // }
2143
2144            // Probing again with the new names.
2145            let create_time = current_time_millis() + fastrand::u64(0..250);
2146
2147            let waiting_services = probe.waiting_services.clone();
2148
2149            for record in new_records {
2150                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2151                    self.timers.push(Reverse(create_time));
2152                }
2153
2154                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2155                dns_registry.name_changes.insert(
2156                    record.get_record().get_original_name().to_string(),
2157                    record.get_name().to_string(),
2158                );
2159
2160                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2161                    Some(p) => p,
2162                    None => {
2163                        let new_probe = dns_registry
2164                            .probing
2165                            .entry(record.get_name().to_string())
2166                            .or_insert_with(|| {
2167                                debug!("conflict handler: new probe of {}", record.get_name());
2168                                Probe::new(create_time)
2169                            });
2170                        self.timers.push(Reverse(new_probe.next_send));
2171                        new_probe
2172                    }
2173                };
2174
2175                debug!(
2176                    "insert record with new name '{}' {} into probe",
2177                    record.get_name(),
2178                    record.get_type()
2179                );
2180                new_probe.insert_record(record);
2181
2182                new_probe.waiting_services.extend(waiting_services.clone());
2183            }
2184        }
2185    }
2186
2187    /// Resolve the updated (including new) instances.
2188    ///
2189    /// Note: it is possible that more than 1 PTR pointing to the same
2190    /// instance. For example, a regular service type PTR and a sub-type
2191    /// service type PTR can both point to the same service instance.
2192    /// This loop automatically handles the sub-type PTRs.
2193    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2194        let mut resolved: HashSet<String> = HashSet::new();
2195        let mut unresolved: HashSet<String> = HashSet::new();
2196        let mut removed_instances = HashMap::new();
2197
2198        let now = current_time_millis();
2199
2200        for (ty_domain, records) in self.cache.all_ptr().iter() {
2201            if !self.service_queriers.contains_key(ty_domain) {
2202                // No need to resolve if not in our queries.
2203                continue;
2204            }
2205
2206            for record in records.iter().filter(|r| !r.expires_soon(now)) {
2207                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2208                    if updated_instances.contains(dns_ptr.alias()) {
2209                        if let Ok(info) =
2210                            self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2211                        {
2212                            if info.is_ready() {
2213                                debug!("call queriers to resolve {}", dns_ptr.alias());
2214                                resolved.insert(dns_ptr.alias().to_string());
2215                                call_service_listener(
2216                                    &self.service_queriers,
2217                                    ty_domain,
2218                                    ServiceEvent::ServiceResolved(info),
2219                                );
2220                            } else {
2221                                if self.resolved.remove(dns_ptr.alias()) {
2222                                    removed_instances
2223                                        .entry(ty_domain.to_string())
2224                                        .or_insert_with(HashSet::new)
2225                                        .insert(dns_ptr.alias().to_string());
2226                                }
2227                                unresolved.insert(dns_ptr.alias().to_string());
2228                            }
2229                        }
2230                    }
2231                }
2232            }
2233        }
2234
2235        for instance in resolved.drain() {
2236            self.pending_resolves.remove(&instance);
2237            self.resolved.insert(instance);
2238        }
2239
2240        for instance in unresolved.drain() {
2241            self.add_pending_resolve(instance);
2242        }
2243
2244        self.notify_service_removal(removed_instances);
2245    }
2246
2247    /// Handle incoming query packets, figure out whether and what to respond.
2248    fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2249        let sock = match self.intf_socks.get(intf) {
2250            Some(sock) => sock,
2251            None => return,
2252        };
2253        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2254
2255        // Special meta-query "_services._dns-sd._udp.<Domain>".
2256        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2257        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2258
2259        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2260            debug!("missing dns registry for intf {}", intf.ip());
2261            return;
2262        };
2263
2264        for question in msg.questions().iter() {
2265            trace!("query question: {:?}", &question);
2266
2267            let qtype = question.entry_type();
2268
2269            if qtype == RRType::PTR {
2270                for service in self.my_services.values() {
2271                    if service.get_status(intf) != ServiceStatus::Announced {
2272                        continue;
2273                    }
2274
2275                    if question.entry_name() == service.get_type()
2276                        || service
2277                            .get_subtype()
2278                            .as_ref()
2279                            .is_some_and(|v| v == question.entry_name())
2280                    {
2281                        add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2282                    } else if question.entry_name() == META_QUERY {
2283                        let ptr_added = out.add_answer(
2284                            &msg,
2285                            DnsPointer::new(
2286                                question.entry_name(),
2287                                RRType::PTR,
2288                                CLASS_IN,
2289                                service.get_other_ttl(),
2290                                service.get_type().to_string(),
2291                            ),
2292                        );
2293                        if !ptr_added {
2294                            trace!("answer was not added for meta-query {:?}", &question);
2295                        }
2296                    }
2297                }
2298            } else {
2299                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2300                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2301                    let probe_name = question.entry_name();
2302
2303                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2304                        let now = current_time_millis();
2305
2306                        // Only do tiebreaking if probe already started.
2307                        // This check also helps avoid redo tiebreaking if start time
2308                        // was postponed.
2309                        if probe.start_time < now {
2310                            let incoming_records: Vec<_> = msg
2311                                .authorities()
2312                                .iter()
2313                                .filter(|r| r.get_name() == probe_name)
2314                                .collect();
2315
2316                            probe.tiebreaking(&incoming_records, now, probe_name);
2317                        }
2318                    }
2319                }
2320
2321                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2322                    for service in self.my_services.values() {
2323                        if service.get_status(intf) != ServiceStatus::Announced {
2324                            continue;
2325                        }
2326
2327                        let service_hostname =
2328                            match dns_registry.name_changes.get(service.get_hostname()) {
2329                                Some(new_name) => new_name,
2330                                None => service.get_hostname(),
2331                            };
2332
2333                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2334                            let intf_addrs = service.get_addrs_on_intf(intf);
2335                            if intf_addrs.is_empty()
2336                                && (qtype == RRType::A || qtype == RRType::AAAA)
2337                            {
2338                                let t = match qtype {
2339                                    RRType::A => "TYPE_A",
2340                                    RRType::AAAA => "TYPE_AAAA",
2341                                    _ => "invalid_type",
2342                                };
2343                                trace!(
2344                                    "Cannot find valid addrs for {} response on intf {:?}",
2345                                    t,
2346                                    &intf
2347                                );
2348                                return;
2349                            }
2350                            for address in intf_addrs {
2351                                out.add_answer(
2352                                    &msg,
2353                                    DnsAddress::new(
2354                                        service_hostname,
2355                                        ip_address_rr_type(&address),
2356                                        CLASS_IN | CLASS_CACHE_FLUSH,
2357                                        service.get_host_ttl(),
2358                                        address,
2359                                        intf.into(),
2360                                    ),
2361                                );
2362                            }
2363                        }
2364                    }
2365                }
2366
2367                let query_name = question.entry_name().to_lowercase();
2368                let service_opt = self
2369                    .my_services
2370                    .iter()
2371                    .find(|(k, _v)| {
2372                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2373                            Some(new_name) => new_name,
2374                            None => k,
2375                        };
2376                        service_name == &query_name
2377                    })
2378                    .map(|(_, v)| v);
2379
2380                let Some(service) = service_opt else {
2381                    continue;
2382                };
2383
2384                if service.get_status(intf) != ServiceStatus::Announced {
2385                    continue;
2386                }
2387
2388                add_answer_of_service(&mut out, &msg, question.entry_name(), service, qtype, intf);
2389            }
2390        }
2391
2392        if !out.answers_count() > 0 {
2393            out.set_id(msg.id());
2394            send_dns_outgoing(&out, intf, sock);
2395
2396            self.increase_counter(Counter::Respond, 1);
2397            self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2398        }
2399
2400        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2401    }
2402
2403    /// Increases the value of `counter` by `count`.
2404    fn increase_counter(&mut self, counter: Counter, count: i64) {
2405        let key = counter.to_string();
2406        match self.counters.get_mut(&key) {
2407            Some(v) => *v += count,
2408            None => {
2409                self.counters.insert(key, count);
2410            }
2411        }
2412    }
2413
2414    /// Sets the value of `counter` to `count`.
2415    fn set_counter(&mut self, counter: Counter, count: i64) {
2416        let key = counter.to_string();
2417        self.counters.insert(key, count);
2418    }
2419
2420    fn signal_sock_drain(&self) {
2421        let mut signal_buf = [0; 1024];
2422
2423        // This recv is non-blocking as the socket is non-blocking.
2424        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2425            trace!(
2426                "signal socket recvd: {}",
2427                String::from_utf8_lossy(&signal_buf[0..sz])
2428            );
2429        }
2430    }
2431
2432    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2433        self.retransmissions.push(ReRun { next_time, command });
2434        self.add_timer(next_time);
2435    }
2436
2437    /// Sends service removal event to listeners for expired service records.
2438    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2439        for (ty_domain, sender) in self.service_queriers.iter() {
2440            if let Some(instances) = expired.get(ty_domain) {
2441                for instance_name in instances {
2442                    let event = ServiceEvent::ServiceRemoved(
2443                        ty_domain.to_string(),
2444                        instance_name.to_string(),
2445                    );
2446                    match sender.send(event) {
2447                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2448                        Err(e) => debug!("Failed to send event: {}", e),
2449                    }
2450                }
2451            }
2452        }
2453    }
2454
2455    /// The entry point that executes all commands received by the daemon.
2456    ///
2457    /// `repeating`: whether this is a retransmission.
2458    fn exec_command(&mut self, command: Command, repeating: bool) {
2459        match command {
2460            Command::Browse(ty, next_delay, listener) => {
2461                self.exec_command_browse(repeating, ty, next_delay, listener);
2462            }
2463
2464            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2465                self.exec_command_resolve_hostname(
2466                    repeating, hostname, next_delay, listener, timeout,
2467                );
2468            }
2469
2470            Command::Register(service_info) => {
2471                self.register_service(service_info);
2472                self.increase_counter(Counter::Register, 1);
2473            }
2474
2475            Command::RegisterResend(fullname, intf) => {
2476                trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2477                self.exec_command_register_resend(fullname, intf);
2478            }
2479
2480            Command::Unregister(fullname, resp_s) => {
2481                trace!("unregister service {} repeat {}", &fullname, &repeating);
2482                self.exec_command_unregister(repeating, fullname, resp_s);
2483            }
2484
2485            Command::UnregisterResend(packet, ip) => {
2486                self.exec_command_unregister_resend(packet, ip);
2487            }
2488
2489            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2490
2491            Command::StopResolveHostname(hostname) => {
2492                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2493            }
2494
2495            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2496
2497            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2498
2499            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2500                Ok(()) => trace!("Sent status to the client"),
2501                Err(e) => debug!("Failed to send status: {}", e),
2502            },
2503
2504            Command::Monitor(resp_s) => {
2505                self.monitors.push(resp_s);
2506            }
2507
2508            Command::SetOption(daemon_opt) => {
2509                self.process_set_option(daemon_opt);
2510            }
2511
2512            Command::GetOption(resp_s) => {
2513                let val = DaemonOptionVal {
2514                    _service_name_len_max: self.service_name_len_max,
2515                    ip_check_interval: self.ip_check_interval,
2516                };
2517                if let Err(e) = resp_s.send(val) {
2518                    debug!("Failed to send options: {}", e);
2519                }
2520            }
2521
2522            Command::Verify(instance_fullname, timeout) => {
2523                self.exec_command_verify(instance_fullname, timeout, repeating);
2524            }
2525
2526            _ => {
2527                debug!("unexpected command: {:?}", &command);
2528            }
2529        }
2530    }
2531
2532    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2533        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2534        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2535        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2536        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2537        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2538        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2539        self.set_counter(Counter::Timer, self.timers.len() as i64);
2540
2541        let dns_registry_probe_count: usize = self
2542            .dns_registry_map
2543            .values()
2544            .map(|r| r.probing.len())
2545            .sum();
2546        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2547
2548        let dns_registry_active_count: usize = self
2549            .dns_registry_map
2550            .values()
2551            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2552            .sum();
2553        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2554
2555        let dns_registry_timer_count: usize = self
2556            .dns_registry_map
2557            .values()
2558            .map(|r| r.new_timers.len())
2559            .sum();
2560        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2561
2562        let dns_registry_name_change_count: usize = self
2563            .dns_registry_map
2564            .values()
2565            .map(|r| r.name_changes.len())
2566            .sum();
2567        self.set_counter(
2568            Counter::DnsRegistryNameChange,
2569            dns_registry_name_change_count as i64,
2570        );
2571
2572        // Send the metrics to the client.
2573        if let Err(e) = resp_s.send(self.counters.clone()) {
2574            debug!("Failed to send metrics: {}", e);
2575        }
2576    }
2577
2578    fn exec_command_browse(
2579        &mut self,
2580        repeating: bool,
2581        ty: String,
2582        next_delay: u32,
2583        listener: Sender<ServiceEvent>,
2584    ) {
2585        let pretty_addrs: Vec<String> = self
2586            .intf_socks
2587            .keys()
2588            .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2589            .collect();
2590
2591        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2592            "{ty} on {} interfaces [{}]",
2593            pretty_addrs.len(),
2594            pretty_addrs.join(", ")
2595        ))) {
2596            debug!(
2597                "Failed to send SearchStarted({})(repeating:{}): {}",
2598                &ty, repeating, e
2599            );
2600            return;
2601        }
2602
2603        let now = current_time_millis();
2604        if !repeating {
2605            // Binds a `listener` to querying mDNS domain type `ty`.
2606            //
2607            // If there is already a `listener`, it will be updated, i.e. overwritten.
2608            self.service_queriers.insert(ty.clone(), listener.clone());
2609
2610            // if we already have the records in our cache, just send them
2611            self.query_cache_for_service(&ty, &listener, now);
2612        }
2613
2614        self.send_query(&ty, RRType::PTR);
2615        self.increase_counter(Counter::Browse, 1);
2616
2617        let next_time = now + (next_delay * 1000) as u64;
2618        let max_delay = 60 * 60;
2619        let delay = cmp::min(next_delay * 2, max_delay);
2620        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2621    }
2622
2623    fn exec_command_resolve_hostname(
2624        &mut self,
2625        repeating: bool,
2626        hostname: String,
2627        next_delay: u32,
2628        listener: Sender<HostnameResolutionEvent>,
2629        timeout: Option<u64>,
2630    ) {
2631        let addr_list: Vec<_> = self.intf_socks.keys().collect();
2632        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2633            "{} on addrs {:?}",
2634            &hostname, &addr_list
2635        ))) {
2636            debug!(
2637                "Failed to send ResolveStarted({})(repeating:{}): {}",
2638                &hostname, repeating, e
2639            );
2640            return;
2641        }
2642        if !repeating {
2643            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2644            // if we already have the records in our cache, just send them
2645            self.query_cache_for_hostname(&hostname, listener.clone());
2646        }
2647
2648        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2649        self.increase_counter(Counter::ResolveHostname, 1);
2650
2651        let now = current_time_millis();
2652        let next_time = now + u64::from(next_delay) * 1000;
2653        let max_delay = 60 * 60;
2654        let delay = cmp::min(next_delay * 2, max_delay);
2655
2656        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
2657        if self
2658            .hostname_resolvers
2659            .get(&hostname)
2660            .and_then(|(_sender, timeout)| *timeout)
2661            .map(|timeout| next_time < timeout)
2662            .unwrap_or(true)
2663        {
2664            self.add_retransmission(
2665                next_time,
2666                Command::ResolveHostname(hostname, delay, listener, None),
2667            );
2668        }
2669    }
2670
2671    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2672        let pending_query = self.query_unresolved(&instance);
2673        let max_try = 3;
2674        if pending_query && try_count < max_try {
2675            // Note that if the current try already succeeds, the next retransmission
2676            // will be no-op as the cache has been updated.
2677            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2678            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2679        }
2680    }
2681
2682    fn exec_command_unregister(
2683        &mut self,
2684        repeating: bool,
2685        fullname: String,
2686        resp_s: Sender<UnregisterStatus>,
2687    ) {
2688        let response = match self.my_services.remove_entry(&fullname) {
2689            None => {
2690                debug!("unregister: cannot find such service {}", &fullname);
2691                UnregisterStatus::NotFound
2692            }
2693            Some((_k, info)) => {
2694                let mut timers = Vec::new();
2695                // Send one unregister per interface and ip version
2696                let mut multicast_sent_trackers = HashSet::new();
2697
2698                for (intf, sock) in self.intf_socks.iter() {
2699                    if let Some(tracker) = multicast_send_tracker(intf) {
2700                        if multicast_sent_trackers.contains(&tracker) {
2701                            continue; // no need to send unregister the same interface with same ip version.
2702                        }
2703                        multicast_sent_trackers.insert(tracker);
2704                    }
2705                    let packet = self.unregister_service(&info, intf, sock);
2706                    // repeat for one time just in case some peers miss the message
2707                    if !repeating && !packet.is_empty() {
2708                        let next_time = current_time_millis() + 120;
2709                        self.retransmissions.push(ReRun {
2710                            next_time,
2711                            command: Command::UnregisterResend(packet, intf.clone()),
2712                        });
2713                        timers.push(next_time);
2714                    }
2715                }
2716
2717                for t in timers {
2718                    self.add_timer(t);
2719                }
2720
2721                self.increase_counter(Counter::Unregister, 1);
2722                UnregisterStatus::OK
2723            }
2724        };
2725        if let Err(e) = resp_s.send(response) {
2726            debug!("unregister: failed to send response: {}", e);
2727        }
2728    }
2729
2730    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2731        if let Some(sock) = self.intf_socks.get(&intf) {
2732            debug!("UnregisterResend from {}", &intf.ip());
2733            multicast_on_intf(&packet[..], &intf, sock);
2734            self.increase_counter(Counter::UnregisterResend, 1);
2735        }
2736    }
2737
2738    fn exec_command_stop_browse(&mut self, ty_domain: String) {
2739        match self.service_queriers.remove_entry(&ty_domain) {
2740            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2741            Some((ty, sender)) => {
2742                // Remove pending browse commands in the reruns.
2743                trace!("StopBrowse: removed queryer for {}", &ty);
2744                let mut i = 0;
2745                while i < self.retransmissions.len() {
2746                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2747                        if t == &ty {
2748                            self.retransmissions.remove(i);
2749                            trace!("StopBrowse: removed retransmission for {}", &ty);
2750                            continue;
2751                        }
2752                    }
2753                    i += 1;
2754                }
2755
2756                // Remove cache entries.
2757                self.cache.remove_service_type(&ty_domain);
2758
2759                // Notify the client.
2760                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2761                    Ok(()) => trace!("Sent SearchStopped to the listener"),
2762                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
2763                }
2764            }
2765        }
2766    }
2767
2768    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2769        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2770            // Remove pending resolve commands in the reruns.
2771            trace!("StopResolve: removed queryer for {}", &host);
2772            let mut i = 0;
2773            while i < self.retransmissions.len() {
2774                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2775                    if t == &host {
2776                        self.retransmissions.remove(i);
2777                        trace!("StopResolve: removed retransmission for {}", &host);
2778                        continue;
2779                    }
2780                }
2781                i += 1;
2782            }
2783
2784            // Notify the client.
2785            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2786                Ok(()) => trace!("Sent SearchStopped to the listener"),
2787                Err(e) => debug!("Failed to send SearchStopped: {}", e),
2788            }
2789        }
2790    }
2791
2792    fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2793        let Some(info) = self.my_services.get_mut(&fullname) else {
2794            trace!("announce: cannot find such service {}", &fullname);
2795            return;
2796        };
2797
2798        let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2799            return;
2800        };
2801
2802        let Some(sock) = self.intf_socks.get(&intf) else {
2803            return;
2804        };
2805
2806        if announce_service_on_intf(dns_registry, info, &intf, sock) {
2807            let mut hostname = info.get_hostname();
2808            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2809                hostname = new_name;
2810            }
2811            let service_name = match dns_registry.name_changes.get(&fullname) {
2812                Some(new_name) => new_name.to_string(),
2813                None => fullname,
2814            };
2815
2816            debug!("resend: announce service {} on {}", service_name, intf.ip());
2817
2818            notify_monitors(
2819                &mut self.monitors,
2820                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2821            );
2822            info.set_status(&intf, ServiceStatus::Announced);
2823        } else {
2824            debug!("register-resend should not fail");
2825        }
2826
2827        self.increase_counter(Counter::RegisterResend, 1);
2828    }
2829
2830    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2831        /*
2832        RFC 6762 section 10.4:
2833        ...
2834        When the cache receives this hint that it should reconfirm some
2835        record, it MUST issue two or more queries for the resource record in
2836        dispute.  If no response is received within ten seconds, then, even
2837        though its TTL may indicate that it is not yet due to expire, that
2838        record SHOULD be promptly flushed from the cache.
2839        */
2840        let now = current_time_millis();
2841        let expire_at = if repeating {
2842            None
2843        } else {
2844            Some(now + timeout.as_millis() as u64)
2845        };
2846
2847        // send query for the resource records.
2848        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2849
2850        if !record_vec.is_empty() {
2851            let query_vec: Vec<(&str, RRType)> = record_vec
2852                .iter()
2853                .map(|(record, rr_type)| (record.as_str(), *rr_type))
2854                .collect();
2855            self.send_query_vec(&query_vec);
2856
2857            if let Some(new_expire) = expire_at {
2858                self.add_timer(new_expire); // ensure a check for the new expire time.
2859
2860                // schedule a resend 1 second later
2861                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2862            }
2863        }
2864    }
2865
2866    /// Refresh cached service records with active queriers
2867    fn refresh_active_services(&mut self) {
2868        let mut query_ptr_count = 0;
2869        let mut query_srv_count = 0;
2870        let mut new_timers = HashSet::new();
2871        let mut query_addr_count = 0;
2872
2873        for (ty_domain, _sender) in self.service_queriers.iter() {
2874            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2875            if !refreshed_timers.is_empty() {
2876                trace!("sending refresh query for PTR: {}", ty_domain);
2877                self.send_query(ty_domain, RRType::PTR);
2878                query_ptr_count += 1;
2879                new_timers.extend(refreshed_timers);
2880            }
2881
2882            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
2883            for (instance, types) in instances {
2884                trace!("sending refresh query for: {}", &instance);
2885                let query_vec = types
2886                    .into_iter()
2887                    .map(|ty| (instance.as_str(), ty))
2888                    .collect::<Vec<_>>();
2889                self.send_query_vec(&query_vec);
2890                query_srv_count += 1;
2891            }
2892            new_timers.extend(timers);
2893            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2894            for hostname in hostnames.iter() {
2895                trace!("sending refresh queries for A and AAAA:  {}", hostname);
2896                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2897                query_addr_count += 2;
2898            }
2899            new_timers.extend(timers);
2900        }
2901
2902        for timer in new_timers {
2903            self.add_timer(timer);
2904        }
2905
2906        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2907        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
2908        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2909    }
2910}
2911
2912/// Adds one or more answers of a service for incoming msg and RR entry name.
2913fn add_answer_of_service(
2914    out: &mut DnsOutgoing,
2915    msg: &DnsIncoming,
2916    entry_name: &str,
2917    service: &ServiceInfo,
2918    qtype: RRType,
2919    intf: &Interface,
2920) {
2921    if qtype == RRType::SRV || qtype == RRType::ANY {
2922        out.add_answer(
2923            msg,
2924            DnsSrv::new(
2925                entry_name,
2926                CLASS_IN | CLASS_CACHE_FLUSH,
2927                service.get_host_ttl(),
2928                service.get_priority(),
2929                service.get_weight(),
2930                service.get_port(),
2931                service.get_hostname().to_string(),
2932            ),
2933        );
2934    }
2935
2936    if qtype == RRType::TXT || qtype == RRType::ANY {
2937        out.add_answer(
2938            msg,
2939            DnsTxt::new(
2940                entry_name,
2941                CLASS_IN | CLASS_CACHE_FLUSH,
2942                service.get_other_ttl(),
2943                service.generate_txt(),
2944            ),
2945        );
2946    }
2947
2948    if qtype == RRType::SRV {
2949        let intf_addrs = service.get_addrs_on_intf(intf);
2950        if intf_addrs.is_empty() {
2951            debug!(
2952                "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2953                &intf
2954            );
2955            return;
2956        }
2957        for address in intf_addrs {
2958            out.add_additional_answer(DnsAddress::new(
2959                service.get_hostname(),
2960                ip_address_rr_type(&address),
2961                CLASS_IN | CLASS_CACHE_FLUSH,
2962                service.get_host_ttl(),
2963                address,
2964                intf.into(),
2965            ));
2966        }
2967    }
2968}
2969
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// The daemon sends these on the channel that the client supplied when it
/// started browsing for a service type.
#[derive(Clone, Debug)]
pub enum ServiceEvent {
    /// Started searching for a service type.
    ///
    /// The payload is a human-readable description of the form
    /// `"<ty_domain> on <N> interfaces [<ip> (<name>), ...]"`. The daemon sends
    /// this event again each time the browse query is retransmitted.
    SearchStarted(String),

    /// Found a specific (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance with detailed info.
    ServiceResolved(ServiceInfo),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type.
    ///
    /// The payload is the service type (with domain) whose browsing stopped.
    SearchStopped(String),
}
2989
/// All possible events sent to the client from the daemon
/// regarding hostname resolution.
///
/// The daemon sends these on the channel that the client supplied when it
/// asked to resolve a hostname.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    ///
    /// The payload is a human-readable description of the form
    /// `"<hostname> on addrs <interfaces>"`. The daemon sends this event again
    /// each time the query is retransmitted.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found.
    AddressesFound(String, HashSet<IpAddr>),
    /// One or more addresses for a hostname have been removed.
    AddressesRemoved(String, HashSet<IpAddr>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname.
    ///
    /// The payload is the hostname whose resolution stopped.
    SearchStopped(String),
}
3006
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
///
/// They are sent to the monitor channels registered with the daemon.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon announced a service from an interface without being asked.
    ///
    /// The fields are the service name (the new name, if it was changed by
    /// conflict resolution) and a string of the form `"<hostname>:<ip>"`.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address on the host.
    IpAdd(IpAddr),

    /// Daemon detected that an IP address was removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// See [DnsNameChange] for more details.
    NameChange(DnsNameChange),

    /// Daemon sent out a multicast response via an IP address.
    Respond(IpAddr),
}
3031
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
///
/// Carried by [`DaemonEvent::NameChange`].
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// The new name, created by appending a suffix to the original name.
    ///
    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The type of the resource record whose name was changed.
    pub rr_type: RRType,

    /// The name of the interface where the name conflict and its change happened.
    pub intf_name: String,
}
3056
/// Commands supported by the daemon.
///
/// Most variants are dispatched by `exec_command`. Some of them are also
/// queued again as retransmissions (for example `Browse`, `ResolveHostname`,
/// `Resolve`, `UnregisterResend` and `Verify`).
#[derive(Debug)]
enum Command {
    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
    Browse(String, u32, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)

    /// Register a service
    Register(ServiceInfo),

    /// Unregister a service; the outcome is reported on the sender.
    Unregister(String, Sender<UnregisterStatus>), // (fullname, response sender)

    /// Announce again a service to local network
    RegisterResend(String, Interface), // (fullname, interface)

    /// Resend unregister packet.
    UnregisterResend(Vec<u8>, Interface), // (packet content, interface)

    /// Stop browsing a service type
    StopBrowse(String), // (ty_domain)

    /// Stop resolving a hostname
    StopResolveHostname(String), // (hostname)

    /// Send query to resolve a service instance.
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    Resolve(String, u16), // (service_instance_fullname, try_count)

    /// Read the current values of the counters
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Set a daemon option.
    SetOption(DaemonOption),

    /// Read the current daemon option values; they are sent back on the sender.
    GetOption(Sender<DaemonOptionVal>),

    /// Proactively confirm a DNS resource record.
    ///
    /// The intention is to check if a service name or IP address is still valid
    /// before its TTL expires.
    Verify(String, Duration), // (service_instance_fullname, timeout)

    /// Exit the daemon.
    ///
    /// NOTE(review): `exec_command` has no arm for this variant, so it is
    /// presumably handled by the caller before dispatch; confirm.
    Exit(Sender<DaemonStatus>),
}
3109
3110impl fmt::Display for Command {
3111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3112        match self {
3113            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3114            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3115            Self::Exit(_) => write!(f, "Command Exit"),
3116            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3117            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3118            Self::Monitor(_) => write!(f, "Command Monitor"),
3119            Self::Register(_) => write!(f, "Command Register"),
3120            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3121            Self::SetOption(_) => write!(f, "Command SetOption"),
3122            Self::GetOption(_) => write!(f, "Command GetOption"),
3123            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3124            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3125            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3126            Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3127            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3128            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3129        }
3130    }
3131}
3132
/// Snapshot of daemon option values, delivered through `Command::GetOption`.
struct DaemonOptionVal {
    /// Maximum service name length. The leading underscore in the field name
    /// suggests it is currently not read anywhere; TODO confirm.
    _service_name_len_max: u8,
    /// Interval for checking IP changes.
    /// NOTE(review): the unit is not visible from this definition; confirm.
    ip_check_interval: u64,
}
3137
/// A single daemon option together with its new value, carried by
/// `Command::SetOption`.
#[derive(Debug)]
enum DaemonOption {
    /// Maximum length allowed for a service name; presumably the `limit`
    /// later passed to `check_service_name_length` (TODO confirm).
    ServiceNameLenMax(u8),
    /// Interval for checking IP changes.
    /// NOTE(review): unit not visible here; confirm.
    IpCheckInterval(u64),
    /// Interface kinds to enable.
    EnableInterface(Vec<IfKind>),
    /// Interface kinds to disable.
    DisableInterface(Vec<IfKind>),
    /// Multicast loopback flag for IPv4; presumably applied to the sockets, confirm.
    MulticastLoopV4(bool),
    /// Multicast loopback flag for IPv6; presumably applied to the sockets, confirm.
    MulticastLoopV6(bool),
}
3147
/// The length of Service Domain name supported in this lib.
///
/// Measured on "._tcp.local."; "._udp.local." (the other suffix accepted by
/// `check_domain_suffix`) has the same byte length, so one constant serves both.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3150
3151/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3152fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3153    if ty_domain.len() <= DOMAIN_LEN + 1 {
3154        // service name cannot be empty or only '_'.
3155        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3156    }
3157
3158    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3159    if service_name_len > limit as usize {
3160        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3161    }
3162    Ok(())
3163}
3164
3165/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3166fn check_domain_suffix(name: &str) -> Result<()> {
3167    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3168        return Err(e_fmt!(
3169            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3170            name
3171        ));
3172    }
3173
3174    Ok(())
3175}
3176
3177/// Validate the service name in a fully qualified name.
3178///
3179/// A Full Name = <Instance>.<Service>.<Domain>
3180/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3181///
3182/// Note: this function does not check for the length of the service name.
3183/// Instead, `register_service` method will check the length.
3184fn check_service_name(fullname: &str) -> Result<()> {
3185    check_domain_suffix(fullname)?;
3186
3187    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3188    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3189
3190    if &name[0..1] != "_" {
3191        return Err(e_fmt!("Service name must start with '_'"));
3192    }
3193
3194    let name = &name[1..];
3195
3196    if name.contains("--") {
3197        return Err(e_fmt!("Service name must not contain '--'"));
3198    }
3199
3200    if name.starts_with('-') || name.ends_with('-') {
3201        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3202    }
3203
3204    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3205    if ascii_count < 1 {
3206        return Err(e_fmt!(
3207            "Service name must contain at least one letter (eg: 'A-Za-z')"
3208        ));
3209    }
3210
3211    Ok(())
3212}
3213
3214/// Validate a hostname.
3215fn check_hostname(hostname: &str) -> Result<()> {
3216    if !hostname.ends_with(".local.") {
3217        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3218    }
3219
3220    if hostname == ".local." {
3221        return Err(e_fmt!(
3222            "The part of the hostname before '.local.' cannot be empty"
3223        ));
3224    }
3225
3226    if hostname.len() > 255 {
3227        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3228    }
3229
3230    Ok(())
3231}
3232
3233fn call_service_listener(
3234    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3235    ty_domain: &str,
3236    event: ServiceEvent,
3237) {
3238    if let Some(listener) = listeners_map.get(ty_domain) {
3239        match listener.send(event) {
3240            Ok(()) => trace!("Sent event to listener successfully"),
3241            Err(e) => debug!("Failed to send event: {}", e),
3242        }
3243    }
3244}
3245
3246fn call_hostname_resolution_listener(
3247    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3248    hostname: &str,
3249    event: HostnameResolutionEvent,
3250) {
3251    let hostname_lower = hostname.to_lowercase();
3252    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3253        match listener.send(event) {
3254            Ok(()) => trace!("Sent event to listener successfully"),
3255            Err(e) => debug!("Failed to send event: {}", e),
3256        }
3257    }
3258}
3259
3260/// Returns valid network interfaces in the host system.
3261/// Loopback interfaces are excluded.
3262fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3263    if_addrs::get_if_addrs()
3264        .unwrap_or_default()
3265        .into_iter()
3266        .filter(|i| !i.is_loopback() || with_loopback)
3267        .collect()
3268}
3269
3270/// Send an outgoing mDNS query or response, and returns the packet bytes.
3271fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3272    let qtype = if out.is_query() { "query" } else { "response" };
3273    trace!(
3274        "send outgoing {}: {} questions {} answers {} authorities {} additional",
3275        qtype,
3276        out.questions().len(),
3277        out.answers_count(),
3278        out.authorities().len(),
3279        out.additionals().len()
3280    );
3281    let packet_list = out.to_data_on_wire();
3282    for packet in packet_list.iter() {
3283        multicast_on_intf(packet, intf, sock);
3284    }
3285    packet_list
3286}
3287
3288/// Sends a multicast packet, and returns the packet bytes.
3289fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3290    if packet.len() > MAX_MSG_ABSOLUTE {
3291        debug!("Drop over-sized packet ({})", packet.len());
3292        return;
3293    }
3294
3295    let addr: SocketAddr = match intf.addr {
3296        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3297        if_addrs::IfAddr::V6(_) => {
3298            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3299            sock.set_scope_id(intf.index.unwrap_or(0)); // Choose iface for multicast
3300            sock.into()
3301        }
3302    };
3303
3304    send_packet(packet, addr, intf, socket);
3305}
3306
3307/// Sends out `packet` to `addr` on the socket in `intf_sock`.
3308fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3309    match sock.send_to(packet, addr) {
3310        Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3311        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3312    }
3313}
3314
/// Tells whether `name` has the shape of an instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of dot-separated parts is checked: there must be at
/// least five (the trailing '.' yields an empty last part).
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when there are at least five parts.
    name.split('.').nth(4).is_some()
}
3321
3322fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3323    monitors.retain(|sender| {
3324        if let Err(e) = sender.try_send(event.clone()) {
3325            debug!("notify_monitors: try_send: {}", &e);
3326            if matches!(e, TrySendError::Disconnected(_)) {
3327                return false; // This monitor is dropped.
3328            }
3329        }
3330        true
3331    });
3332}
3333
/// Check if all unique records passed "probing", and if yes, create a packet
/// to announce the service.
///
/// The packet is a response with the AA flag that carries, as answers: the PTR
/// record of the service type, the PTR record of the subtype (if any), and the
/// SRV, TXT and address records of the service on `intf`.
///
/// Names recorded in `dns_registry.name_changes` replace the original
/// full name / hostname.
///
/// Returns `None` when the service has no address on `intf`, or when at least
/// one SRV/TXT/address record is not yet reported as done by
/// `dns_registry.is_probing_done`.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &Interface,
    dns_registry: &mut DnsRegistry,
) -> Option<DnsOutgoing> {
    let intf_addrs = info.get_addrs_on_intf(intf);
    if intf_addrs.is_empty() {
        trace!("No valid addrs to add on intf {:?}", &intf);
        return None;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {}: {}",
        &intf.name,
        &intf.ip()
    );

    // Number of records for which `is_probing_done` returned false.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Current time plus a random offset in 0..250 ms; passed to `is_probing_done`
    // for every record below. NOTE(review): presumably a probe start jitter; confirm.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR: service type -> (possibly renamed) instance name. Not probed.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV records.
    // The SRV target is the hostname, replaced by its new name if it was changed.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    // The record is created under the original full name; a changed name is
    // attached afterwards via `set_new_name`.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // `is_probing_done` is only consulted when the service requires probing.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT records.

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records. (A and AAAA)

    // Address records are created under the original hostname; a changed
    // hostname is attached via `set_new_name`.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Announce nothing while any record is still waiting for its probe.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
3469
3470/// Send an unsolicited response for owned service via `intf` and `sock`.
3471/// Returns true if sent out successfully.
3472fn announce_service_on_intf(
3473    dns_registry: &mut DnsRegistry,
3474    info: &ServiceInfo,
3475    intf: &Interface,
3476    sock: &MioUdpSocket,
3477) -> bool {
3478    if let Some(out) = prepare_announce(info, intf, dns_registry) {
3479        send_dns_outgoing(&out, intf, sock);
3480        return true;
3481    }
3482    false
3483}
3484
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is changed; the rest of the name is
/// kept verbatim. If the label already ends with ` (<num>)`, that number is
/// incremented, otherwise ` (2)` is appended. If the number cannot be
/// incremented without overflowing `u32`, ` (2)` is appended as well instead
/// of panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
fn name_change(original: &str) -> String {
    // Split off the first label; `rest` is None when there is no '.' at all.
    let (first_label, rest) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing " (<num>)" suffix at the very end of the label.
    let bumped = first_label
        .strip_suffix(')')
        .and_then(|stem| stem.rsplit_once(" ("))
        .and_then(|(base, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base} ({next})"))
        });

    let new_label = bumped.unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
3520
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is changed; the rest of the name is
/// kept verbatim. If the label already ends with `-<num>`, that number is
/// incremented, otherwise `-2` is appended. If the number cannot be
/// incremented without overflowing `u32`, `-2` is appended as well instead
/// of panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
fn hostname_change(original: &str) -> String {
    // Split off the first label; `rest` is None when there is no '.' at all.
    let (first_label, rest) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing "-<num>" suffix: everything after the last
    // hyphen must parse as a number.
    let bumped = first_label.rsplit_once('-').and_then(|(base, digits)| {
        let next = digits.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base}-{next}"))
    });

    let new_label = bumped.unwrap_or_else(|| format!("{first_label}-2"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
3548
/// Adds the PTR record of `service` to `out` as an answer to `msg`, followed by
/// the related records as additional answers.
///
/// Nothing is added when the service has no address on `intf`, or when
/// `out.add_answer` reports that the PTR answer was not added.
/// Names recorded in `dns_registry.name_changes` replace the original
/// full name / hostname in all records.
fn add_answer_with_additionals(
    out: &mut DnsOutgoing,
    msg: &DnsIncoming,
    service: &ServiceInfo,
    intf: &Interface,
    dns_registry: &DnsRegistry,
) {
    let intf_addrs = service.get_addrs_on_intf(intf);
    if intf_addrs.is_empty() {
        trace!("No addrs on LAN of intf {:?}", intf);
        return;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
        Some(new_name) => new_name,
        None => service.get_fullname(),
    };

    // Same substitution for the hostname.
    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
        Some(new_name) => new_name,
        None => service.get_hostname(),
    };

    // PTR: service type -> instance name, added as the answer itself.
    let ptr_added = out.add_answer(
        msg,
        DnsPointer::new(
            service.get_type(),
            RRType::PTR,
            CLASS_IN,
            service.get_other_ttl(),
            service_fullname.to_string(),
        ),
    );

    // Without the PTR answer there is nothing for the additionals to accompany.
    if !ptr_added {
        trace!("answer was not added for msg {:?}", msg);
        return;
    }

    if let Some(sub) = service.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_additional_answer(DnsPointer::new(
            sub,
            RRType::PTR,
            CLASS_IN,
            service.get_other_ttl(),
            service_fullname.to_string(),
        ));
    }

    // Add recommended additional answers according to
    // https://tools.ietf.org/html/rfc6763#section-12.1.
    out.add_additional_answer(DnsSrv::new(
        service_fullname,
        CLASS_IN | CLASS_CACHE_FLUSH,
        service.get_host_ttl(),
        service.get_priority(),
        service.get_weight(),
        service.get_port(),
        hostname.to_string(),
    ));

    out.add_additional_answer(DnsTxt::new(
        service_fullname,
        CLASS_IN | CLASS_CACHE_FLUSH,
        service.get_other_ttl(),
        service.generate_txt(),
    ));

    // One address record (A or AAAA, chosen by `ip_address_rr_type`) per
    // address of the service on this interface.
    for address in intf_addrs {
        out.add_additional_answer(DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            service.get_host_ttl(),
            address,
            intf.into(),
        ));
    }
}
3630
3631/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
3632/// that are finished.
3633fn check_probing(
3634    dns_registry: &mut DnsRegistry,
3635    timers: &mut BinaryHeap<Reverse<u64>>,
3636    now: u64,
3637) -> (DnsOutgoing, Vec<String>) {
3638    let mut expired_probes = Vec::new();
3639    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
3640
3641    for (name, probe) in dns_registry.probing.iter_mut() {
3642        if now >= probe.next_send {
3643            if probe.expired(now) {
3644                // move the record to active
3645                expired_probes.push(name.clone());
3646            } else {
3647                out.add_question(name, RRType::ANY);
3648
3649                /*
3650                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
3651                ...
3652                for tiebreaking to work correctly in all
3653                cases, the Authority Section must contain *all* the records and
3654                proposed rdata being probed for uniqueness.
3655                    */
3656                for record in probe.records.iter() {
3657                    out.add_authority(record.clone());
3658                }
3659
3660                probe.update_next_send(now);
3661
3662                // add timer
3663                timers.push(Reverse(probe.next_send));
3664            }
3665        }
3666    }
3667
3668    (out, expired_probes)
3669}
3670
3671/// Process expired probes on an interface and return a list of services
3672/// that are waiting for the probe to finish.
3673///
3674/// `DnsNameChange` events are sent to the monitors.
3675fn handle_expired_probes(
3676    expired_probes: Vec<String>,
3677    intf_name: &str,
3678    dns_registry: &mut DnsRegistry,
3679    monitors: &mut Vec<Sender<DaemonEvent>>,
3680) -> HashSet<String> {
3681    let mut waiting_services = HashSet::new();
3682
3683    for name in expired_probes {
3684        let Some(probe) = dns_registry.probing.remove(&name) else {
3685            continue;
3686        };
3687
3688        // send notifications about name changes
3689        for record in probe.records.iter() {
3690            if let Some(new_name) = record.get_record().get_new_name() {
3691                dns_registry
3692                    .name_changes
3693                    .insert(name.clone(), new_name.to_string());
3694
3695                let event = DnsNameChange {
3696                    original: record.get_record().get_original_name().to_string(),
3697                    new_name: new_name.to_string(),
3698                    rr_type: record.get_type(),
3699                    intf_name: intf_name.to_string(),
3700                };
3701                notify_monitors(monitors, DaemonEvent::NameChange(event));
3702            }
3703        }
3704
3705        // move RR from probe to active.
3706        debug!(
3707            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
3708            probe.records.len(),
3709            probe.waiting_services.len(),
3710        );
3711
3712        // Move records to active and plan to wake up services if records are not empty.
3713        if !probe.records.is_empty() {
3714            match dns_registry.active.get_mut(&name) {
3715                Some(records) => {
3716                    records.extend(probe.records);
3717                }
3718                None => {
3719                    dns_registry.active.insert(name, probe.records);
3720                }
3721            }
3722
3723            waiting_services.extend(probe.waiting_services);
3724        }
3725    }
3726
3727    waiting_services
3728}
3729
3730#[cfg(test)]
3731mod tests {
3732    use super::{
3733        check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3734        name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3735        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3736        MDNS_PORT,
3737    };
3738    use crate::{
3739        dns_parser::{
3740            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, CLASS_IN, FLAGS_AA,
3741            FLAGS_QR_RESPONSE,
3742        },
3743        service_daemon::{add_answer_of_service, check_hostname},
3744    };
3745    use std::{
3746        net::{SocketAddr, SocketAddrV4},
3747        time::Duration,
3748    };
3749    use test_log::test;
3750
3751    #[test]
3752    fn test_socketaddr_print() {
3753        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3754        let print = format!("{}", addr);
3755        assert_eq!(print, "224.0.0.251:5353");
3756    }
3757
3758    #[test]
3759    fn test_instance_name() {
3760        assert!(valid_instance_name("my-laser._printer._tcp.local."));
3761        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3762        assert!(!valid_instance_name("_printer._tcp.local."));
3763    }
3764
3765    #[test]
3766    fn test_check_service_name_length() {
3767        let result = check_service_name_length("_tcp", 100);
3768        assert!(result.is_err());
3769        if let Err(e) = result {
3770            println!("{}", e);
3771        }
3772    }
3773
3774    #[test]
3775    fn test_check_hostname() {
3776        // valid hostnames
3777        for hostname in &[
3778            "my_host.local.",
3779            &("A".repeat(255 - ".local.".len()) + ".local."),
3780        ] {
3781            let result = check_hostname(hostname);
3782            assert!(result.is_ok());
3783        }
3784
3785        // erroneous hostnames
3786        for hostname in &[
3787            "my_host.local",
3788            ".local.",
3789            &("A".repeat(256 - ".local.".len()) + ".local."),
3790        ] {
3791            let result = check_hostname(hostname);
3792            assert!(result.is_err());
3793            if let Err(e) = result {
3794                println!("{}", e);
3795            }
3796        }
3797    }
3798
3799    #[test]
3800    fn test_check_domain_suffix() {
3801        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3802        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3803        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3804        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3805        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3806        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3807    }
3808
    /// Checks that a service can be resolved again after its PTR record was
    /// invalidated (TTL 0) while no browse was running.
    ///
    /// Flow: register a service and browse until it resolves, stop the browse and
    /// wait for `SearchStopped`, multicast a PTR record with TTL 0 on every
    /// interface, then browse again and expect another `ServiceResolved`.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // Browse for a service
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        // Wait until the service resolves, or until no event arrives within `timeout`.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);

        println!("Stopping browse of {}", service);
        // Pause browsing so restarting will cause a new immediate query.
        // Unregistering will not work here, it will invalidate all the records.
        d.stop_browse(service).unwrap();

        // Ensure the search is stopped.
        // Reduces the chance of receiving an answer adding the ptr back to the
        // cache causing the later browse to return directly from the cache.
        // (which invalidates what this test is trying to test for.)
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                // Other `ServiceResolved` messages may be received
                // here as they come from different interfaces.
                // That's fine for this test.
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped);

        // Invalidate the ptr from the service to the host.
        // A TTL of 0 is what marks the record as invalidated.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        // Send the invalidation on every interface through a fresh socket.
        for intf in intfs {
            let sock = new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing(&packet_buffer, &intf, &sock);
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Restart the browse to force the sender to re-send the announcements.
        let browse_chan = d.browse(service).unwrap();

        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);
        d.shutdown().unwrap();
    }
3916
    /// Registers a service with a short host TTL on one daemon, resolves it from
    /// a second daemon, shuts the first daemon down and then waits for a
    /// `ServiceRemoved` event on the client side.
    ///
    /// NOTE(review): the final loop only prints when `ServiceRemoved` arrives;
    /// nothing asserts that it did, so the test also passes if the event never
    /// shows up within the timeout.
    #[test]
    fn test_expired_srv() {
        // construct service info
        let service_type = "_expired-srv._udp.local.";
        let instance = "test_instance";
        let host_name = "expired_srv_host.local.";
        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
            .unwrap()
            .enable_addr_auto();
        // let fullname = my_service.get_fullname().to_string();

        // set SRV to expire soon.
        let new_ttl = 3; // for testing only.
        my_service._set_host_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        // A separate daemon acts as the client that browses for the service.
        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                _ => {}
            }
        }

        assert!(resolved);

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();

        // SRV record in the client cache will expire.
        // Each wait for an event is bounded by the TTL set above, in seconds.
        let expire_timeout = Duration::from_secs(new_ttl as u64);
        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
            match event {
                ServiceEvent::ServiceRemoved(service_type, full_name) => {
                    println!("Service removed: {}: {}", &service_type, &full_name);
                    break;
                }
                _ => {}
            }
        }
    }
3970
3971    #[test]
3972    fn test_hostname_resolution_address_removed() {
3973        // Create a mDNS server
3974        let server = ServiceDaemon::new().expect("Failed to create server");
3975        let hostname = "addr_remove_host._tcp.local.";
3976        let service_ip_addr = my_ip_interfaces(false)
3977            .iter()
3978            .find(|iface| iface.ip().is_ipv4())
3979            .map(|iface| iface.ip())
3980            .unwrap();
3981
3982        let mut my_service = ServiceInfo::new(
3983            "_host_res_test._tcp.local.",
3984            "my_instance",
3985            hostname,
3986            &service_ip_addr,
3987            1234,
3988            None,
3989        )
3990        .expect("invalid service info");
3991
3992        // Set a short TTL for addresses for testing.
3993        let addr_ttl = 2;
3994        my_service._set_host_ttl(addr_ttl); // Expire soon
3995
3996        server.register(my_service).unwrap();
3997
3998        // Create a mDNS client for resolving the hostname.
3999        let client = ServiceDaemon::new().expect("Failed to create client");
4000        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4001        let resolved = loop {
4002            match event_receiver.recv() {
4003                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4004                    assert!(found_hostname == hostname);
4005                    assert!(addresses.contains(&service_ip_addr));
4006                    println!("address found: {:?}", &addresses);
4007                    break true;
4008                }
4009                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4010                Ok(_event) => {}
4011                Err(_) => break false,
4012            }
4013        };
4014
4015        assert!(resolved);
4016
4017        // Shutdown the server so no more responses / refreshes for addresses.
4018        server.shutdown().unwrap();
4019
4020        // Wait till hostname address record expires, with 1 second grace period.
4021        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4022        let removed = loop {
4023            match event_receiver.recv_timeout(timeout) {
4024                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4025                    assert!(removed_host == hostname);
4026                    assert!(addresses.contains(&service_ip_addr));
4027
4028                    println!(
4029                        "address removed: hostname: {} addresses: {:?}",
4030                        &hostname, &addresses
4031                    );
4032                    break true;
4033                }
4034                Ok(_event) => {}
4035                Err(_) => {
4036                    break false;
4037                }
4038            }
4039        };
4040
4041        assert!(removed);
4042
4043        client.shutdown().unwrap();
4044    }
4045
4046    #[test]
4047    fn test_refresh_ptr() {
4048        // construct service info
4049        let service_type = "_refresh-ptr._udp.local.";
4050        let instance = "test_instance";
4051        let host_name = "refresh_ptr_host.local.";
4052        let service_ip_addr = my_ip_interfaces(false)
4053            .iter()
4054            .find(|iface| iface.ip().is_ipv4())
4055            .map(|iface| iface.ip())
4056            .unwrap();
4057
4058        let mut my_service = ServiceInfo::new(
4059            service_type,
4060            instance,
4061            host_name,
4062            &service_ip_addr,
4063            5023,
4064            None,
4065        )
4066        .unwrap();
4067
4068        let new_ttl = 3; // for testing only.
4069        my_service._set_other_ttl(new_ttl);
4070
4071        // register my service
4072        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4073        let result = mdns_server.register(my_service);
4074        assert!(result.is_ok());
4075
4076        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4077        let browse_chan = mdns_client.browse(service_type).unwrap();
4078        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4079        let mut resolved = false;
4080
4081        // resolve the service first.
4082        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4083            match event {
4084                ServiceEvent::ServiceResolved(info) => {
4085                    resolved = true;
4086                    println!("Resolved a service of {}", &info.get_fullname());
4087                    break;
4088                }
4089                _ => {}
4090            }
4091        }
4092
4093        assert!(resolved);
4094
4095        // wait over 80% of TTL, and refresh PTR should be sent out.
4096        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4097        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4098            println!("event: {:?}", &event);
4099        }
4100
4101        // verify refresh counter.
4102        let metrics_chan = mdns_client.get_metrics().unwrap();
4103        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4104        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4105        assert_eq!(ptr_refresh_counter, 1);
4106        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4107        assert_eq!(srvtxt_refresh_counter, 1);
4108
4109        // Exit the server so that no more responses.
4110        mdns_server.shutdown().unwrap();
4111        mdns_client.shutdown().unwrap();
4112    }
4113
4114    #[test]
4115    fn test_name_change() {
4116        assert_eq!(name_change("foo.local."), "foo (2).local.");
4117        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4118        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4119        assert_eq!(name_change("foo"), "foo (2)");
4120        assert_eq!(name_change("foo (2)"), "foo (3)");
4121        assert_eq!(name_change(""), " (2)");
4122
4123        // Additional edge cases
4124        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4125        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4126        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4127    }
4128
4129    #[test]
4130    fn test_hostname_change() {
4131        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4132        assert_eq!(hostname_change("foo"), "foo-2");
4133        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4134        assert_eq!(hostname_change("foo-9"), "foo-10");
4135        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4136    }
4137
4138    #[test]
4139    fn test_add_answer_txt_ttl() {
4140        // construct a simple service info
4141        let service_type = "_test_add_answer._udp.local.";
4142        let instance = "test_instance";
4143        let host_name = "add_answer_host.local.";
4144        let service_intf = my_ip_interfaces(false)
4145            .into_iter()
4146            .find(|iface| iface.ip().is_ipv4())
4147            .unwrap();
4148        let service_ip_addr = service_intf.ip();
4149        let my_service = ServiceInfo::new(
4150            service_type,
4151            instance,
4152            host_name,
4153            &service_ip_addr,
4154            5023,
4155            None,
4156        )
4157        .unwrap();
4158
4159        // construct a DnsOutgoing message
4160        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4161
4162        // Construct a dummy DnsIncoming message
4163        let mut dummy_data = out.to_data_on_wire();
4164        let interface_id = InterfaceId::from(&service_intf);
4165        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4166
4167        // Add an answer of TXT type for the service.
4168        add_answer_of_service(
4169            &mut out,
4170            &incoming,
4171            instance,
4172            &my_service,
4173            RRType::TXT,
4174            &service_intf,
4175        );
4176
4177        // Check if the answer was added correctly
4178        assert!(
4179            out.answers_count() > 0,
4180            "No answers added to the outgoing message"
4181        );
4182
4183        // Check if the first answer is of type TXT
4184        let answer = out._answers().first().unwrap();
4185        assert_eq!(answer.0.get_type(), RRType::TXT);
4186
4187        // Check TTL is set properly for the TXT record
4188        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4189    }
4190}